feat: Add optional metadata param to ingest routes

Author: Nathan Lenas
Date: 2024-07-23 08:50:54 +02:00
Commit: d559d54e1a (parent b62669784b)
6 changed files with 76 additions and 32 deletions

@@ -40,11 +40,11 @@ class BaseIngestComponent(abc.ABC):
         self.transformations = transformations

     @abc.abstractmethod
-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
+    def ingest(self, file_name: str, file_data: Path, file_metadata: dict | None = None) -> list[Document]:
         pass

     @abc.abstractmethod
-    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
+    def bulk_ingest(self, files: list[tuple[str, Path]], metadata: dict | None = None) -> list[Document]:
         pass

     @abc.abstractmethod
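For orientation, here is a hedged sketch of how a caller could exercise the new optional parameters on any concrete component. The component instance and the metadata keys are illustrative assumptions, not part of this diff.

from pathlib import Path

# Minimal caller sketch, assuming a concrete ingest component has already been
# built by the application; "component" and the metadata keys are illustrative.
file_metadata = {"author": "Jane Doe", "department": "legal"}

def ingest_one(component, name: str, path: Path) -> list:
    # The third argument is optional, so existing callers keep working unchanged.
    return component.ingest(name, path, file_metadata)

def ingest_many(component, files: list[tuple[str, Path]]) -> list:
    # bulk_ingest applies one shared metadata dict to every file in the batch.
    return component.bulk_ingest(files, metadata={"batch": "2024-07-23"})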
@@ -117,20 +117,20 @@ class SimpleIngestComponent(BaseIngestComponentWithIndex):
     ) -> None:
         super().__init__(storage_context, embed_model, transformations, *args, **kwargs)

-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
+    def ingest(self, file_name: str, file_data: Path, file_metadata: dict | None = None) -> list[Document]:
         logger.info("Ingesting file_name=%s", file_name)
-        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
+        documents = IngestionHelper.transform_file_into_documents(file_name, file_data, file_metadata)
         logger.info(
             "Transformed file=%s into count=%s documents", file_name, len(documents)
         )
         logger.debug("Saving the documents in the index and doc store")
         return self._save_docs(documents)

-    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
+    def bulk_ingest(self, files: list[tuple[str, Path]], metadata: dict | None = None) -> list[Document]:
         saved_documents = []
         for file_name, file_data in files:
             documents = IngestionHelper.transform_file_into_documents(
-                file_name, file_data
+                file_name, file_data, metadata
             )
             saved_documents.extend(self._save_docs(documents))
         return saved_documents
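The hunks above forward the dict to IngestionHelper.transform_file_into_documents, whose matching change lives in another file of this commit and is not shown here. A hedged sketch of the behaviour it presumably implements, merging the optional dict into every produced Document; the body and the import path are assumptions.

from pathlib import Path

from llama_index.core.schema import Document  # Document type assumed from the surrounding project

def transform_file_into_documents(
    file_name: str, file_data: Path, file_metadata: dict | None = None
) -> list[Document]:
    # Assumed behaviour: build the documents as before, then merge the caller's
    # metadata into each one so it survives into the index and doc store.
    documents = [Document(text=file_data.read_text(), metadata={"file_name": file_name})]
    for document in documents:
        document.metadata.update(file_metadata or {})
    return documents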
@@ -175,20 +175,20 @@ class BatchIngestComponent(BaseIngestComponentWithIndex):
             processes=self.count_workers
         )

-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
+    def ingest(self, file_name: str, file_data: Path, file_metadata: dict | None = None) -> list[Document]:
         logger.info("Ingesting file_name=%s", file_name)
-        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
+        documents = IngestionHelper.transform_file_into_documents(file_name, file_data, file_metadata)
         logger.info(
             "Transformed file=%s into count=%s documents", file_name, len(documents)
         )
         logger.debug("Saving the documents in the index and doc store")
         return self._save_docs(documents)

-    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
+    def bulk_ingest(self, files: list[tuple[str, Path]], metadata: dict | None = None) -> list[Document]:
         documents = list(
             itertools.chain.from_iterable(
                 self._file_to_documents_work_pool.starmap(
-                    IngestionHelper.transform_file_into_documents, files
+                    IngestionHelper.transform_file_into_documents, files, metadata
                 )
             )
         )
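One caveat worth flagging on the BatchIngestComponent hunk: multiprocessing.Pool.starmap(func, iterable, chunksize) takes chunksize as its third positional argument, so passing metadata there does not reach the helper. A hedged sketch of one way to carry a shared dict inside each work item instead; the function and parameter names are illustrative, not what the commit does.

from collections.abc import Callable
from multiprocessing.pool import Pool
from pathlib import Path

def bulk_transform(
    pool: Pool,
    transform: Callable,  # e.g. IngestionHelper.transform_file_into_documents
    files: list[tuple[str, Path]],
    metadata: dict | None = None,
) -> list:
    # starmap's third positional argument is chunksize, so the shared metadata
    # has to travel inside each (file_name, file_data, metadata) tuple instead.
    work_items = [(file_name, file_data, metadata) for file_name, file_data in files]
    return list(pool.starmap(transform, work_items))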
@@ -257,12 +257,12 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
             processes=self.count_workers
         )

-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
+    def ingest(self, file_name: str, file_data: Path, file_metadata: dict | None = None) -> list[Document]:
         logger.info("Ingesting file_name=%s", file_name)
         # Running in a single (1) process to release the current
         # thread, and take a dedicated CPU core for computation
         documents = self._file_to_documents_work_pool.apply(
-            IngestionHelper.transform_file_into_documents, (file_name, file_data)
+            IngestionHelper.transform_file_into_documents, (file_name, file_data, file_metadata)
         )
         logger.info(
             "Transformed file=%s into count=%s documents", file_name, len(documents)
@@ -270,13 +270,13 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
         logger.debug("Saving the documents in the index and doc store")
         return self._save_docs(documents)

-    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
+    def bulk_ingest(self, files: list[tuple[str, Path]], metadata: dict | None = None) -> list[Document]:
         # Lightweight threads, used for parallelize the
         # underlying IO calls made in the ingestion
         documents = list(
             itertools.chain.from_iterable(
-                self._ingest_work_pool.starmap(self.ingest, files)
+                self._ingest_work_pool.starmap(self.ingest, files, metadata)
             )
         )
         return documents
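The ParallelizedIngestComponent's thread pool has the same starmap signature, so the same packing applies when fanning self.ingest out over threads. A minimal sketch under that assumption; the component parameter and pool size are illustrative.

import itertools
from multiprocessing.pool import ThreadPool
from pathlib import Path

def parallel_bulk_ingest(component, files: list[tuple[str, Path]], metadata: dict | None = None) -> list:
    # ThreadPool.starmap also treats its third positional argument as chunksize,
    # so metadata is folded into each work tuple; "component" is illustrative.
    with ThreadPool(processes=4) as pool:
        per_file_docs = pool.starmap(
            component.ingest, [(name, path, metadata) for name, path in files]
        )
    return list(itertools.chain.from_iterable(per_file_docs))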
@@ -459,18 +459,18 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
         self.node_q.put(("flush", None, None, None))
         self.node_q.join()

-    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
-        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
+    def ingest(self, file_name: str, file_data: Path, file_metadata: dict | None = None) -> list[Document]:
+        documents = IngestionHelper.transform_file_into_documents(file_name, file_data, file_metadata)
         self.doc_q.put(("process", file_name, documents))
         self._flush()
         return documents

-    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
+    def bulk_ingest(self, files: list[tuple[str, Path]], metadata: dict | None = None) -> list[Document]:
         docs = []
         for file_name, file_data in eta(files):
             try:
                 documents = IngestionHelper.transform_file_into_documents(
-                    file_name, file_data
+                    file_name, file_data, metadata
                 )
                 self.doc_q.put(("process", file_name, documents))
                 docs.extend(documents)
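The route-level half of the commit (the "ingest routes" in the title) lives in the other changed files and is not shown in this diff. Purely as a hedged sketch, a FastAPI-style request body could expose the new field roughly as below; the model, route path, and handler behaviour are assumptions, not the commit's code.

from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter()

class IngestTextBody(BaseModel):
    file_name: str
    text: str
    metadata: dict | None = None  # the new optional field forwarded to ingest()

@router.post("/v1/ingest/text")
def ingest_text(body: IngestTextBody) -> dict:
    # A real handler would call the ingest service, which passes body.metadata
    # down to the component ingest()/bulk_ingest() methods shown above.
    return {"file_name": body.file_name, "metadata": body.metadata}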