ModelClient#
ModelClient is the standardized protocol and base class through which all model inference SDKs (cloud APIs or local models) communicate with AdalFlow's internal components. By swapping out the ModelClient in a Generator, Embedder, or Retriever (the components that take models), you make these functional components model-agnostic.
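For example, the same Generator can be pointed at a different provider just by passing a different client. The sketch below assumes the default Generator prompt template (which accepts an input_str prompt kwarg) and API keys loaded via setup_env(); treat it as illustrative rather than canonical:
from adalflow.core import Generator
from adalflow.components.model_client import OpenAIClient, GroqAPIClient
from adalflow.utils import setup_env

setup_env()  # load API keys from your environment/.env file

# The same functional component, two different backends: only the
# model_client and model_kwargs change.
openai_gen = Generator(
    model_client=OpenAIClient(),
    model_kwargs={"model": "gpt-3.5-turbo"},
)
groq_gen = Generator(
    model_client=GroqAPIClient(),
    model_kwargs={"model": "llama3-8b-8192"},
)

print(openai_gen(prompt_kwargs={"input_str": "What is the capital of France?"}))
print(groq_gen(prompt_kwargs={"input_str": "What is the capital of France?"}))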
Note
All users are encouraged to customize their own ModelClient whenever needed. You can refer to our code in the components.model_client directory.
Model Inference SDKs#
Cloud API providers such as OpenAI, Groq, and Anthropic typically ship both a sync and an async client in their SDKs. For example:
from openai import OpenAI, AsyncOpenAI
sync_client = OpenAI()
async_client = AsyncOpenAI()
# sync call using APIs
response = sync_client.chat.completions.create(...)
For local models, such as those run with Hugging Face transformers, you need to write the model inference code yourself, and how you do so is highly flexible. Here is an example of using a local embedding model (e.g., thenlper/gte-base) as a model (refer to TransformerEmbedder for details). It really is just normal model inference code, as the sketch below shows.
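A minimal sketch of such local inference code, assuming the usual sentence-embedding recipe for models like thenlper/gte-base (tokenize, forward pass, mean-pool the last hidden state); the actual TransformerEmbedder may differ in its details:
from typing import List

import torch
from transformers import AutoModel, AutoTokenizer


class LocalEmbedder:
    """Toy local embedding "SDK": plain transformers inference code."""

    def __init__(self, model_name: str = "thenlper/gte-base"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.model.eval()

    @torch.no_grad()
    def __call__(self, input: List[str], model: str = "thenlper/gte-base") -> List[List[float]]:
        batch = self.tokenizer(input, padding=True, truncation=True, return_tensors="pt")
        outputs = self.model(**batch)
        # Mean-pool the token embeddings, ignoring padding positions.
        mask = batch["attention_mask"].unsqueeze(-1).float()
        embeddings = (outputs.last_hidden_state * mask).sum(dim=1) / mask.sum(dim=1)
        return embeddings.tolist()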
ModelClient Protocol#
A model client can be used to manage different types of models; we define a ModelType enum to categorize them:
class ModelType(Enum):
    EMBEDDER = auto()
    LLM = auto()
    RERANKER = auto()
    UNDEFINED = auto()
We designed 6 abstract methods in the ModelClient class that subclasses implement to integrate with different model inference SDKs. We will use OpenAIClient as the cloud API example and TransformersClient, along with the local inference code TransformerEmbedder, as the example for local model clients.
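Before walking through them one by one, here is a hedged outline of the methods a subclass typically overrides (signatures paraphrased from this tutorial; see core.model_client.ModelClient in the API reference for the authoritative definitions):
from adalflow.core.model_client import ModelClient
from adalflow.core.types import ModelType


class MyModelClient(ModelClient):
    # SDK setup
    def init_sync_client(self): ...
    def init_async_client(self): ...

    # AdalFlow inputs -> SDK arguments
    def convert_inputs_to_api_kwargs(
        self, input=None, model_kwargs={}, model_type=ModelType.UNDEFINED
    ) -> dict: ...

    # SDK outputs -> AdalFlow-friendly structures
    def parse_chat_completion(self, completion): ...
    def parse_embedding_response(self, response): ...

    # Inference entry points (sync and async)
    def call(self, api_kwargs={}, model_type=ModelType.UNDEFINED): ...
    async def acall(self, api_kwargs={}, model_type=ModelType.UNDEFINED): ...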
First, we offer two methods, init_async_client and init_sync_client, for subclasses to initialize the SDK client.
You can refer to OpenAIClient to see how these methods, along with the __init__ method, are implemented; a condensed sketch of the pattern follows.
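This is only a sketch, not the actual OpenAIClient source (the real client adds more configuration, such as a pluggable chat completion parser); OPENAI_API_KEY is the standard environment variable read by the OpenAI SDK:
import os

from openai import AsyncOpenAI, OpenAI

from adalflow.core.model_client import ModelClient


class SketchOpenAIClient(ModelClient):
    def __init__(self, api_key: str = None):
        super().__init__()
        self._api_key = api_key
        self.sync_client = self.init_sync_client()
        self.async_client = None  # created lazily on the first async call

    def init_sync_client(self):
        api_key = self._api_key or os.getenv("OPENAI_API_KEY")
        return OpenAI(api_key=api_key)

    def init_async_client(self):
        api_key = self._api_key or os.getenv("OPENAI_API_KEY")
        return AsyncOpenAI(api_key=api_key)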
This is how TransformersClient does the same thing:
class TransformersClient(ModelClient):
    def __init__(self) -> None:
        super().__init__()
        self.sync_client = self.init_sync_client()
        self.async_client = None
        support_model_list = {
            "thenlper/gte-base": {
                "type": ModelType.EMBEDDER,
            }
        }

    def init_sync_client(self):
        return TransformerEmbedder()
Second, we use convert_inputs_to_api_kwargs for subclasses to convert AdalFlow inputs into api_kwargs (the SDK arguments).
def convert_inputs_to_api_kwargs(
    self,
    input: Optional[Any] = None,
    model_kwargs: Dict = {},
    model_type: ModelType = ModelType.UNDEFINED,
) -> Dict:
    raise NotImplementedError(
        f"{type(self).__name__} must implement _combine_input_and_model_kwargs method"
    )
This is how OpenAIClient implements this method:
def convert_inputs_to_api_kwargs(
    self,
    input: Optional[Any] = None,
    model_kwargs: Dict = {},
    model_type: ModelType = ModelType.UNDEFINED,
) -> Dict:
    final_model_kwargs = model_kwargs.copy()
    if model_type == ModelType.EMBEDDER:
        if isinstance(input, str):
            input = [input]
        # ensure input is a sequence of strings
        assert isinstance(input, Sequence), "input must be a sequence of text"
        final_model_kwargs["input"] = input
    elif model_type == ModelType.LLM:
        messages: List[Dict[str, str]] = []
        if input is not None and input != "":
            messages.append({"role": "system", "content": input})
        assert isinstance(
            messages, Sequence
        ), "input must be a sequence of messages"
        final_model_kwargs["messages"] = messages
    else:
        raise ValueError(f"model_type {model_type} is not supported")
    return final_model_kwargs
For embeddings, since Embedder accepts both str and List[str] as input, we convert the input into a list of strings acceptable to the SDK. For LLMs, since Generator takes prompt_kwargs (dict) and converts it into a single prompt string, we wrap that string into a list of messages. For rerankers, you can refer to CohereAPIClient for an example.
This is how TransformersClient does the same thing:
def convert_inputs_to_api_kwargs(
    self,
    input: Any,
    model_kwargs: dict = {},
    model_type: ModelType = ModelType.UNDEFINED,
) -> dict:
    final_model_kwargs = model_kwargs.copy()
    if model_type == ModelType.EMBEDDER:
        final_model_kwargs["input"] = input
        return final_model_kwargs
    else:
        raise ValueError(f"model_type {model_type} is not supported")
In addition, you can add any method that parses the SDK-specific output into a format compatible with AdalFlow components. Typically, an LLM client needs parse_chat_completion to extract the completion text and parse_embedding_response to convert the embedding response into a structure that AdalFlow components can understand. You can refer to OpenAIClient for API embedding model integration and TransformersClient for local embedding model integration; a minimal parsing sketch follows.
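As a rough illustration only (the real parsers return richer objects and route through configurable parsing functions), a chat completion parser for the OpenAI SDK could look like this; choices[0].message.content is where the OpenAI response object stores the reply text:
from adalflow.core.types import GeneratorOutput


def parse_chat_completion_sketch(completion) -> GeneratorOutput:
    """Minimal sketch: pull the reply text out of an OpenAI ChatCompletion."""
    try:
        text = completion.choices[0].message.content
        return GeneratorOutput(data=text, error=None, raw_response=str(completion))
    except Exception as e:
        return GeneratorOutput(data=None, error=str(e), raw_response=str(completion))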
Lastly, the call and acall methods execute the model inference with the prepared api_kwargs. We encourage subclasses to provide error handling and retry mechanisms in these methods.
The OpenAIClient example:
def call(self, api_kwargs: Dict = {}, model_type: ModelType = ModelType.UNDEFINED):
    if model_type == ModelType.EMBEDDER:
        return self.sync_client.embeddings.create(**api_kwargs)
    elif model_type == ModelType.LLM:
        return self.sync_client.chat.completions.create(**api_kwargs)
    else:
        raise ValueError(f"model_type {model_type} is not supported")
The TransformersClient example:
def call(self, api_kwargs: Dict = {}, model_type: ModelType = ModelType.UNDEFINED):
    return self.sync_client(**api_kwargs)
Our library currently integrates with six providers: OpenAI, Groq, Anthropic, Hugging Face, Google, and Cohere. Please check out ModelClient Integration.
Use ModelClient directly#
Though a ModelClient is often managed inside a Generator, Embedder, or Retriever component, you can use it directly if you plan to write your own component. Here is an example of using OpenAIClient directly, first with an LLM:
from adalflow.components.model_client import OpenAIClient
from adalflow.core.types import ModelType
from adalflow.utils import setup_env
setup_env()
openai_client = OpenAIClient()
query = "What is the capital of France?"
# try LLM model
model_type = ModelType.LLM
prompt = f"User: {query}\n"
model_kwargs = {"model": "gpt-3.5-turbo", "temperature": 0.5, "max_tokens": 100}
api_kwargs = openai_client.convert_inputs_to_api_kwargs(
    input=prompt, model_kwargs=model_kwargs, model_type=model_type
)
print(f"api_kwargs: {api_kwargs}")
response = openai_client.call(api_kwargs=api_kwargs, model_type=model_type)
response_text = openai_client.parse_chat_completion(response)
print(f"response_text: {response_text}")
The output will be:
api_kwargs: {'model': 'gpt-3.5-turbo', 'temperature': 0.5, 'max_tokens': 100, 'messages': [{'role': 'system', 'content': 'User: What is the capital of France?\n'}]}
response_text: The capital of France is Paris.
Then with an embedding model:
# try embedding model
model_type = ModelType.EMBEDDER
# do batch embedding
input = [query] * 2
model_kwargs = {"model": "text-embedding-3-small", "dimensions": 8, "encoding_format": "float"}
api_kwargs = openai_client.convert_inputs_to_api_kwargs(input=input, model_kwargs=model_kwargs, model_type=model_type)
print(f"api_kwargs: {api_kwargs}")
response = openai_client.call(api_kwargs=api_kwargs, model_type=model_type)
response_embedder_output = openai_client.parse_embedding_response(response)
print(f"response_embedder_output: {response_embedder_output}")
The output will be:
api_kwargs: {'model': 'text-embedding-3-small', 'dimensions': 8, 'encoding_format': 'float', 'input': ['What is the capital of France?', 'What is the capital of France?']}
response_embedder_output: EmbedderOutput(data=[Embedding(embedding=[0.6175549, 0.24047995, 0.4509756, 0.37041178, -0.33437008, -0.050995983, -0.24366009, 0.21549304], index=0), Embedding(embedding=[0.6175549, 0.24047995, 0.4509756, 0.37041178, -0.33437008, -0.050995983, -0.24366009, 0.21549304], index=1)], model='text-embedding-3-small', usage=Usage(prompt_tokens=14, total_tokens=14), error=None, raw_response=None)
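In everyday use you would more often wrap the client in the Embedder component instead of calling it by hand. A hedged sketch, assuming Embedder is importable from adalflow.core like Generator and accepts the same model_client/model_kwargs pair shown above:
from adalflow.core import Embedder  # assumed import path
from adalflow.components.model_client import OpenAIClient

embedder = Embedder(
    model_client=OpenAIClient(),
    model_kwargs={"model": "text-embedding-3-small", "dimensions": 8, "encoding_format": "float"},
)
# The component handles convert_inputs_to_api_kwargs, call, and parsing for you.
output = embedder(input=["What is the capital of France?"] * 2)
print(output)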
OPENAI EMBEDDER - Embedding Processing Example#
In this example, we are using a collection of embeddings to demonstrate different functionalities such as calculating semantic similarity, finding nearest neighbors, and averaging embeddings. Below is the Python code used to achieve these tasks:
from typing import List
import numpy as np
from adalflow.core.types import ModelType, EmbedderOutput
from adalflow.components.model_client import OpenAIClient
from dataclasses import dataclass
from enum import Enum
from numpy.linalg import norm
Data Classes
We use two dataclass types to structure the collection and usage data:
EmbeddingCollection: stores an individual embedding collection and its corresponding index.
Usage: keeps track of token usage, such as prompt_tokens and total_tokens.
@dataclass
class EmbeddingCollection:
    collection: List[float]
    cindex: int


@dataclass
class Usage:
    prompt_tokens: int
    total_tokens: int
The following function, get_openai_embedding, sends a request to the OpenAI API to retrieve embeddings for a given text. It sets the model type to EMBEDDER, prepares the required model-specific parameters, and processes the response:
openai_client = OpenAIClient()


def get_openai_embedding(text):
    # Set model type to EMBEDDER for embedding functionality
    model_type = ModelType.EMBEDDER

    # Prepare input and model-specific parameters
    input = text
    model_kwargs = {
        "model": "text-embedding-3-small",
        "dimensions": 8,
        "encoding_format": "float",
    }

    # Convert inputs to the required API format
    api_kwargs = openai_client.convert_inputs_to_api_kwargs(
        input=input, model_kwargs=model_kwargs, model_type=model_type
    )
    print(f"api_kwargs: {api_kwargs}")  # Debug output to verify API arguments

    # Call OpenAI API and parse response for embeddings
    response = openai_client.call(api_kwargs=api_kwargs, model_type=model_type)
    response_embedder_output = openai_client.parse_embedding_response(response)
    print(
        f"response_embedder_output: {response_embedder_output}"
    )  # Debug output to verify embeddings
    return response_embedder_output
Embedding Processing
The function process_embeddings takes in a collection of embeddings and provides utilities for calculating similarity, averaging embeddings, and finding nearest neighbors:
Similarity: measures the cosine similarity between two embeddings.
Average Embedding: computes the mean embedding across a set of embeddings.
Nearest Neighbors: identifies the top-k nearest neighbors based on cosine similarity.
def process_embeddings(embeddings_collection):
    # Extract embedding data for each item in the collection
    embeddingOutput = [emb.collection for emb in embeddings_collection]
    embeddingDataList = [each_emb_out.data for each_emb_out in embeddingOutput]
    embeddingList = [
        each_item.embedding
        for each_emb_data in embeddingDataList
        for each_item in each_emb_data
    ]

    # Convert to numpy array for easier manipulation and calculations
    embeddings_array = np.array(embeddingList)

    def calculate_similarity(emb1, emb2):
        # Compute cosine similarity between two embeddings
        return np.dot(emb1, emb2) / (norm(emb1) * norm(emb2))

    def get_average_embedding(embeddings_list):
        # Calculate the mean embedding across a list of embeddings
        return np.mean(embeddings_list, axis=0)

    def find_nearest_neighbors(
        query_index: int, embedding_list: List[List[float]], k: int = 5
    ):
        # Find top-k most similar embeddings to a query embedding, based on cosine similarity
        query_embedding = embedding_list[query_index]
        similarities = [
            (i, calculate_similarity(query_embedding, emb))
            for i, emb in enumerate(embedding_list)
            if i != query_index
        ]
        return sorted(similarities, key=lambda x: x[1], reverse=True)[:k]

    # Return dictionary of functions and processed data for further use
    return {
        "embeddings_array": embeddings_array,
        "calculate_similarity": calculate_similarity,
        "average_embedding": get_average_embedding,
        "find_nearest_neighbors": find_nearest_neighbors,
    }
The function demonstrate_embeddings_usage showcases how to analyze semantic similarities, find nearest neighbors, and calculate average embeddings for sample texts. It selects random texts, compares their similarities, finds nearest neighbors for a specific query, and compares average embeddings for texts containing “Paris”.
# Demonstrate embeddings usage with sample data
def demonstrate_embeddings_usage(sample_embeddings, input_text_list):
    # Initialize processor and retrieve embeddings array
    processor = process_embeddings(sample_embeddings)
    embeddings = processor["embeddings_array"]

    print("1. Analyzing Semantic Similarities:")
    print("-" * 50)

    # Select a few random indices for similarity testing
    num_indices = 5
    assert len(input_text_list) == len(embeddings)
    indices = np.random.choice(len(input_text_list), num_indices, replace=False)
    selected_text = np.array(input_text_list)[indices]
    selected_embeddings = np.array(embeddings)[indices]

    # Display selected texts and their embeddings
    print("Selected indices:", indices)
    print("Selected elements from array1:", selected_text)
    print("Selected elements from array2:", selected_embeddings)

    # Calculate similarity between each pair of selected texts
    for i in range(len(selected_text)):
        for j in range(i + 1, len(selected_text)):
            similarity = processor["calculate_similarity"](
                selected_embeddings[i], selected_embeddings[j]
            )
            print(f"\nComparing:\n'{selected_text[i]}' \nwith:\n'{selected_text[j]}'")
            print(f"Similarity score: {similarity:.4f}")

    print("\n2. Finding Nearest Neighbors:")
    print("-" * 50)

    # Find and display the 3 nearest neighbors for the first text
    query_idx = 0
    neighbors = processor["find_nearest_neighbors"](query_idx, embeddings, k=3)
    print(f"\nQuery text: '{input_text_list[query_idx]}'")
    print("\nNearest neighbors:")
    for idx, similarity in neighbors:
        print(f"- '{input_text_list[idx]}' (similarity: {similarity:.4f})")

    print("\n3. Using Average Embeddings:")
    print("-" * 50)

    # Calculate and compare the average embedding for texts containing "Paris"
    paris_indices = [i for i, text in enumerate(input_text_list) if "Paris" in text]
    paris_embeddings = embeddings[paris_indices]
    avg_paris_embedding = processor["average_embedding"](paris_embeddings)

    print("\nComparing average 'Paris' embedding with all texts:")
    for i, text in enumerate(input_text_list):
        similarity = processor["calculate_similarity"](
            avg_paris_embedding, embeddings[i]
        )
        print(f"- '{text}' (similarity: {similarity:.4f})")
Running the Model Client
Finally, we run the model client by initializing a set of sample texts, generating their embeddings, and using the embedding processing functions to analyze similarities and neighbors.
def run_model_client_embedding_usage():
    # Define a set of sample texts to test embedding and similarity functionalities
    sample_texts = [
        "What is the capital of France?",
        "Paris is the capital of France.",
        "What is the population of France?",
        "How big is Paris?",
        "What is the weather like in Paris?",
    ]

    # Duplicate each sample text to form an input list with repeated entries (for embedding testing)
    input_text_list = [text for text in sample_texts for _ in range(2)]

    # Generate embeddings for each text in the input list, and store them in an EmbeddingCollection
    embeddings_collection = [
        EmbeddingCollection(collection=get_openai_embedding(text), cindex=i)
        for i, text in enumerate(input_text_list)
    ]
    print(
        embeddings_collection
    )  # Debugging output to verify embeddings collection content

    # Demonstrate the usage of embeddings by analyzing similarities, finding neighbors, etc.
    demonstrate_embeddings_usage(embeddings_collection, input_text_list)
To execute the complete example, simply call the run_model_client_embedding_usage() function:
run_model_client_embedding_usage()
This will trigger the embedding retrieval and processing functions, and you will see the results printed out, demonstrating how embeddings can be used for similarity analysis, neighbor finding, and averaging.
OPENAI LLM Chat - Multichat Usage#
This example demonstrates how to create a multichat system using OpenAI’s LLM with adalflow, where the assistant’s responses depend on the entire conversation history. This allows for a more dynamic and context-aware conversation flow.
from adalflow.components.model_client import OpenAIClient
from adalflow.core.types import ModelType
from adalflow.utils import setup_env
from typing import List, Dict
ChatConversation Class
Here, we define a ChatConversation class to manage the conversation history and make API calls to the OpenAI model. The assistant's responses are generated based on the entire conversation history.
class ChatConversation:
    def __init__(self):
        # Initialize the OpenAI client for managing API calls
        self.openai_client = OpenAIClient()
        # Initialize an empty conversation history to store chat messages
        self.conversation_history: str = ""
        # Model parameters to customize the API call
        self.model_kwargs = {
            "model": "gpt-3.5-turbo",
            "temperature": 0.5,  # Controls randomness; 0.5 for balanced responses
            "max_tokens": 100,  # Limits the response length
        }

    def add_user_message(self, message: str):
        """Add a user message to the conversation history"""
        self.conversation_history += (
            f"<USER> {message} </USER>"  # Format for user message
        )

    def add_assistant_message(self, message: str):
        """Add an assistant message to the conversation history"""
        self.conversation_history += (
            f"<ASSISTANT> {message} </ASSISTANT>"  # Format for assistant message
        )

    def get_response(self) -> str:
        """Get response from the model based on conversation history"""
        # Convert the conversation history and model parameters into API arguments
        api_kwargs = self.openai_client.convert_inputs_to_api_kwargs(
            input=self.conversation_history,
            model_kwargs=self.model_kwargs,
            model_type=ModelType.LLM,
        )
        print(f"api_kwargs: {api_kwargs}")  # Debugging output to verify API parameters

        # Call the API with the generated arguments to get a response
        response = self.openai_client.call(
            api_kwargs=api_kwargs, model_type=ModelType.LLM
        )
        print("response: ", response)  # Debugging output for raw API response

        # Extract and parse the text response from the API output
        response_text = self.openai_client.parse_chat_completion(response)
        # Update conversation history with the assistant's response
        self.add_assistant_message(response_text)
        return response_text  # Return the assistant's response to the caller
Simulating a Multi-turn Conversation
In the check_chat_conversation() function, we simulate a multi-turn conversation by iterating over a list of user questions. Each question is added to the conversation history, and the assistant responds based on the accumulated conversation context.
def check_chat_conversation():
    # Initialize a new chat conversation
    chat = ChatConversation()

    # Example list of user questions to simulate a multi-turn conversation
    questions = [
        "What is the capital of France?",
        "What is its population?",
        "Tell me about its famous landmarks",
    ]

    # Iterate through each question in the list
    for question in questions:
        print(f"\nUser: {question}")  # Display the user's question
        chat.add_user_message(
            question
        )  # Add the user question to the conversation history

        response = (
            chat.get_response()
        )  # Get assistant's response based on conversation history
        print(f"Assistant: {response}")  # Display the assistant's response

    # Display the full conversation history after all exchanges
    print("\nFull Conversation History:")
    print(chat.conversation_history)  # Print the accumulated conversation history
Key Points: each question depends on the previous ones, and the assistant answers in context because the full conversation history is sent on every turn. Run the example with:
check_chat_conversation()
OPENAI LLM Chat - Multichat Usage - Asynchronous#
This example demonstrates how to create an asynchronous multichat system using OpenAI’s LLM with adalflow. The asynchronous approach allows handling multiple questions in parallel, making the interaction more efficient when dealing with unrelated queries.
import asyncio
from adalflow.components.model_client import OpenAIClient
from adalflow.core.types import ModelType
from typing import List
ChatConversationAsync Class
The ChatConversationAsync class is designed to handle asynchronous API calls to the OpenAI model. It supports concurrent requests, which improves performance when interacting with multiple questions simultaneously.
class ChatConversationAsync:
    def __init__(self):
        # Initialize with an asynchronous OpenAI client
        self.openai_client = OpenAIClient()

        # Default model parameters for the chat
        self.model_kwargs = {
            "model": "gpt-3.5-turbo",  # Model used for chat
            "temperature": 0.5,  # Controls randomness in response
            "max_tokens": 100,  # Maximum tokens in the generated response
        }

    async def get_response(self, message: str) -> str:
        """Asynchronously get a response from the model for a given user message"""
        # Convert input message and model parameters into the format expected by the API
        api_kwargs = self.openai_client.convert_inputs_to_api_kwargs(
            input=message,  # User's message input
            model_kwargs=self.model_kwargs,  # Model-specific settings
            model_type=ModelType.LLM,  # Specify the model type as a language model (LLM)
        )
        print(f"api_kwargs: {api_kwargs}")  # Log the API arguments for debugging

        # Make an asynchronous API call to OpenAI's model
        response = await self.openai_client.acall(
            api_kwargs=api_kwargs,  # Pass the prepared arguments
            model_type=ModelType.LLM,  # Specify the model type again
        )
        print("response: ", response)  # Print the raw response from the API

        # Parse the API response to extract the assistant's reply (chat completion)
        response_text = self.openai_client.parse_chat_completion(response)
        return response_text  # Return the parsed response text
Running Multiple Asynchronous Chat Sessions
In the check_chat_conversations_async() function, we handle a list of unrelated user questions concurrently. This is done by creating a list of asynchronous tasks and gathering their responses.
async def check_chat_conversations_async():
    # Create an instance of ChatConversationAsync to handle asynchronous operations
    chat = ChatConversationAsync()

    # List of unrelated questions that will be handled in parallel
    questions = [
        "What is the capital of France?",  # Question 1
        "Is dog a wild animal?",  # Question 2
        "Tell me about amazon forest",  # Question 3
    ]

    # Create a list of asynchronous tasks, one for each question
    # Each task calls the get_response method asynchronously for a question
    tasks = [chat.get_response(question) for question in questions]

    # Gather the results of all asynchronous tasks concurrently
    responses = await asyncio.gather(*tasks)

    # Print the responses from the assistant along with the respective user questions
    for question, response in zip(questions, responses):
        print(f"\nUser: {question}")
        print(f"Assistant: {response}")
Running the Asynchronous Function
To execute the asynchronous function, you can use the following methods based on your environment:
# Run the asynchronous function if in a file
# asyncio.run(check_chat_conversations_async())
# in jupyter notebook
await check_chat_conversations_async()
This approach allows you to handle multiple independent conversations concurrently, improving the system’s performance and responsiveness.
OPENAI LLM Chat - Multichat Usage - Benchmark sync() vs async()#
This section compares the performance of synchronous (call()) vs. asynchronous (acall()) API calls to OpenAI's language model, benchmarking them with a sample prompt to determine which approach is more efficient for handling multiple API requests.
import asyncio
import time
from adalflow.components.model_client import (
    OpenAIClient,
)  # Assuming OpenAIClient with .call() and .acall() is available
from adalflow.core.types import ModelType
Setup for Benchmarking
We initialize the OpenAI client and set up a sample prompt to test both synchronous and asynchronous API calls.
# Initialize the OpenAI client
openai_client = OpenAIClient()
# Sample prompt for testing
prompt = "Tell me a joke."
model_kwargs = {"model": "gpt-3.5-turbo", "temperature": 0.5, "max_tokens": 100}
Synchronous Benchmarking
The benchmark_sync_call
function runs the synchronous .call()
method multiple times and measures the total time taken for all requests.
# Synchronous function for benchmarking .call()
def benchmark_sync_call(api_kwargs, runs=10):
    """
    Benchmark the synchronous .call() method by running it multiple times.

    Parameters:
    - api_kwargs: The arguments to be passed to the API call
    - runs: The number of times to run the call (default is 10)
    """
    # List to store responses
    responses = []

    # Record the start time of the benchmark
    start_time = time.time()

    # Perform synchronous API calls for the specified number of runs
    responses = [
        openai_client.call(
            api_kwargs=api_kwargs,  # API arguments
            model_type=ModelType.LLM,  # Model type (e.g., LLM for language models)
        )
        for _ in range(runs)  # Repeat 'runs' times
    ]

    # Record the end time after all calls are completed
    end_time = time.time()

    # Output the results of each synchronous call
    for i, response in enumerate(responses):
        print(f"sync call {i + 1} completed: {response}")

    # Print the total time taken for all synchronous calls
    print(f"\nSynchronous benchmark completed in {end_time - start_time:.2f} seconds")
# Asynchronous function for benchmarking .acall()
async def benchmark_async_acall(api_kwargs, runs=10):
    """
    Benchmark the asynchronous .acall() method by running it multiple times concurrently.

    Parameters:
    - api_kwargs: The arguments to be passed to the API call
    - runs: The number of times to run the asynchronous call (default is 10)
    """
    # Record the start time of the benchmark
    start_time = time.time()

    # Create a list of asynchronous tasks for the specified number of runs
    tasks = [
        openai_client.acall(
            api_kwargs=api_kwargs,  # API arguments
            model_type=ModelType.LLM,  # Model type (e.g., LLM for language models)
        )
        for _ in range(runs)  # Repeat 'runs' times
    ]

    # Execute all tasks concurrently and wait for them to finish
    responses = await asyncio.gather(*tasks)

    # Record the end time after all tasks are completed
    end_time = time.time()

    # Output the results of each asynchronous call
    for i, response in enumerate(responses):
        print(f"Async call {i + 1} completed: {response}")

    # Print the total time taken for all asynchronous calls
    print(f"\nAsynchronous benchmark completed in {end_time - start_time:.2f} seconds")
api_kwargs = openai_client.convert_inputs_to_api_kwargs(
    input=prompt, model_kwargs=model_kwargs, model_type=ModelType.LLM
)
# Run both benchmarks
print("Starting synchronous benchmark...\n")
benchmark_sync_call(api_kwargs)
# Run the asynchronous function if in a file
# asyncio.run(benchmark_async_acall(api_kwargs))
print("\nStarting asynchronous benchmark...\n")
await benchmark_async_acall(api_kwargs)
OPENAI LLM Chat - Additional Utils#
This section demonstrates the use of additional utility functions for OpenAI’s language model client. The following utility functions are included:
get_first_message_content()
get_all_messages_content()
get_probabilities()
These utilities can be used to interact with the OpenAI model in various ways, such as extracting the first message content, retrieving all message content from a multi-chat scenario, and calculating the probabilities of tokens.
Code Setup
First, we import the necessary components for utilizing the OpenAI client and the utilities from the adalflow library.
from adalflow.components.model_client import OpenAIClient
from adalflow.core.types import ModelType
from adalflow.utils import setup_env
from adalflow.components.model_client.openai_client import (
    get_first_message_content,
    get_all_messages_content,
    get_probabilities,
)
from adalflow.core import Generator
Function: check_openai_additional_utils
This function demonstrates how to use the OpenAI client along with a custom utility function for generating responses from the model, based on the given query and utility function.
def check_openai_additional_utils(func, model_kwargs):
    """
    This function demonstrates the usage of the OpenAI client and a custom utility function
    for generating responses from the LLM model, based on the given query in openai client.

    Parameters:
    - func: A function that will be used to parse the chat completion (for custom parsing).
    - model_kwargs: The additional model parameters (e.g., temperature, max_tokens) to be used in the model.

    Returns:
    - output: The generated response from the model based on the query.
    """
    # Initialize the OpenAI client with a custom chat completion parser
    openai_client = OpenAIClient(chat_completion_parser=func)

    # Define a sample query (user question)
    query = "What is the capital of France?"

    # Set the model type to LLM (Large Language Model)
    model_type = ModelType.LLM

    # Create the prompt by formatting the user query as a conversation
    prompt = f"User: {query}\n"

    # Define any additional parameters needed for the model (e.g., the input string)
    prompt_kwargs = {
        "input_str": "What is the capital of France?",
    }

    # Initialize the Generator with the OpenAI client and model parameters
    generator = Generator(model_client=openai_client, model_kwargs=model_kwargs)

    # Execute the generator to get a response for the prompt (using the defined prompt_kwargs)
    output = generator(prompt_kwargs=prompt_kwargs)

    # Return the generated output (response from the LLM)
    return output
Function: run_utils_functions
This function runs a series of utility functions using different model configurations for generating responses. It demonstrates how to check OpenAI model outputs using various utility functions.
def run_utils_functions():
    """
    This function runs a series of utility functions using different model
    configurations for generating responses. It demonstrates how to check
    OpenAI model outputs using various utility functions.
    """
    # Define the model arguments for the probability-based function (with logprobs)
    probability_model_kwargs = {
        "model": "gpt-3.5-turbo",  # Specify the model version
        "logprobs": True,  # Enable logprobs to get probability distributions for tokens
        "n": 2,  # Request 2 different completions for each query
    }

    # Define general model arguments for most other functions
    model_kwargs = {
        "model": "gpt-3.5-turbo",  # Specify the model version
        "temperature": 0.5,  # Control the randomness of responses (0 is deterministic)
        "max_tokens": 100,  # Set the maximum number of tokens (words) in the response
    }

    # List of functions to run with corresponding model arguments
    func_list = [
        [
            get_probabilities,
            probability_model_kwargs,
        ],  # Function to get probabilities with specific kwargs
        [
            get_first_message_content,
            model_kwargs,
        ],  # Function to get first message content
        [
            get_all_messages_content,
            model_kwargs,
        ],  # Function to get all messages content in multi-chat scenarios
    ]

    # Loop through each function and its corresponding arguments
    for each_func in func_list:
        # Check the function output using the specified arguments
        result = check_openai_additional_utils(each_func[0], each_func[1])

        # Print the function and result for debugging purposes
        print(f"Function: {each_func[0].__name__}, Model Args: {each_func[1]}")
        print(f"Result: {result}")
Running the Utility Functions
To execute the utility functions, we call run_utils_functions(), which runs the defined functions and prints their results.
run_utils_functions()
Purpose and Usage
These utilities (get_first_message_content, get_all_messages_content, and get_probabilities) allow users to extract specific information from OpenAI LLM responses, such as individual message contents in a chat or the probability distribution over tokens.
Groq LLM Chat - Multichat Usage#
Note: Groq does not provide an embedding endpoint the way OpenAI does.
The following example demonstrates how to set up a multi-turn conversation with the Groq LLM using the GroqAPIClient.
from adalflow.components.model_client import GroqAPIClient
from adalflow.core.types import ModelType
from adalflow.utils import setup_env
from typing import List, Dict
ChatConversation Class
This class handles the conversation flow by interacting with the Groq model, keeping track of the conversation history, and generating responses.
class ChatConversation:
    def __init__(self):
        """
        Initialize a new ChatConversation object.
        - GroqAPIClient is used to interact with the Groq model.
        - conversation_history keeps track of the conversation between the user and assistant.
        - model_kwargs contains the model parameters like temperature and max tokens.
        """
        self.groq_client = (
            GroqAPIClient()
        )  # Initialize GroqAPIClient for model interaction
        self.conversation_history: str = (
            ""  # Initialize conversation history as an empty string
        )
        self.model_kwargs = {
            "model": "llama3-8b-8192",  # Specify the model to use
            "temperature": 0.5,  # Set the temperature for response variability
            "max_tokens": 100,  # Limit the number of tokens in the response
        }

    def add_user_message(self, message: str):
        """
        Add a user message to the conversation history in the required format.
        The message is wrapped with <USER> tags for better processing by the assistant.
        """
        self.conversation_history += (
            f"<USER> {message} </USER>"  # Append user message to history
        )

    def add_assistant_message(self, message: str):
        """
        Add an assistant message to the conversation history in the required format.
        The message is wrapped with <ASSISTANT> tags for better processing.
        """
        self.conversation_history += (
            f"<ASSISTANT> {message} </ASSISTANT>"  # Append assistant message to history
        )

    def get_response(self) -> str:
        """
        Generate a response from the assistant based on the conversation history.
        - Converts the conversation history and model kwargs into the format required by the Groq API.
        - Calls the API to get the response.
        - Parses and adds the assistant's reply to the conversation history.
        """
        # Prepare the request for the Groq API, converting the inputs into the correct format
        api_kwargs = self.groq_client.convert_inputs_to_api_kwargs(
            input=self.conversation_history,  # Use the conversation history as input
            model_kwargs=self.model_kwargs,  # Include model-specific parameters
            model_type=ModelType.LLM,  # Specify the model type (Large Language Model)
        )
        print(f"api_kwargs: {api_kwargs}")  # Log the API request parameters

        # Call the Groq model API to get the response
        response = self.groq_client.call(
            api_kwargs=api_kwargs,
            model_type=ModelType.LLM,  # Specify the model type again for clarity
        )
        print("response: ", response)  # Log the API response

        # Parse the response to extract the assistant's reply
        response_text = self.groq_client.parse_chat_completion(response)

        # Add the assistant's message to the conversation history
        self.add_assistant_message(response_text)

        # Return the assistant's response text
        return response_text
Example Multi-Turn Conversation
The following function simulates a multi-turn conversation, where the user asks a series of questions and the assistant responds. It demonstrates how user inputs are processed, and responses are generated while maintaining the conversation history.
def check_chat_conversation():
    """
    This function simulates a multi-turn conversation between a user and an assistant.
    It demonstrates how user inputs are processed, and the assistant generates responses,
    while maintaining the conversation history for each query.
    """
    # Initialize the ChatConversation object
    chat = ChatConversation()  # This creates an instance of the ChatConversation class

    # Define a list of user questions for a multi-turn conversation
    questions = [
        "What is the capital of France?",  # First user question
        "What is its population?",  # Second user question
        "Tell me about its famous landmarks",  # Third user question
    ]

    # Loop through each question and get the assistant's response
    for question in questions:
        # Print the current question from the user
        print(f"\nUser: {question}")

        # Add the user's message to the conversation history
        chat.add_user_message(question)

        # Get the assistant's response based on the conversation history
        response = chat.get_response()

        # Print the assistant's response
        print(f"Assistant: {response}")

    # After the conversation, print the full conversation history
    print("\nFull Conversation History:")
    print(
        chat.conversation_history
    )  # This will print all messages (user and assistant) in the conversation history
Run the following to exercise the Groq client's multi-turn chat ability:
check_chat_conversation()
Groq LLM Chat - Multichat Usage - Asynchronous#
This example demonstrates how to perform multi-turn conversations with the Groq LLM using asynchronous calls for each query. It uses Python’s asyncio to handle multiple independent requests concurrently.
import asyncio
from adalflow.components.model_client import GroqAPIClient
from adalflow.core.types import ModelType
from typing import List
ChatConversation Class
This class allows you to interact asynchronously with the Groq model. The get_response method fetches responses from the model for a single user input asynchronously.
class ChatConversation:
    def __init__(self):
        # Using an asynchronous client for communication with GroqAPI
        self.groq_client = GroqAPIClient()  # Create an instance of GroqAPIClient

        # Model configuration parameters (e.g., Llama model with 8b parameters and 8192 context length)
        self.model_kwargs = {
            "model": "llama3-8b-8192",  # Llama model with specific size
            "temperature": 0.5,  # Degree of randomness in the model's responses
            "max_tokens": 100,  # Maximum number of tokens in the response
        }

    async def get_response(self, message: str) -> str:
        """Get response from the model for a single message asynchronously"""
        # Convert the user input message to the appropriate format for the Groq API
        api_kwargs = self.groq_client.convert_inputs_to_api_kwargs(
            input=message,  # User's input message
            model_kwargs=self.model_kwargs,  # Model parameters
            model_type=ModelType.LLM,  # Model type for large language models (LLM)
        )
        print(f"api_kwargs: {api_kwargs}")  # Print the API arguments for debugging

        # Asynchronously call the Groq API with the provided API arguments
        response = await self.groq_client.acall(
            api_kwargs=api_kwargs,  # Pass the API arguments
            model_type=ModelType.LLM,  # Specify the model type
        )
        print("response: ", response)  # Print the API response for debugging

        # Parse the response to extract the assistant's reply from the API response
        response_text = self.groq_client.parse_chat_completion(response)
        return response_text  # Return the assistant's response text
Example Asynchronous Multi-Turn Conversation
The following function demonstrates how multiple independent questions are handled asynchronously. Each question is processed concurrently, and their responses are gathered using asyncio.gather.
async def check_chat_conversations():
    # Create an instance of ChatConversation
    chat = ChatConversation()

    # List of unrelated questions for independent async calls
    questions = [
        "What is the capital of France?",
        "Is dog a wild animal ?",
        "Tell me about amazon forest",
    ]

    # Run each question as an independent asynchronous task
    tasks = [chat.get_response(question) for question in questions]

    # Gather all the responses concurrently
    responses = await asyncio.gather(*tasks)

    # Display each response alongside the question
    for question, response in zip(questions, responses):
        print(f"\nUser: {question}")
        print(f"Assistant: {response}")
To execute the function, run the following:
# Run the asynchronous function if in a file
# asyncio.run(check_chat_conversations())
await check_chat_conversations()
Groq LLM Chat - Multichat Usage - Benchmark sync() vs async()#
This example demonstrates how to benchmark the synchronous .call() method against the asynchronous .acall() method for making API calls with Groq. The benchmark compares the time taken to execute multiple API requests synchronously and asynchronously.
import asyncio
import time
from adalflow.components.model_client import (
    GroqAPIClient,
)  # Assuming GroqAPIClient with .call() and .acall() is available
from adalflow.core.types import ModelType
Initialization
The following code initializes the Groq client and sets up the sample prompt and model parameters for testing.
# Initialize the Groq client
groq_client = GroqAPIClient()
# Sample prompt for testing
prompt = "Tell me a joke."
model_kwargs = {"model": "llama3-8b-8192", "temperature": 0.5, "max_tokens": 100}
Benchmarking Synchronous .call() Method
This function benchmarks the synchronous .call() method by calling the Groq API synchronously multiple times.
# Synchronous function for benchmarking .call()
def benchmark_sync_call(api_kwargs, runs=10):
    # List to store responses from each synchronous call
    responses = []

    # Record the start time for benchmarking
    start_time = time.time()

    # Perform synchronous API calls in a loop
    responses = [
        groq_client.call(  # Calling the API synchronously
            api_kwargs=api_kwargs,  # Passing the API arguments
            model_type=ModelType.LLM,  # Defining the model type
        )
        for _ in range(runs)  # Repeat the call 'runs' times
    ]

    # Record the end time after all calls are completed
    end_time = time.time()

    # Print out the response from each synchronous call
    for i, response in enumerate(responses):
        print(f"sync call {i + 1} completed: {response}")

    # Print the total time taken for the synchronous benchmark
    print(f"\nSynchronous benchmark completed in {end_time - start_time:.2f} seconds")
Benchmarking Asynchronous .acall() Method
This asynchronous function benchmarks the .acall() method by calling the Groq API asynchronously multiple times, using asyncio.gather() to execute the tasks concurrently.
# Asynchronous function for benchmarking .acall()
async def benchmark_async_acall(api_kwargs, runs=10):
    # Record the start time for benchmarking
    start_time = time.time()

    # Create a list of tasks for asynchronous API calls
    tasks = [
        groq_client.acall(  # Calling the API asynchronously
            api_kwargs=api_kwargs,  # Passing the API arguments
            model_type=ModelType.LLM,  # Defining the model type
        )
        for _ in range(runs)  # Repeat the call 'runs' times
    ]

    # Await the completion of all tasks concurrently
    responses = await asyncio.gather(
        *tasks
    )  # Gather all the responses from asynchronous calls

    # Record the end time after all asynchronous calls are completed
    end_time = time.time()

    # Print out the response from each asynchronous call
    for i, response in enumerate(responses):
        print(f"Async call {i + 1} completed: {response}")

    # Print the total time taken for the asynchronous benchmark
    print(f"\nAsynchronous benchmark completed in {end_time - start_time:.2f} seconds")
Running the Benchmarks
The following code sets up the API arguments and runs both the synchronous and asynchronous benchmarks.
api_kwargs = groq_client.convert_inputs_to_api_kwargs(
    input=prompt, model_kwargs=model_kwargs, model_type=ModelType.LLM
)
# Run both benchmarks
print("Starting synchronous benchmark...\n")
benchmark_sync_call(api_kwargs)
print("\nStarting asynchronous benchmark...\n")
await benchmark_async_acall(api_kwargs)
Building a Custom Model Client#
Building a Synchronous API Call
Note: We use the OpenAI API as an example for building a custom model client in AdalFlow. Although an OpenAI client already ships with the AdalFlow repo, the code below is a useful starting point for anyone who wants to build their own custom model client.
# Building a simple custom third-party model client and using it.
# convert_inputs_to_api_kwargs() is written to follow OpenAI's prompt format,
# and the appropriate OpenAI API endpoint is used in call().
import openai
from adalflow.core.model_client import ModelClient
from adalflow.core.types import ModelType, GeneratorOutput, EmbedderOutput
from openai.types import (
    CreateEmbeddingResponse,
)
from adalflow.components.model_client.utils import parse_embedding_response
This class defines the custom model client. The constructor initializes the client by calling the parent class's initializer (ModelClient), which is essential for the setup of the AdalFlow framework.
class SimpleCustomModelClient(ModelClient):
    # Initialize the custom model client
    def __init__(self):
        # Call the parent class's initializer
        super().__init__()
        pass  # Placeholder for any initialization logic if needed in the future

    # Method to convert input into API parameters for different model types (LLM or Embedder)
    def convert_inputs_to_api_kwargs(
        self, input=None, model_kwargs={}, model_type=ModelType.UNDEFINED
    ):
        """
        Convert the inputs into API arguments based on the model type.

        Args:
            input (str): The input text to be processed.
            model_kwargs (dict): Additional model parameters like temperature, max_tokens, etc.
            model_type (ModelType): The type of model to use (LLM or Embedder).

        Returns:
            dict: API arguments formatted for the specified model type.
        """
        if (
            model_type == ModelType.LLM
        ):  # If the model type is a large language model (LLM)
            return {
                "model": model_kwargs[
                    "model"
                ],  # Set the model to use (e.g., GPT-3, GPT-4)
                "messages": input,  # Provide the input as the message
                "temperature": model_kwargs[
                    "temperature"
                ],  # Set the temperature (creativity of the response)
                "max_tokens": model_kwargs[
                    "max_tokens"
                ],  # Max tokens to generate in the response
            }
        elif model_type == ModelType.EMBEDDER:  # If the model type is an embedder
            return {
                "model": model_kwargs["model"],  # Model name for embedding
                "input": [input],  # Provide the input in a list format for embedding
            }
        else:
            # Raise an error if the model type is unsupported
            raise ValueError(f"model_type {model_type} is not supported")

    # Method to make the actual API call to OpenAI for either completions (LLM) or embeddings
    def call(self, api_kwargs={}, model_type=ModelType.UNDEFINED):
        """
        Call the appropriate OpenAI API method based on the model type (LLM or Embedder).

        Args:
            api_kwargs (dict): Arguments to be passed to the API call.
            model_type (ModelType): The type of model (LLM or Embedder).

        Returns:
            Response: The API response from OpenAI.
        """
        if model_type == ModelType.LLM:  # If the model type is LLM (e.g., GPT-3, GPT-4)
            return openai.chat.completions.create(
                **api_kwargs
            )  # Call the chat API for completion
        elif model_type == ModelType.EMBEDDER:  # If the model type is Embedder
            return openai.embeddings.create(**api_kwargs)  # Call the embedding API
        else:
            # Raise an error if an invalid model type is passed
            raise ValueError(f"Unsupported model type: {model_type}")

    # Method to parse the response from a chat completion API call
    def parse_chat_completion(self, completion):
        """
        Parse the response from a chat completion API call into a custom output format.

        Args:
            completion: The completion response from the OpenAI API.

        Returns:
            GeneratorOutput: A custom data structure containing the parsed response.
        """
        # Note: GeneratorOutput is an AdalFlow dataclass that contains the parsed completion data
        return GeneratorOutput(
            data=completion,  # Store the raw completion data
            error=None,  # No error in this case
            raw_response=str(completion),  # Store the raw response as a string
        )

    # Method to parse the response from an embedding API call
    def parse_embedding_response(
        self, response: CreateEmbeddingResponse
    ) -> EmbedderOutput:
        """
        Parse the response from an embedding API call into a custom output format.

        Args:
            response (CreateEmbeddingResponse): The response from the embedding API.

        Returns:
            EmbedderOutput: A custom data structure containing the parsed embedding response.
        """
        try:
            # Attempt to parse the embedding response using a helper function
            return parse_embedding_response(response)
        except Exception as e:
            # If parsing fails, return an error message with the raw response
            return EmbedderOutput(data=[], error=str(e), raw_response=response)
In the block below, the custom model client is instantiated, and a query is defined for processing by both an LLM (like GPT-3.5) and an Embedder model. The API arguments are converted, and the call() method is used to fetch responses. Finally, both types of responses (LLM and Embedder) are parsed and printed.
def build_custom_model_client():
    # Instantiate the custom model client (SimpleCustomModelClient)
    custom_client = SimpleCustomModelClient()

    # Define the query for the model to process
    query = "What is the capital of France?"

    # Set the model type for a Large Language Model (LLM)
    model_type = ModelType.LLM

    # Prepare the message prompt as expected by the OpenAI chat API.
    # This format is suitable for GPT-like models (e.g., gpt-3.5-turbo).
    message_prompt = [
        {
            "role": "user",  # Define the user role in the conversation
            "content": [
                {
                    "type": "text",  # Specify that the input is a text type
                    "text": query,  # The actual query to be processed by the model
                }
            ],
        }
    ]

    # Print message indicating the usage of the LLM model type
    print("ModelType LLM")

    # Define additional model parameters like model name, temperature, and max tokens for LLM
    model_kwargs = {"model": "gpt-3.5-turbo", "temperature": 0.5, "max_tokens": 100}

    # Convert the input message and model kwargs into the required API parameters
    api_kwargs = custom_client.convert_inputs_to_api_kwargs(
        input=message_prompt, model_kwargs=model_kwargs, model_type=model_type
    )

    # Print the API arguments that will be passed to the call method
    print(f"api_kwargs: {api_kwargs}")

    # Call the LLM model using the prepared API arguments
    result = custom_client.call(api_kwargs, ModelType.LLM)

    # Print the result of the LLM model call (response from OpenAI)
    print(result)

    # Parse the chat completion response and output a more structured result
    response_text = custom_client.parse_chat_completion(result)

    # Print the structured response from the chat completion
    print(f"response_text: {response_text}")

    # Switch to using the Embedder model type
    print("ModelType EMBEDDER")

    # Define model-specific parameters for the embedding model
    model_kwargs = {
        "model": "text-embedding-3-small",
        "dimensions": 8,
        "encoding_format": "float",
    }

    # Convert the input query for the embedder model
    api_kwargs = custom_client.convert_inputs_to_api_kwargs(
        input=query, model_kwargs=model_kwargs, model_type=ModelType.EMBEDDER
    )

    # Print the API arguments that will be passed to the embedder model
    print(f"embedder api_kwargs: {api_kwargs}")

    # Call the Embedder model using the prepared API arguments
    result = custom_client.call(api_kwargs, ModelType.EMBEDDER)

    # Print the result of the Embedder model call (embedding response)
    print(result)

    # Parse the embedding response and output a more structured result
    response_text = custom_client.parse_embedding_response(result)

    # Print the structured response from the embedding model
    print(f"response_text: {response_text}")
This is the function call that triggers the execution of the custom model client, processing the defined query and displaying results for both LLM and Embedder.
build_custom_model_client()
API reference
core.model_client.ModelClient
components.model_client.transformers_client.TransformersClient