"""OpenAI ModelClient integration."""
import os
from typing import (
Dict,
Sequence,
Optional,
List,
Any,
TypeVar,
Callable,
Generator,
Union,
Literal,
)
import re
import logging
import backoff
# optional import
from adalflow.utils.lazy_import import safe_import, OptionalPackages
openai = safe_import(OptionalPackages.OPENAI.value[0], OptionalPackages.OPENAI.value[1])
from openai import OpenAI, AsyncOpenAI, Stream
from openai import (
APITimeoutError,
InternalServerError,
RateLimitError,
UnprocessableEntityError,
BadRequestError,
)
from openai.types import (
Completion,
CreateEmbeddingResponse,
)
from openai.types.chat import ChatCompletionChunk, ChatCompletion
from adalflow.core.model_client import ModelClient
from adalflow.core.types import (
ModelType,
EmbedderOutput,
TokenLogProb,
CompletionUsage,
GeneratorOutput,
)
from adalflow.components.model_client.utils import parse_embedding_response
log = logging.getLogger(__name__)
T = TypeVar("T")
# Completion parsing functions; you can combine them into one single chat completion parser.
[docs]
def get_first_message_content(completion: ChatCompletion) -> str:
    r"""Return the message content of the first choice in the completion.

    It is the default parser for chat completion.
    """
    first_choice = completion.choices[0]
    return first_choice.message.content
# def _get_chat_completion_usage(completion: ChatCompletion) -> OpenAICompletionUsage:
# return completion.usage
[docs]
def parse_stream_response(completion: ChatCompletionChunk) -> Optional[str]:
    r"""Parse one chunk of the stream API and return its text delta.

    Args:
        completion: A single chunk received from a streaming chat completion.

    Returns:
        The ``delta.content`` of the first choice. ``None`` when the chunk
        carries no choices (e.g. a usage-only chunk) or the delta has no content.
    """
    choices = completion.choices
    # Some chunks legitimately arrive with an empty ``choices`` list; indexing
    # them would raise IndexError and abort the whole stream.
    if not choices:
        return None
    return choices[0].delta.content
[docs]
def handle_streaming_response(generator: Stream[ChatCompletionChunk]):
    r"""Lazily yield the parsed content of each chunk in a streaming response.

    Every raw chunk is logged at debug level before it is parsed with
    ``parse_stream_response``.
    """
    for chunk in generator:
        log.debug(f"Raw chunk completion: {chunk}")
        yield parse_stream_response(chunk)
[docs]
def get_all_messages_content(completion: ChatCompletion) -> List[str]:
    r"""Collect the message content of every choice, for use when n > 1."""
    choices = completion.choices
    return [choice.message.content for choice in choices]
[docs]
def get_probabilities(completion: ChatCompletion) -> List[List[TokenLogProb]]:
    r"""Get the log probability of each token for every choice in the completion.

    Args:
        completion: A chat completion whose choices may carry ``logprobs``.

    Returns:
        One inner list per choice, in the order of ``completion.choices``.
        A choice whose ``logprobs`` (or ``logprobs.content``) is ``None``
        contributes an empty list, so the result stays aligned with the choices.
    """
    log_probs = []
    for choice in completion.choices:
        logprobs = choice.logprobs
        # Both ``logprobs`` and ``logprobs.content`` may be None on a choice;
        # treat either case as "no token log probabilities".
        token_logprobs = (logprobs.content if logprobs is not None else None) or []
        log_probs.append(
            [
                TokenLogProb(token=entry.token, logprob=entry.logprob)
                for entry in token_logprobs
            ]
        )
    return log_probs
