Source code for components.retriever.postgres_retriever

"""Leverage a postgres database to store and retrieve documents."""

from typing import List, Optional, Any
from enum import Enum
import numpy as np
import logging

from adalflow.core.retriever import (
    Retriever,
)
from adalflow.core.embedder import Embedder

from adalflow.core.types import (
    RetrieverOutput,
    RetrieverStrQueryType,
    RetrieverStrQueriesType,
    Document,
)
from adalflow.database.sqlalchemy.sqlachemy_manager import DatabaseManager

log = logging.getLogger(__name__)


[docs] class DistanceToOperator(Enum): __doc__ = r"""Enum for the distance to operator. About pgvector: 1. L2 distance: <->, inner product (<#>), cosine distance (<=>), and L1 distance (<+>, added in 0.7.0) """ L2 = "<->" INNER_PRODUCT = ( "<#>" # cosine similarity when the vector is normalized, in range [-1, 1] ) COSINE = "<=>" # cosine distance, in range [0, 1] = 1 - cosine_similarity L1 = "<+>"
[docs] class PostgresRetriever(Retriever[Any, RetrieverStrQueryType]): __doc__ = r"""Use a postgres database to store and retrieve documents. Users can follow this example and to customize the prompt or additionally ask it to output score along with the indices. Args: top_k (Optional[int], optional): top k documents to fetch. Defaults to 1. database_url (str): the database url to connect to. Defaults to postgresql://postgres:password@localhost:5432/vector_db. References: [1] pgvector extension: https://github.com/pgvector/pgvector """ def __init__( self, embedder: Embedder, top_k: Optional[int] = 1, database_url: str = None, table_name: str = "document", distance_operator: DistanceToOperator = DistanceToOperator.INNER_PRODUCT, ): super().__init__() self.top_k = top_k self.table_name = table_name db_name = "vector_db" self.database_url = ( database_url or f"postgresql://postgres:password@localhost:5432/{db_name}" ) self.db_manager = DatabaseManager(self.database_url) self.embedder = embedder self.distance_operator = distance_operator self.db_score_prob_fun_map = { DistanceToOperator.COSINE: self._convert_cosine_distance_to_probability, DistanceToOperator.L2: self._convert_l2_distance_to_probability, DistanceToOperator.INNER_PRODUCT: self._convert_cosine_similarity_to_probability, } self.score_prob_fun = ( self.db_score_prob_fun_map[self.distance_operator] if self.distance_operator in self.db_score_prob_fun_map else None )
[docs] @classmethod def format_vector_search_query( cls, table_name: str, vector_column: str, query_embedding: List[float], top_k: int, distance_operator: DistanceToOperator, sort_desc: bool = True, ) -> str: """ Formats a SQL query string to select all columns from a table, order the results by the distance or similarity score to a provided embedding, and also return that score. Args: table_name (str): The name of the table to query. column (str): The name of the column containing the vector data. query_embedding (list or str): The embedding vector to compare against. top_k (int): The number of top results to return. Returns: str: A formatted SQL query string that includes the score. """ # Convert the list embedding to a string format suitable for SQL if isinstance(query_embedding, list): embedding_str = str(query_embedding).replace( " ", "" ) # Remove spaces for cleaner SQL else: embedding_str = query_embedding # Determine sorting order order_by = "DESC" if sort_desc else "ASC" # SQL query that includes the score in the selected columns sql_query = f""" SELECT *, ({vector_column} {distance_operator.value} '{embedding_str}') AS score FROM {table_name} ORDER BY score {order_by} LIMIT {top_k}; """ return sql_query
[docs] def retrieve_by_sql(self, query: str) -> List[str]: """Retrieve documents from the postgres database.""" results = self.db_manager.execute_query(query) print(results) return results
def _convert_cosine_similarity_to_probability( self, cosine_similarity: List[float] ) -> List[float]: """Convert cosine similarity to probability.""" return [(1 + cosine_similarity) / 2 for cosine_similarity in cosine_similarity] def _convert_l2_distance_to_probability( self, l2_distance: List[float] ) -> List[float]: """Convert L2 distance to probability. note: Ensure the vector is normalized so that the l2_distance will be in range [0, 2] """ distance = np.array(l2_distance) # clip to ensure the distance is in range [0, 2] distance = np.clip(distance, 0, 2) # convert to probability prob_score = 1 - distance / 2 return prob_score.tolist() def _convert_cosine_distance_to_probability( self, cosine_distance: List[float] ) -> List[float]: """Convert cosine distance to probability.""" return [(1 - cosine_distance) for cosine_distance in cosine_distance]
[docs] def call( self, input: RetrieverStrQueriesType, top_k: Optional[int] = None, **kwargs, ) -> List[RetrieverOutput]: top_k = top_k or self.top_k queries: List[str] = input if isinstance(input, list) else [input] retrieved_outputs: List[RetrieverOutput] = [] queries_embeddings = self.embedder(queries) sort_desc = False if ( self.distance_operator == DistanceToOperator.INNER_PRODUCT ): # cosine similarity sort_desc = True for idx, query in enumerate(queries): query_embedding = queries_embeddings.data[idx].embedding query_str = self.format_vector_search_query( table_name=self.table_name, vector_column="vector", query_embedding=query_embedding, top_k=top_k, distance_operator=self.distance_operator, sort_desc=sort_desc, ) retrieved_documents = self.retrieve_by_sql(query_str) doc_indices = [doc["id"] for doc in retrieved_documents] doc_scores = [doc["score"] for doc in retrieved_documents] doc_scores_prob = ( self.score_prob_fun(doc_scores) if self.score_prob_fun else None ) documents: List[Document] = [] for doc in retrieved_documents: documents.append(Document.from_dict(doc)) retrieved_outputs.append( RetrieverOutput( doc_indices=doc_indices, doc_scores=doc_scores_prob if doc_scores_prob else doc_scores, query=query, documents=documents, ) ) return retrieved_outputs
# if __name__ == "__main__": # from adalflow.core.embedder import Embedder # from adalflow.core.types import Document # from adalflow.database.sqlalchemy.pipeline.default_config import default_config # from adalflow.utils import setup_env # noqa # documents = [ # { # "meta_data": {"title": "Li Yin's profile"}, # "text": "My name is Li Yin, I love rock climbing" # + "lots of nonsense text" * 500, # "id": "doc1", # }, # { # "meta_data": {"title": "Interviewing Li Yin"}, # "text": "lots of more nonsense text" * 250 # + "Li Yin is a software developer and AI researcher" # + "lots of more nonsense text" * 250, # "id": "doc2", # }, # ] # db_name = "vector_db" # postgres_url = f"postgresql://postgres:password@localhost:5432/{db_name}" # vector_config = default_config["to_embeddings"]["component_config"]["embedder"][ # "component_config" # ] # eb = Embedder.from_config(vector_config) # pr = PostgresRetriever( # embedder=eb, # database_url=postgres_url, # top_k=2, # distance_operator=DistanceToOperator.INNER_PRODUCT, # ) # output = pr("What did the author do?") # print(output)