commit 494bb9eea9
Author: Nathan Lenas
Date:   2024-11-28 15:02:58 +01:00 (committed by GitHub)

6 changed files with 136 additions and 24 deletions


@@ -40,7 +40,12 @@ class BaseIngestComponent(abc.ABC):
         self.transformations = transformations
 
     @abc.abstractmethod
-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, Any] | None = None,
+    ) -> list[Document]:
         pass
 
     @abc.abstractmethod
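For context: the abstract signature gains an optional `file_metadata` mapping, so existing two-argument call sites keep working while new callers can attach per-file metadata. A minimal sketch of the widened contract (the function name and the trivial parsing are illustrative, not part of this commit; `Document` is assumed to be the llama-index-core class the file already uses):

```python
from pathlib import Path
from typing import Any

from llama_index.core.schema import Document


def ingest_with_metadata(
    file_name: str,
    file_data: Path,
    file_metadata: dict[str, Any] | None = None,
) -> list[Document]:
    # Defaulting to None keeps the old two-argument call sites valid.
    doc = Document(text=file_data.read_text(), metadata=dict(file_metadata or {}))
    doc.metadata["file_name"] = file_name
    return [doc]
```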
@@ -117,9 +122,16 @@ class SimpleIngestComponent(BaseIngestComponentWithIndex):
     ) -> None:
         super().__init__(storage_context, embed_model, transformations, *args, **kwargs)
 
-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, Any] | None = None,
+    ) -> list[Document]:
         logger.info("Ingesting file_name=%s", file_name)
-        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
+        documents = IngestionHelper.transform_file_into_documents(
+            file_name, file_data, file_metadata
+        )
         logger.info(
             "Transformed file=%s into count=%s documents", file_name, len(documents)
         )
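`IngestionHelper.transform_file_into_documents` itself is outside this diff; presumably it folds the new third argument into each produced `Document`. A plausible sketch of that merge (the read-based parsing is a stand-in, not the helper's real body):

```python
from pathlib import Path
from typing import Any

from llama_index.core.schema import Document


def transform_file_into_documents(
    file_name: str, file_data: Path, file_metadata: dict[str, Any] | None = None
) -> list[Document]:
    # Stand-in parse step; the real helper dispatches to format-specific readers.
    documents = [Document(text=file_data.read_text())]
    for document in documents:
        document.metadata["file_name"] = file_name
        # Merge caller-supplied metadata into every produced document.
        document.metadata.update(file_metadata or {})
    return documents
```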
@@ -175,9 +187,16 @@ class BatchIngestComponent(BaseIngestComponentWithIndex):
             processes=self.count_workers
         )
 
-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, Any] | None = None,
+    ) -> list[Document]:
         logger.info("Ingesting file_name=%s", file_name)
-        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
+        documents = IngestionHelper.transform_file_into_documents(
+            file_name, file_data, file_metadata
+        )
         logger.info(
             "Transformed file=%s into count=%s documents", file_name, len(documents)
         )
@@ -185,6 +204,7 @@ class BatchIngestComponent(BaseIngestComponentWithIndex):
         return self._save_docs(documents)
 
     def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
         documents = list(
             itertools.chain.from_iterable(
                 self._file_to_documents_work_pool.starmap(
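In this fan-out, `starmap` unpacks each `(file_name, file_data)` tuple into one call on a worker process, and `itertools.chain.from_iterable` flattens the resulting list-of-lists into a single document list. The same pattern in isolation (the toy `transform` stands in for `IngestionHelper.transform_file_into_documents`):

```python
import itertools
import multiprocessing
from pathlib import Path


def transform(file_name: str, file_data: Path) -> list[str]:
    # Toy stand-in: one "document" per whitespace-separated chunk.
    return [f"{file_name}:{chunk}" for chunk in file_data.read_text().split()]


if __name__ == "__main__":
    for name, text in [("a.txt", "alpha beta"), ("b.txt", "gamma")]:
        Path(name).write_text(text)
    files = [("a.txt", Path("a.txt")), ("b.txt", Path("b.txt"))]
    with multiprocessing.Pool(processes=2) as pool:
        # One transform(name, path) call per tuple, flattened into one list.
        documents = list(
            itertools.chain.from_iterable(pool.starmap(transform, files))
        )
    print(documents)
```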
@@ -257,12 +277,18 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
             processes=self.count_workers
         )
 
-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, Any] | None = None,
+    ) -> list[Document]:
         logger.info("Ingesting file_name=%s", file_name)
         # Running in a single (1) process to release the current
         # thread, and take a dedicated CPU core for computation
         documents = self._file_to_documents_work_pool.apply(
-            IngestionHelper.transform_file_into_documents, (file_name, file_data)
+            IngestionHelper.transform_file_into_documents,
+            (file_name, file_data, file_metadata),
         )
         logger.info(
             "Transformed file=%s into count=%s documents", file_name, len(documents)
@@ -271,9 +297,9 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
         return self._save_docs(documents)
 
     def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
         # Lightweight threads, used for parallelize the
         # underlying IO calls made in the ingestion
         documents = list(
             itertools.chain.from_iterable(
                 self._ingest_work_pool.starmap(self.ingest, files)
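The comments indicate `_ingest_work_pool` is thread-backed: threads are cheap and adequate for the IO-bound parts, while each `self.ingest` call still offloads CPU work to the process pool above. A sketch using `multiprocessing.pool.ThreadPool`, which shares the `Pool` API (the thread-pool choice is inferred from the comment, not shown in this hunk):

```python
import itertools
from multiprocessing.pool import ThreadPool


def ingest(file_name: str, path: str) -> list[str]:
    # Stand-in for self.ingest: mostly waiting on IO per file.
    return [f"{file_name}@{path}"]


files = [("a.txt", "/tmp/a.txt"), ("b.txt", "/tmp/b.txt")]
with ThreadPool(processes=4) as pool:
    # Same starmap/flatten shape as the process-pool variant, but with threads.
    documents = list(itertools.chain.from_iterable(pool.starmap(ingest, files)))
print(documents)
```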
@@ -459,8 +485,15 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
         self.node_q.put(("flush", None, None, None))
         self.node_q.join()
 
-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
-        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
+    def ingest(
+        self,
+        file_name: str,
+        file_data: Path,
+        file_metadata: dict[str, Any] | None = None,
+    ) -> list[Document]:
+        documents = IngestionHelper.transform_file_into_documents(
+            file_name, file_data, file_metadata
+        )
         self.doc_q.put(("process", file_name, documents))
         self._flush()
         return documents
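The pipeline variant transforms on the calling thread, then hands the documents to a worker over `doc_q` and uses a flush sentinel plus `Queue.join()` to wait for the queue to drain. A minimal sketch of that handoff (the worker body is an assumption; only the queue protocol mirrors the diff):

```python
import queue
import threading

doc_q: "queue.Queue[tuple[str, str | None, list[str] | None]]" = queue.Queue()


def worker() -> None:
    while True:
        command, file_name, documents = doc_q.get()
        try:
            if command == "process":
                pass  # embed and index `documents` for `file_name`
            elif command == "flush":
                pass  # sentinel: everything queued before it has been seen
        finally:
            doc_q.task_done()


threading.Thread(target=worker, daemon=True).start()
doc_q.put(("process", "report.pdf", ["doc-1", "doc-2"]))
doc_q.put(("flush", None, None))
doc_q.join()  # returns once task_done() has been called for every item
```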