# Source code for components.data_process.data_components
"""Helper components for data transformation such as embeddings and document splitting."""fromcopyimportdeepcopyfromtypingimportList,TypeVar,Sequence,Union,Dict,Anyfromtqdmimporttqdmfromadalflow.core.componentimportDataComponentfromadalflow.core.typesimport(Document,RetrieverOutput,)fromadalflow.core.embedderimport(BatchEmbedder,BatchEmbedderOutputType,BatchEmbedderInputType,Embedder,)T=TypeVar("T")__all__=["ToEmbeddings","RetrieverOutputToContextStr","retriever_output_to_context_str",]# TODO: make the GeneratorOutput include the token usage too.
def retriever_output_to_context_str(
    retriever_output: Union[RetrieverOutput, List[RetrieverOutput]],
    deduplicate: bool = False,
) -> str:
    r"""Combine the retrieved documents from one or multiple queries into one context string.

    How to combine your retrieved chunks into the context is highly dependent
    on your use case. If you used query expansion, the same chunk may come back
    from multiple queries, in which case ``deduplicate=True`` is especially
    helpful.

    Args:
        retriever_output: A single :class:`RetrieverOutput` or a list of them
            (e.g. one per expanded query).
        deduplicate: If True, each unique chunk (keyed by ``chunk.id``) appears
            only once, keeping its first occurrence in retrieval order.

    Returns:
        The chunk texts joined by a single space.
    """
    chunks_to_use: List[Document] = []
    sep = " "

    if isinstance(retriever_output, RetrieverOutput):
        chunks_to_use = retriever_output.documents
    else:
        for output in retriever_output:
            chunks_to_use.extend(output.documents)

    if deduplicate:
        # Keep only the first occurrence of each chunk id, preserving order.
        seen_ids: set = set()
        unique_chunks: List[Document] = []
        for chunk in chunks_to_use:
            if chunk.id not in seen_ids:
                seen_ids.add(chunk.id)
                unique_chunks.append(chunk)
        chunks_to_use = unique_chunks

    # Join identically in both branches. The original deduplicated branch
    # accumulated "sep + chunk.text", which prepended a stray leading
    # separator and made its output inconsistent with the join below.
    return sep.join(chunk.text for chunk in chunks_to_use)
"""For now these are the data transformation components"""ToEmbeddingsInputType=Sequence[Document]ToEmbeddingsOutputType=Sequence[Document]
class ToEmbeddings(DataComponent):
    r"""Transform a Sequence of Chunks or Documents into a List of Embeddings.

    Operates on a deep copy of the input data; the caller's documents are
    never modified.
    """

    def __init__(self, embedder: Embedder, batch_size: int = 50) -> None:
        super().__init__(batch_size=batch_size)
        self.embedder = embedder
        self.batch_size = batch_size
        self.batch_embedder = BatchEmbedder(embedder=embedder, batch_size=batch_size)

    def __call__(self, input: ToEmbeddingsInputType) -> ToEmbeddingsOutputType:
        # Copy first so the embeddings are attached to fresh documents only.
        documents = deepcopy(input)
        # Embed the raw text of every document, in batches.
        embedder_input: BatchEmbedderInputType = [doc.text for doc in documents]
        batch_outputs: BatchEmbedderOutputType = self.batch_embedder(input=embedder_input)
        # Write each embedding back onto the document at the matching
        # position: batch index * batch size + offset within the batch.
        for batch_idx, batch_output in tqdm(
            enumerate(batch_outputs),
            desc="Adding embeddings to documents from batch",
        ):
            for offset, embedding in enumerate(batch_output.data):
                documents[batch_idx * self.batch_size + offset].vector = embedding.embedding
        return documents

    def _extra_repr(self) -> str:
        return f"batch_size={self.batch_size}"
class RetrieverOutputToContextStr(DataComponent):
    r"""Component wrapper around the functional ``retriever_output_to_context_str``."""

    def __init__(self, deduplicate: bool = False):
        super().__init__()
        self.deduplicate = deduplicate

    def __call__(
        self,
        input: Union[RetrieverOutput, List[RetrieverOutput]],
    ) -> str:
        # Delegate to the functional form with the configured dedup flag.
        return retriever_output_to_context_str(
            retriever_output=input, deduplicate=self.deduplicate
        )

    def _extra_repr(self) -> str:
        return f"deduplicate={self.deduplicate}"