# Source code for components.data_process.data_components

"""Helper components for data transformation such as embeddings and document splitting."""

from copy import deepcopy
from typing import List, TypeVar, Sequence, Union, Dict, Any
from tqdm import tqdm


from adalflow.core.component import Component

from adalflow.core.types import (
    Document,
    RetrieverOutput,
)
from adalflow.core.embedder import (
    BatchEmbedder,
    BatchEmbedderOutputType,
    BatchEmbedderInputType,
    Embedder,
)


T = TypeVar("T")
__all__ = [
    "ToEmbeddings",
    "RetrieverOutputToContextStr",
    "retriever_output_to_context_str",
]

# TODO: make the GeneratorOutput include the token usage too.


def retriever_output_to_context_str(
    retriever_output: Union[RetrieverOutput, List[RetrieverOutput]],
    deduplicate: bool = False,
) -> str:
    r"""Combine the retrieved documents from one or multiple queries into one context string.

    How to combine your retrieved chunks into the context is highly dependent on your
    use case. Deduplication is especially helpful when you used query expansion, where
    the same chunk is often retrieved by several expanded queries.

    Args:
        retriever_output: A single :class:`RetrieverOutput` or a list of them
            (one per query).
        deduplicate: If True, each unique document id contributes its text only
            once, keeping the first occurrence. Defaults to False.

    Returns:
        The document texts joined by a single space separator.
    """
    sep = " "

    # Flatten one-or-many RetrieverOutput into a single list of documents.
    chunks_to_use: List[Document] = []
    if isinstance(retriever_output, RetrieverOutput):
        chunks_to_use = retriever_output.documents
    else:
        for output in retriever_output:
            chunks_to_use.extend(output.documents)

    if deduplicate:
        # Keep the first occurrence of each document id. Joining at the end
        # (instead of the old `context_str += sep + text`) avoids the spurious
        # leading separator that the deduplicated branch used to produce,
        # making both branches format the result identically.
        seen_ids = set()
        texts: List[str] = []
        for chunk in chunks_to_use:
            if chunk.id not in seen_ids:
                seen_ids.add(chunk.id)
                texts.append(chunk.text)
        return sep.join(texts)

    return sep.join(chunk.text for chunk in chunks_to_use)
""" For now these are the data transformation components """ ToEmbeddingsInputType = Sequence[Document] ToEmbeddingsOutputType = Sequence[Document]
class ToEmbeddings(Component):
    r"""Transform a Sequence of Chunks or Documents into a list of embedded documents.

    Works on a deep copy of the input data; the caller's documents are never mutated.
    """

    def __init__(self, embedder: Embedder, batch_size: int = 50) -> None:
        super().__init__(batch_size=batch_size)
        self.embedder = embedder
        self.batch_size = batch_size
        self.batch_embedder = BatchEmbedder(embedder=embedder, batch_size=batch_size)

    def __call__(self, input: ToEmbeddingsInputType) -> ToEmbeddingsOutputType:
        """Embed each document's text and attach the vector onto a copied document list."""
        docs = deepcopy(input)
        # Batch-embed the raw text of every document.
        texts: BatchEmbedderInputType = [doc.text for doc in docs]
        batch_results: BatchEmbedderOutputType = self.batch_embedder(input=texts)
        # Write each embedding back onto its document. Batches come back in the
        # original order, so the flat document index is batch_no * batch_size + offset.
        for batch_no, batch in tqdm(
            enumerate(batch_results), desc="Adding embeddings to documents from batch"
        ):
            for offset, emb in enumerate(batch.data):
                docs[batch_no * self.batch_size + offset].vector = emb.embedding
        return docs

    def _extra_repr(self) -> str:
        return f"batch_size={self.batch_size}"
class RetrieverOutputToContextStr(Component):
    r"""Component wrapper around the functional ``retriever_output_to_context_str``."""

    def __init__(self, deduplicate: bool = False):
        super().__init__()
        self.deduplicate = deduplicate

    def __call__(
        self,
        input: Union[RetrieverOutput, List[RetrieverOutput]],
    ) -> str:
        # Delegate to the module-level function with the configured dedup flag.
        return retriever_output_to_context_str(
            retriever_output=input, deduplicate=self.deduplicate
        )

    def _extra_repr(self) -> str:
        return f"deduplicate={self.deduplicate}"