fix: specify dict type, fix bulk ingestion with metadata

This commit is contained in:
Nathan Lenas 2024-07-23 09:18:27 +02:00
parent 8863154baa
commit 50388f6a33
6 changed files with 112 additions and 42 deletions

View file

@ -40,11 +40,18 @@ class BaseIngestComponent(abc.ABC):
self.transformations = transformations
@abc.abstractmethod
def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[Document]:
pass
@abc.abstractmethod
def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
def bulk_ingest(
self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
) -> list[Document]:
pass
@abc.abstractmethod
@ -117,16 +124,25 @@ class SimpleIngestComponent(BaseIngestComponentWithIndex):
) -> None:
super().__init__(storage_context, embed_model, transformations, *args, **kwargs)
def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[Document]:
logger.info("Ingesting file_name=%s", file_name)
documents = IngestionHelper.transform_file_into_documents(file_name, file_data, file_metadata)
documents = IngestionHelper.transform_file_into_documents(
file_name, file_data, file_metadata
)
logger.info(
"Transformed file=%s into count=%s documents", file_name, len(documents)
)
logger.debug("Saving the documents in the index and doc store")
return self._save_docs(documents)
def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
def bulk_ingest(
self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
) -> list[Document]:
saved_documents = []
for file_name, file_data in files:
documents = IngestionHelper.transform_file_into_documents(
@ -175,20 +191,32 @@ class BatchIngestComponent(BaseIngestComponentWithIndex):
processes=self.count_workers
)
def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[Document]:
logger.info("Ingesting file_name=%s", file_name)
documents = IngestionHelper.transform_file_into_documents(file_name, file_data, file_metadata)
documents = IngestionHelper.transform_file_into_documents(
file_name, file_data, file_metadata
)
logger.info(
"Transformed file=%s into count=%s documents", file_name, len(documents)
)
logger.debug("Saving the documents in the index and doc store")
return self._save_docs(documents)
def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
def bulk_ingest(
self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
) -> list[Document]:
# Pair the files with the metadata
args = [(file_tuple, metadata) for file_tuple in files]
documents = list(
itertools.chain.from_iterable(
self._file_to_documents_work_pool.starmap(
IngestionHelper.transform_file_into_documents, files, metadata
IngestionHelper.transform_file_into_documents, args
)
)
)
@ -257,12 +285,18 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
processes=self.count_workers
)
def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[Document]:
logger.info("Ingesting file_name=%s", file_name)
# Running in a single (1) process to release the current
# thread, and take a dedicated CPU core for computation
documents = self._file_to_documents_work_pool.apply(
IngestionHelper.transform_file_into_documents, (file_name, file_data, file_metadata)
IngestionHelper.transform_file_into_documents,
(file_name, file_data, file_metadata),
)
logger.info(
"Transformed file=%s into count=%s documents", file_name, len(documents)
@ -270,13 +304,16 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
logger.debug("Saving the documents in the index and doc store")
return self._save_docs(documents)
def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
def bulk_ingest(
self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
) -> list[Document]:
args = [(file_tuple, metadata) for file_tuple in files]
# Lightweight threads, used for parallelize the
# underlying IO calls made in the ingestion
documents = list(
itertools.chain.from_iterable(
self._ingest_work_pool.starmap(self.ingest, files, metadata)
self._ingest_work_pool.starmap(self.ingest, args)
)
)
return documents
@ -459,13 +496,22 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
self.node_q.put(("flush", None, None, None))
self.node_q.join()
def ingest(self, file_name: str, file_data: Path, file_metadata : dict | None = None) -> list[Document]:
documents = IngestionHelper.transform_file_into_documents(file_name, file_data, file_metadata)
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[Document]:
documents = IngestionHelper.transform_file_into_documents(
file_name, file_data, file_metadata
)
self.doc_q.put(("process", file_name, documents))
self._flush()
return documents
def bulk_ingest(self, files: list[tuple[str, Path]], metadata : dict | None = None) -> list[Document]:
def bulk_ingest(
self, files: list[tuple[str, Path]], metadata: dict[str, str] | None = None
) -> list[Document]:
docs = []
for file_name, file_data in eta(files):
try: