ingest.py
  1. """
  2. PDF document ingestion script for RAG implementation.
  3. Loads PDF files from a remote source in parallel, splits them into chunks,
  4. and stores them in a Chroma vector database.
  5. """
  6. from __future__ import annotations
  7. import logging
  8. import os
  9. from concurrent.futures import ThreadPoolExecutor, as_completed
  10. from typing import Dict, List, Optional, Tuple
  11. from urllib.parse import quote
  12. from langchain.docstore.document import Document
  13. from langchain.text_splitter import RecursiveCharacterTextSplitter
  14. from langchain_chroma import Chroma
  15. from langchain_community.document_loaders import PyPDFLoader
  16. from langchain_openai import OpenAIEmbeddings
  17. from tqdm import tqdm
  18. # Configure logging
  19. logging.basicConfig(
  20. level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
  21. )
  22. # Constants
  23. OPENAI_API_KEY: Optional[str] = os.getenv("OPENAI_API_KEY")
  24. if not OPENAI_API_KEY:
  25. raise ValueError("OPENAI_API_KEY environment variable is not set")
  26. CHROMA_PATH: str = "db"
  27. BASE_URL: str = "https://storage.googleapis.com/promptfoo-public-1/examples/rag-sec/"
  28. CHUNK_SIZE: int = 500
  29. CHUNK_OVERLAP: int = 50
  30. MAX_WORKERS: int = 5
  31. OPENAI_AI_EMBEDDING_MODEL: str = "text-embedding-3-large"
  32. # List of PDF files to process
  33. PDF_FILES: List[str] = [
  34. "2022 Q3 AAPL.pdf",
  35. "2022 Q3 AMZN.pdf",
  36. "2022 Q3 INTC.pdf",
  37. "2022 Q3 MSFT.pdf",
  38. "2022 Q3 NVDA.pdf",
  39. "2023 Q1 AAPL.pdf",
  40. "2023 Q1 AMZN.pdf",
  41. "2023 Q1 INTC.pdf",
  42. "2023 Q1 MSFT.pdf",
  43. "2023 Q1 NVDA.pdf",
  44. "2023 Q2 AAPL.pdf",
  45. "2023 Q2 AMZN.pdf",
  46. "2023 Q2 INTC.pdf",
  47. "2023 Q2 MSFT.pdf",
  48. "2023 Q2 NVDA.pdf",
  49. "2023 Q3 AAPL.pdf",
  50. "2023 Q3 AMZN.pdf",
  51. "2023 Q3 INTC.pdf",
  52. "2023 Q3 MSFT.pdf",
  53. "2023 Q3 NVDA.pdf",
  54. ]
  55. def process_single_pdf(pdf_file: str) -> Tuple[str, List[Document]]:
  56. """
  57. Process a single PDF file and return its chunks.
  58. Args:
  59. pdf_file: Name of the PDF file to process
  60. Returns:
  61. Tuple containing filename and list of document chunks
  62. """
  63. doc_url: str = BASE_URL + quote(pdf_file)
  64. try:
  65. loader: PyPDFLoader = PyPDFLoader(doc_url)
  66. pages: List[Document] = loader.load()
  67. text_splitter: RecursiveCharacterTextSplitter = RecursiveCharacterTextSplitter(
  68. chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
  69. )
  70. chunks: List[Document] = text_splitter.split_documents(pages)
  71. return pdf_file, chunks
  72. except Exception as e:
  73. logging.error(f"Error processing {pdf_file}: {str(e)}")
  74. return pdf_file, []
  75. def process_pdfs() -> List[Document]:
  76. """
  77. Process PDF files from the remote source in parallel and split them into chunks.
  78. Returns:
  79. List of document chunks
  80. """
  81. all_chunks: List[Document] = []
  82. with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
  83. # Submit all PDF processing tasks
  84. future_to_pdf: Dict[
  85. concurrent.futures.Future[Tuple[str, List[Document]]], str
  86. ] = {
  87. executor.submit(process_single_pdf, pdf_file): pdf_file
  88. for pdf_file in PDF_FILES
  89. }
  90. # Process completed tasks with progress bar
  91. with tqdm(total=len(PDF_FILES), desc="Processing PDFs") as pbar:
  92. for future in as_completed(future_to_pdf):
  93. pdf_file: str = future_to_pdf[future]
  94. try:
  95. _, chunks = future.result()
  96. all_chunks.extend(chunks)
  97. pbar.update(1)
  98. except Exception as e:
  99. logging.error(f"Failed to process {pdf_file}: {str(e)}")
  100. pbar.update(1)
  101. logging.info(f"Processed {len(all_chunks)} chunks from {len(PDF_FILES)} files")
  102. return all_chunks
  103. def create_vector_store(chunks: List[Document], batch_size: int = 100) -> None:
  104. """
  105. Create and persist the vector store from document chunks in batches.
  106. Args:
  107. chunks: List of document chunks to embed
  108. batch_size: Number of documents to process in each batch
  109. """
  110. embeddings: OpenAIEmbeddings = OpenAIEmbeddings(
  111. model=OPENAI_AI_EMBEDDING_MODEL, openai_api_key=OPENAI_API_KEY
  112. )
  113. logging.info("Creating vector store...")
  114. # Process first batch
  115. current_batch: List[Document] = chunks[:batch_size]
  116. db: Chroma = Chroma.from_documents(
  117. current_batch,
  118. embeddings,
  119. persist_directory=CHROMA_PATH,
  120. collection_name="rag_collection",
  121. )
  122. # Process remaining batches
  123. with tqdm(
  124. total=len(chunks), initial=batch_size, desc="Embedding documents"
  125. ) as pbar:
  126. for i in range(batch_size, len(chunks), batch_size):
  127. current_batch = chunks[i : i + batch_size]
  128. db.add_documents(current_batch)
  129. pbar.update(len(current_batch))
  130. logging.info(f"Vector store created and persisted to {CHROMA_PATH}")
  131. def main() -> None:
  132. """Main execution function."""
  133. chunks: List[Document] = process_pdfs()
  134. if chunks:
  135. create_vector_store(chunks)
  136. if __name__ == "__main__":
  137. main()
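
The chunking behavior is easiest to understand in isolation. Below is a minimal sketch (not part of the script) of how RecursiveCharacterTextSplitter's chunk_size and chunk_overlap interact; toy 100/20 values make the overlap visible, and the 500/50 settings above behave the same way at scale. The sample text is invented purely for illustration.

from langchain.text_splitter import RecursiveCharacterTextSplitter

# Toy values so the overlap is easy to see; the script uses 500/50.
splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=20)

# Invented sample text, for illustration only.
sample = (
    "Revenue increased 8% year over year, driven by services. "
    "Gross margin expanded to 44.5%. Operating expenses grew 10%, "
    "primarily from R&D headcount. The board declared a cash dividend."
)

for i, chunk in enumerate(splitter.split_text(sample)):
    print(i, len(chunk), repr(chunk))

# Each chunk is at most chunk_size characters, and text near a chunk
# boundary is repeated at the start of the next chunk (up to
# chunk_overlap characters), preserving local context across the split.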
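Once ingestion has run (python ingest.py with OPENAI_API_KEY set), the persisted collection can be reopened for retrieval. A minimal sketch, assuming the same "db" directory, "rag_collection" name, and embedding model used above; the query string is just an example.

from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings

# Must match the model used at ingestion time, or similarity against the
# stored vectors is meaningless. Reads OPENAI_API_KEY from the environment.
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

db = Chroma(
    collection_name="rag_collection",
    embedding_function=embeddings,
    persist_directory="db",
)

# Example query against the ingested earnings reports.
for doc in db.similarity_search("What was NVIDIA's revenue in Q3 2023?", k=3):
    print(doc.metadata.get("source"), "->", doc.page_content[:80])

In a full RAG pipeline, the retrieved chunks would then be passed to an LLM as context for answering the query.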