[docs]
class OpenAIClient(ModelClient):
    __doc__ = r"""A component wrapper for the OpenAI API client.

    Supports both the embedding and the chat completion API.

    Users (1) simply use ``Embedder`` and ``Generator`` components by passing OpenAIClient() as the model_client.
    (2) can use this as an example to create their own API client or extend this class (copying and modifying the code) in their own project.

    Note:
        We suggest users not to use `response_format` to enforce output data type or `tools` and `tool_choice` in your model_kwargs when calling the API.
        We do not know how OpenAI is doing the formatting or what prompt they have added.
        Instead
        - use :ref:`OutputParser<components-output_parsers>` for response parsing and formatting.

    Args:
        api_key (Optional[str], optional): OpenAI API key. Defaults to None.
        chat_completion_parser (Callable[[Completion], Any], optional): A function to parse the chat completion to a str. Defaults to None.
            Default is `get_first_message_content`.
        input_type (Literal["text", "messages"], optional): Stored on the instance as ``_input_type``. Defaults to "text".

    References:
        - Embeddings models: https://platform.openai.com/docs/guides/embeddings
        - Chat models: https://platform.openai.com/docs/guides/text-generation
        - OpenAI docs: https://platform.openai.com/docs/introduction
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        chat_completion_parser: Optional[Callable[[Completion], Any]] = None,
        input_type: Literal["text", "messages"] = "text",
    ):
        r"""It is recommended to set the OPENAI_API_KEY environment variable instead of passing it as an argument.

        Args:
            api_key (Optional[str], optional): OpenAI API key. Defaults to None.
            chat_completion_parser (Callable, optional): Parser applied to the chat completion.
                Defaults to `get_first_message_content`.
            input_type (Literal["text", "messages"], optional): Defaults to "text".

        Raises:
            ValueError: If no API key is passed and OPENAI_API_KEY is not set.
        """
        super().__init__()
        self._api_key = api_key
        self.sync_client = self.init_sync_client()
        self.async_client = None  # only initialize if the async call is called
        self.chat_completion_parser = (
            chat_completion_parser or get_first_message_content
        )
        self._input_type = input_type

    def _resolve_api_key(self) -> str:
        r"""Return the explicit API key, falling back to the OPENAI_API_KEY environment variable."""
        api_key = self._api_key or os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("Environment variable OPENAI_API_KEY must be set")
        return api_key

    def init_sync_client(self):
        r"""Create the synchronous OpenAI client.

        Raises:
            ValueError: If no API key is available.
        """
        return OpenAI(api_key=self._resolve_api_key())

    def init_async_client(self):
        r"""Create the asynchronous OpenAI client.

        Raises:
            ValueError: If no API key is available.
        """
        return AsyncOpenAI(api_key=self._resolve_api_key())

    def parse_chat_completion(
        self,
        completion: Union[ChatCompletion, Generator[ChatCompletionChunk, None, None]],
    ) -> "GeneratorOutput":
        """Parse the completion, and put it into the raw_response.

        The parsed result is stored in ``raw_response`` and ``data`` is left as None.
        If the parser itself fails, the error message is returned together with the
        unparsed completion. A failure while tracking usage (e.g. for a streaming
        completion, where it is not implemented) does not discard the parsed result:
        it is logged and the output is returned without usage.
        """
        # TODO: it may be better to keep the whole completion as a source of truth
        # instead of just the parsed message.
        log.debug(f"completion: {completion}, parser: {self.chat_completion_parser}")
        try:
            data = self.chat_completion_parser(completion)
        except Exception as e:
            log.error(f"Error parsing the completion: {e}")
            return GeneratorOutput(data=None, error=str(e), raw_response=completion)

        # Usage tracking is kept out of the parsing try-block so that a usage
        # failure cannot turn a successfully parsed completion into an error.
        try:
            usage = self.track_completion_usage(completion)
        except Exception as e:
            log.warning(f"Error tracking the completion usage: {e}")
            return GeneratorOutput(data=None, error=None, raw_response=data)
        return GeneratorOutput(data=None, error=None, raw_response=data, usage=usage)

    def track_completion_usage(
        self,
        completion: Union[ChatCompletion, Generator[ChatCompletionChunk, None, None]],
    ) -> CompletionUsage:
        r"""Extract the token usage of a non-streaming chat completion.

        Raises:
            NotImplementedError: If ``completion`` is not a ``ChatCompletion``
                (streaming usage tracking is not implemented).
        """
        if not isinstance(completion, ChatCompletion):
            raise NotImplementedError(
                "streaming completion usage tracking is not implemented"
            )
        reported = completion.usage
        return CompletionUsage(
            completion_tokens=reported.completion_tokens,
            prompt_tokens=reported.prompt_tokens,
            total_tokens=reported.total_tokens,
        )

    def parse_embedding_response(
        self, response: CreateEmbeddingResponse
    ) -> EmbedderOutput:
        r"""Parse the embedding response to a structure LightRAG components can understand.

        Should be called in ``Embedder``. On a parsing failure, an ``EmbedderOutput``
        with empty data, the error message and the raw response is returned.
        """
        try:
            # Resolves to the module-level helper, not this method.
            return parse_embedding_response(response)
        except Exception as e:
            log.error(f"Error parsing the embedding response: {e}")
            return EmbedderOutput(data=[], error=str(e), raw_response=response)

    @backoff.on_exception(
        backoff.expo,
        (
            APITimeoutError,
            InternalServerError,
            RateLimitError,
            UnprocessableEntityError,
            BadRequestError,
        ),
        max_time=5,
    )
    def call(
        self,
        api_kwargs: Optional[Dict] = None,
        model_type: ModelType = ModelType.UNDEFINED,
    ):
        """
        kwargs is the combined input and model_kwargs. Support streaming call.

        For a streaming LLM call the chat completion parser is switched to
        ``handle_streaming_response``; the previous parser is remembered and put
        back on the next non-streaming call, so one client can serve both kinds.

        Raises:
            ValueError: If ``model_type`` is neither EMBEDDER nor LLM.
        """
        api_kwargs = {} if api_kwargs is None else api_kwargs
        log.info(f"api_kwargs: {api_kwargs}")
        if model_type == ModelType.EMBEDDER:
            return self.sync_client.embeddings.create(**api_kwargs)
        if model_type == ModelType.LLM:
            if api_kwargs.get("stream", False):
                log.debug("streaming call")
                if self.chat_completion_parser is not handle_streaming_response:
                    # Remember the configured parser so it can be restored later.
                    self._non_streaming_parser = self.chat_completion_parser
                    self.chat_completion_parser = handle_streaming_response
            elif self.chat_completion_parser is handle_streaming_response:
                # A previous streaming call left the streaming parser in place;
                # it cannot parse a plain ChatCompletion.
                self.chat_completion_parser = getattr(
                    self, "_non_streaming_parser", get_first_message_content
                )
            return self.sync_client.chat.completions.create(**api_kwargs)
        raise ValueError(f"model_type {model_type} is not supported")

    @backoff.on_exception(
        backoff.expo,
        (
            APITimeoutError,
            InternalServerError,
            RateLimitError,
            UnprocessableEntityError,
            BadRequestError,
        ),
        max_time=5,
    )
    async def acall(
        self,
        api_kwargs: Optional[Dict] = None,
        model_type: ModelType = ModelType.UNDEFINED,
    ):
        """
        kwargs is the combined input and model_kwargs

        The async client is created on first use.

        Raises:
            ValueError: If ``model_type`` is neither EMBEDDER nor LLM.
        """
        api_kwargs = {} if api_kwargs is None else api_kwargs
        if self.async_client is None:
            self.async_client = self.init_async_client()
        if model_type == ModelType.EMBEDDER:
            return await self.async_client.embeddings.create(**api_kwargs)
        if model_type == ModelType.LLM:
            return await self.async_client.chat.completions.create(**api_kwargs)
        raise ValueError(f"model_type {model_type} is not supported")

    @classmethod
    def from_dict(cls: type[T], data: Dict[str, Any]) -> T:
        r"""Rebuild the component from a dictionary and recreate both API clients."""
        obj = super().from_dict(data)
        # recreate the existing clients
        obj.sync_client = obj.init_sync_client()
        obj.async_client = obj.init_async_client()
        return obj

    def to_dict(self) -> Dict[str, Any]:
        r"""Convert the component to a dictionary."""
        # TODO: not exclude but save yes or no for recreating the clients
        exclude = [
            "sync_client",
            "async_client",
        ]  # unserializable object
        output = super().to_dict(exclude=exclude)
        return output
# if __name__ == "__main__":
# from adalflow.core import Generator
# from adalflow.utils import setup_env, get_logger
# log = get_logger(level="DEBUG")
# setup_env()
# prompt_kwargs = {"input_str": "What is the meaning of life?"}
# gen = Generator(
# model_client=OpenAIClient(),
# model_kwargs={"model": "gpt-3.5-turbo", "stream": True},
# )
# gen_response = gen(prompt_kwargs)
# print(f"gen_response: {gen_response}")
# for genout in gen_response.data:
# print(f"genout: {genout}")