# Source code for components.model_client.chat_completion_to_response_converter

"""
Code from OpenAI Agents library src/agents/models/openai_chatcompletions.py

Used to convert the OpenAI ChatCompletion stream to Response API format
"""

from __future__ import annotations

from collections.abc import AsyncIterator, Iterator
from dataclasses import dataclass, field

from openai import AsyncStream
from openai._streaming import Stream
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from openai.types.completion_usage import CompletionUsage
from openai.types.responses import (
    Response,
    ResponseCompletedEvent,
    ResponseContentPartAddedEvent,
    ResponseContentPartDoneEvent,
    ResponseCreatedEvent,
    ResponseFunctionCallArgumentsDeltaEvent,
    ResponseFunctionToolCall,
    ResponseOutputItem,
    ResponseOutputItemAddedEvent,
    ResponseOutputItemDoneEvent,
    ResponseOutputMessage,
    ResponseOutputRefusal,
    ResponseOutputText,
    ResponseRefusalDeltaEvent,
    ResponseTextDeltaEvent,
    ResponseUsage,
    ResponseStreamEvent,
)
from openai.types.responses.response_usage import (
    InputTokensDetails,
    OutputTokensDetails,
)

FAKE_RESPONSES_ID = "fake_responses_id"
"""From the OpenAI Agents library This is a placeholder ID used to fill in the `id` field in Responses API related objects. It's
useful when you're creating Responses objects from non-Responses APIs, e.g. the OpenAI Chat
Completions API or other LLM providers.
"""


@dataclass
class StreamingState:
    """Mutable accumulator tracking one in-flight ChatCompletion-to-Response stream conversion."""

    # True once the first chunk has arrived and the ResponseCreatedEvent was emitted.
    started: bool = False
    # (content_index, accumulated output_text part) once assistant text has started streaming.
    text_content_index_and_output: tuple[int, ResponseOutputText] | None = None
    # (content_index, accumulated refusal part) once a refusal has started streaming.
    refusal_content_index_and_output: tuple[int, ResponseOutputRefusal] | None = None
    # Tool calls keyed by their index in the stream; arguments/name/call_id are
    # accumulated incrementally and only emitted as events after the stream ends.
    function_calls: dict[int, ResponseFunctionToolCall] = field(default_factory=dict)
class SequenceNumber:
    """Monotonically increasing counter used to stamp `sequence_number` on stream events."""

    def __init__(self) -> None:
        # Next value to hand out; starts at zero.
        self._next_value = 0

    def get_and_increment(self) -> int:
        """Return the current counter value, then advance it by one."""
        current = self._next_value
        self._next_value = current + 1
        return current
class ChatCompletionToResponseConverter:
    """Converts OpenAI ChatCompletion streams to standardized Response API format events."""

    @classmethod
    async def async_handle_stream(
        cls,
        response: Response,
        stream: AsyncStream[ChatCompletionChunk],
    ) -> AsyncIterator[ResponseStreamEvent]:
        """Translate an async ChatCompletion chunk stream into Responses API stream events.

        Text and refusal deltas are re-emitted incrementally; tool calls are
        accumulated and only emitted after the source stream is exhausted
        (their names/arguments are not known until then). The final event is a
        ResponseCompletedEvent carrying a copy of ``response`` with the
        collected outputs and usage filled in.
        """
        usage: CompletionUsage | None = None
        state = StreamingState()
        sequence_number = SequenceNumber()
        async for chunk in stream:
            if not state.started:
                state.started = True
                yield ResponseCreatedEvent(
                    response=response,
                    type="response.created",
                    sequence_number=sequence_number.get_and_increment(),
                )

            # This is always set by the OpenAI API, but not by others e.g. LiteLLM
            # (overwritten each chunk; in practice only the final chunk carries usage).
            usage = chunk.usage if hasattr(chunk, "usage") else None

            if not chunk.choices or not chunk.choices[0].delta:
                continue

            delta = chunk.choices[0].delta

            # Handle text
            if delta.content:
                if not state.text_content_index_and_output:
                    # Initialize a content tracker for streaming text; index is 1 if a
                    # refusal part already occupies index 0.
                    state.text_content_index_and_output = (
                        0 if not state.refusal_content_index_and_output else 1,
                        ResponseOutputText(
                            text="",
                            type="output_text",
                            annotations=[],
                        ),
                    )
                    # Start a new assistant message stream
                    assistant_item = ResponseOutputMessage(
                        id=FAKE_RESPONSES_ID,
                        content=[],
                        role="assistant",
                        type="message",
                        status="in_progress",
                    )
                    # Notify consumers of the start of a new output message + first content part
                    yield ResponseOutputItemAddedEvent(
                        item=assistant_item,
                        output_index=0,
                        type="response.output_item.added",
                        sequence_number=sequence_number.get_and_increment(),
                    )
                    yield ResponseContentPartAddedEvent(
                        content_index=state.text_content_index_and_output[0],
                        item_id=FAKE_RESPONSES_ID,
                        output_index=0,
                        part=ResponseOutputText(
                            text="",
                            type="output_text",
                            annotations=[],
                        ),
                        type="response.content_part.added",
                        sequence_number=sequence_number.get_and_increment(),
                    )
                # Emit the delta for this segment of content
                yield ResponseTextDeltaEvent(
                    content_index=state.text_content_index_and_output[0],
                    delta=delta.content,
                    item_id=FAKE_RESPONSES_ID,
                    output_index=0,
                    type="response.output_text.delta",
                    sequence_number=sequence_number.get_and_increment(),
                    logprobs=[],
                )
                # Accumulate the text into the response part
                state.text_content_index_and_output[1].text += delta.content

            # Handle refusals (model declines to answer)
            # This is always set by the OpenAI API, but not by others e.g. LiteLLM
            if hasattr(delta, "refusal") and delta.refusal:
                if not state.refusal_content_index_and_output:
                    # Initialize a content tracker for streaming refusal text
                    state.refusal_content_index_and_output = (
                        0 if not state.text_content_index_and_output else 1,
                        ResponseOutputRefusal(refusal="", type="refusal"),
                    )
                    # Start a new assistant message if one doesn't exist yet (in-progress)
                    assistant_item = ResponseOutputMessage(
                        id=FAKE_RESPONSES_ID,
                        content=[],
                        role="assistant",
                        type="message",
                        status="in_progress",
                    )
                    # Notify downstream that assistant message + first content part are starting
                    yield ResponseOutputItemAddedEvent(
                        item=assistant_item,
                        output_index=0,
                        type="response.output_item.added",
                        sequence_number=sequence_number.get_and_increment(),
                    )
                    # NOTE(review): an output_text part is announced here even though the
                    # content being streamed is a refusal — mirrors the vendored upstream
                    # code; confirm whether ResponseOutputRefusal was intended.
                    yield ResponseContentPartAddedEvent(
                        content_index=state.refusal_content_index_and_output[0],
                        item_id=FAKE_RESPONSES_ID,
                        output_index=0,
                        part=ResponseOutputText(
                            text="",
                            type="output_text",
                            annotations=[],
                        ),
                        type="response.content_part.added",
                        sequence_number=sequence_number.get_and_increment(),
                    )
                # Emit the delta for this segment of refusal
                yield ResponseRefusalDeltaEvent(
                    content_index=state.refusal_content_index_and_output[0],
                    delta=delta.refusal,
                    item_id=FAKE_RESPONSES_ID,
                    output_index=0,
                    type="response.refusal.delta",
                    sequence_number=sequence_number.get_and_increment(),
                )
                # Accumulate the refusal string in the output part
                state.refusal_content_index_and_output[1].refusal += delta.refusal

            # Handle tool calls
            # Because we don't know the name of the function until the end of the stream, we'll
            # save everything and yield events at the end
            if delta.tool_calls:
                for tc_delta in delta.tool_calls:
                    if tc_delta.index not in state.function_calls:
                        state.function_calls[tc_delta.index] = ResponseFunctionToolCall(
                            id=FAKE_RESPONSES_ID,
                            arguments="",
                            name="",
                            type="function_call",
                            call_id="",
                        )
                    tc_function = tc_delta.function

                    state.function_calls[tc_delta.index].arguments += (
                        tc_function.arguments if tc_function else ""
                    ) or ""
                    state.function_calls[tc_delta.index].name += (
                        tc_function.name if tc_function else ""
                    ) or ""
                    state.function_calls[tc_delta.index].call_id += tc_delta.id or ""

        # Function-call items come after the assistant message (if any) in the output list.
        function_call_starting_index = 0
        if state.text_content_index_and_output:
            function_call_starting_index += 1
            # Send end event for this content part
            yield ResponseContentPartDoneEvent(
                content_index=state.text_content_index_and_output[0],
                item_id=FAKE_RESPONSES_ID,
                output_index=0,
                part=state.text_content_index_and_output[1],
                type="response.content_part.done",
                sequence_number=sequence_number.get_and_increment(),
            )

        if state.refusal_content_index_and_output:
            function_call_starting_index += 1
            # Send end event for this content part
            yield ResponseContentPartDoneEvent(
                content_index=state.refusal_content_index_and_output[0],
                item_id=FAKE_RESPONSES_ID,
                output_index=0,
                part=state.refusal_content_index_and_output[1],
                type="response.content_part.done",
                sequence_number=sequence_number.get_and_increment(),
            )

        # Actually send events for the function calls
        # NOTE(review): output_index is not advanced per call, so multiple parallel tool
        # calls all report the same index — matches the vendored upstream code; verify.
        for function_call in state.function_calls.values():
            # First, a ResponseOutputItemAdded for the function call
            yield ResponseOutputItemAddedEvent(
                item=ResponseFunctionToolCall(
                    id=FAKE_RESPONSES_ID,
                    call_id=function_call.call_id,
                    arguments=function_call.arguments,
                    name=function_call.name,
                    type="function_call",
                ),
                output_index=function_call_starting_index,
                type="response.output_item.added",
                sequence_number=sequence_number.get_and_increment(),
            )
            # Then, yield the args
            yield ResponseFunctionCallArgumentsDeltaEvent(
                delta=function_call.arguments,
                item_id=FAKE_RESPONSES_ID,
                output_index=function_call_starting_index,
                type="response.function_call_arguments.delta",
                sequence_number=sequence_number.get_and_increment(),
            )
            # Finally, the ResponseOutputItemDone
            yield ResponseOutputItemDoneEvent(
                item=ResponseFunctionToolCall(
                    id=FAKE_RESPONSES_ID,
                    call_id=function_call.call_id,
                    arguments=function_call.arguments,
                    name=function_call.name,
                    type="function_call",
                ),
                output_index=function_call_starting_index,
                type="response.output_item.done",
                sequence_number=sequence_number.get_and_increment(),
            )

        # Finally, send the Response completed event
        outputs: list[ResponseOutputItem] = []
        if (
            state.text_content_index_and_output
            or state.refusal_content_index_and_output
        ):
            assistant_msg = ResponseOutputMessage(
                id=FAKE_RESPONSES_ID,
                content=[],
                role="assistant",
                type="message",
                status="completed",
            )
            if state.text_content_index_and_output:
                assistant_msg.content.append(state.text_content_index_and_output[1])
            if state.refusal_content_index_and_output:
                assistant_msg.content.append(state.refusal_content_index_and_output[1])
            outputs.append(assistant_msg)

            # send a ResponseOutputItemDone for the assistant message
            yield ResponseOutputItemDoneEvent(
                item=assistant_msg,
                output_index=0,
                type="response.output_item.done",
                sequence_number=sequence_number.get_and_increment(),
            )

        for function_call in state.function_calls.values():
            outputs.append(function_call)

        final_response = response.model_copy()
        final_response.output = outputs
        final_response.usage = (
            ResponseUsage(
                input_tokens=usage.prompt_tokens,
                output_tokens=usage.completion_tokens,
                total_tokens=usage.total_tokens,
                output_tokens_details=OutputTokensDetails(
                    reasoning_tokens=(
                        usage.completion_tokens_details.reasoning_tokens
                        if usage.completion_tokens_details
                        and usage.completion_tokens_details.reasoning_tokens
                        else 0
                    )
                ),
                input_tokens_details=InputTokensDetails(
                    cached_tokens=(
                        usage.prompt_tokens_details.cached_tokens
                        if usage.prompt_tokens_details
                        and usage.prompt_tokens_details.cached_tokens
                        else 0
                    )
                ),
            )
            if usage
            else None
        )

        yield ResponseCompletedEvent(
            response=final_response,
            type="response.completed",
            sequence_number=sequence_number.get_and_increment(),
        )

    @classmethod
    def sync_handle_stream(
        cls,
        response: Response,
        stream: Stream[ChatCompletionChunk],
    ) -> Iterator[ResponseStreamEvent]:
        """Synchronous twin of :meth:`async_handle_stream`.

        Identical event sequence and accumulation logic, but driven by a
        blocking ``Stream`` instead of an ``AsyncStream``. Keep the two bodies
        in lockstep when editing either one.
        """
        usage: CompletionUsage | None = None
        state = StreamingState()
        sequence_number = SequenceNumber()
        for chunk in stream:
            if not state.started:
                state.started = True
                yield ResponseCreatedEvent(
                    response=response,
                    type="response.created",
                    sequence_number=sequence_number.get_and_increment(),
                )

            # This is always set by the OpenAI API, but not by others e.g. LiteLLM
            # (overwritten each chunk; in practice only the final chunk carries usage).
            usage = chunk.usage if hasattr(chunk, "usage") else None

            if not chunk.choices or not chunk.choices[0].delta:
                continue

            delta = chunk.choices[0].delta

            # Handle text
            if delta.content:
                if not state.text_content_index_and_output:
                    # Initialize a content tracker for streaming text; index is 1 if a
                    # refusal part already occupies index 0.
                    state.text_content_index_and_output = (
                        0 if not state.refusal_content_index_and_output else 1,
                        ResponseOutputText(
                            text="",
                            type="output_text",
                            annotations=[],
                        ),
                    )
                    # Start a new assistant message stream
                    assistant_item = ResponseOutputMessage(
                        id=FAKE_RESPONSES_ID,
                        content=[],
                        role="assistant",
                        type="message",
                        status="in_progress",
                    )
                    # Notify consumers of the start of a new output message + first content part
                    yield ResponseOutputItemAddedEvent(
                        item=assistant_item,
                        output_index=0,
                        type="response.output_item.added",
                        sequence_number=sequence_number.get_and_increment(),
                    )
                    yield ResponseContentPartAddedEvent(
                        content_index=state.text_content_index_and_output[0],
                        item_id=FAKE_RESPONSES_ID,
                        output_index=0,
                        part=ResponseOutputText(
                            text="",
                            type="output_text",
                            annotations=[],
                        ),
                        type="response.content_part.added",
                        sequence_number=sequence_number.get_and_increment(),
                    )
                # Emit the delta for this segment of content
                yield ResponseTextDeltaEvent(
                    content_index=state.text_content_index_and_output[0],
                    delta=delta.content,
                    item_id=FAKE_RESPONSES_ID,
                    output_index=0,
                    type="response.output_text.delta",
                    sequence_number=sequence_number.get_and_increment(),
                    logprobs=[],
                )
                # Accumulate the text into the response part
                state.text_content_index_and_output[1].text += delta.content

            # Handle refusals (model declines to answer)
            # This is always set by the OpenAI API, but not by others e.g. LiteLLM
            if hasattr(delta, "refusal") and delta.refusal:
                if not state.refusal_content_index_and_output:
                    # Initialize a content tracker for streaming refusal text
                    state.refusal_content_index_and_output = (
                        0 if not state.text_content_index_and_output else 1,
                        ResponseOutputRefusal(refusal="", type="refusal"),
                    )
                    # Start a new assistant message if one doesn't exist yet (in-progress)
                    assistant_item = ResponseOutputMessage(
                        id=FAKE_RESPONSES_ID,
                        content=[],
                        role="assistant",
                        type="message",
                        status="in_progress",
                    )
                    # Notify downstream that assistant message + first content part are starting
                    yield ResponseOutputItemAddedEvent(
                        item=assistant_item,
                        output_index=0,
                        type="response.output_item.added",
                        sequence_number=sequence_number.get_and_increment(),
                    )
                    # NOTE(review): an output_text part is announced here even though the
                    # content being streamed is a refusal — mirrors the vendored upstream
                    # code; confirm whether ResponseOutputRefusal was intended.
                    yield ResponseContentPartAddedEvent(
                        content_index=state.refusal_content_index_and_output[0],
                        item_id=FAKE_RESPONSES_ID,
                        output_index=0,
                        part=ResponseOutputText(
                            text="",
                            type="output_text",
                            annotations=[],
                        ),
                        type="response.content_part.added",
                        sequence_number=sequence_number.get_and_increment(),
                    )
                # Emit the delta for this segment of refusal
                yield ResponseRefusalDeltaEvent(
                    content_index=state.refusal_content_index_and_output[0],
                    delta=delta.refusal,
                    item_id=FAKE_RESPONSES_ID,
                    output_index=0,
                    type="response.refusal.delta",
                    sequence_number=sequence_number.get_and_increment(),
                )
                # Accumulate the refusal string in the output part
                state.refusal_content_index_and_output[1].refusal += delta.refusal

            # Handle tool calls
            # Because we don't know the name of the function until the end of the stream, we'll
            # save everything and yield events at the end
            if delta.tool_calls:
                for tc_delta in delta.tool_calls:
                    if tc_delta.index not in state.function_calls:
                        state.function_calls[tc_delta.index] = ResponseFunctionToolCall(
                            id=FAKE_RESPONSES_ID,
                            arguments="",
                            name="",
                            type="function_call",
                            call_id="",
                        )
                    tc_function = tc_delta.function

                    state.function_calls[tc_delta.index].arguments += (
                        tc_function.arguments if tc_function else ""
                    ) or ""
                    state.function_calls[tc_delta.index].name += (
                        tc_function.name if tc_function else ""
                    ) or ""
                    state.function_calls[tc_delta.index].call_id += tc_delta.id or ""

        # Function-call items come after the assistant message (if any) in the output list.
        function_call_starting_index = 0
        if state.text_content_index_and_output:
            function_call_starting_index += 1
            # Send end event for this content part
            yield ResponseContentPartDoneEvent(
                content_index=state.text_content_index_and_output[0],
                item_id=FAKE_RESPONSES_ID,
                output_index=0,
                part=state.text_content_index_and_output[1],
                type="response.content_part.done",
                sequence_number=sequence_number.get_and_increment(),
            )

        if state.refusal_content_index_and_output:
            function_call_starting_index += 1
            # Send end event for this content part
            yield ResponseContentPartDoneEvent(
                content_index=state.refusal_content_index_and_output[0],
                item_id=FAKE_RESPONSES_ID,
                output_index=0,
                part=state.refusal_content_index_and_output[1],
                type="response.content_part.done",
                sequence_number=sequence_number.get_and_increment(),
            )

        # Actually send events for the function calls
        # NOTE(review): output_index is not advanced per call, so multiple parallel tool
        # calls all report the same index — matches the vendored upstream code; verify.
        for function_call in state.function_calls.values():
            # First, a ResponseOutputItemAdded for the function call
            yield ResponseOutputItemAddedEvent(
                item=ResponseFunctionToolCall(
                    id=FAKE_RESPONSES_ID,
                    call_id=function_call.call_id,
                    arguments=function_call.arguments,
                    name=function_call.name,
                    type="function_call",
                ),
                output_index=function_call_starting_index,
                type="response.output_item.added",
                sequence_number=sequence_number.get_and_increment(),
            )
            # Then, yield the args
            yield ResponseFunctionCallArgumentsDeltaEvent(
                delta=function_call.arguments,
                item_id=FAKE_RESPONSES_ID,
                output_index=function_call_starting_index,
                type="response.function_call_arguments.delta",
                sequence_number=sequence_number.get_and_increment(),
            )
            # Finally, the ResponseOutputItemDone
            yield ResponseOutputItemDoneEvent(
                item=ResponseFunctionToolCall(
                    id=FAKE_RESPONSES_ID,
                    call_id=function_call.call_id,
                    arguments=function_call.arguments,
                    name=function_call.name,
                    type="function_call",
                ),
                output_index=function_call_starting_index,
                type="response.output_item.done",
                sequence_number=sequence_number.get_and_increment(),
            )

        # Finally, send the Response completed event
        outputs: list[ResponseOutputItem] = []
        if (
            state.text_content_index_and_output
            or state.refusal_content_index_and_output
        ):
            assistant_msg = ResponseOutputMessage(
                id=FAKE_RESPONSES_ID,
                content=[],
                role="assistant",
                type="message",
                status="completed",
            )
            if state.text_content_index_and_output:
                assistant_msg.content.append(state.text_content_index_and_output[1])
            if state.refusal_content_index_and_output:
                assistant_msg.content.append(state.refusal_content_index_and_output[1])
            outputs.append(assistant_msg)

            # send a ResponseOutputItemDone for the assistant message
            yield ResponseOutputItemDoneEvent(
                item=assistant_msg,
                output_index=0,
                type="response.output_item.done",
                sequence_number=sequence_number.get_and_increment(),
            )

        for function_call in state.function_calls.values():
            outputs.append(function_call)

        final_response = response.model_copy()
        final_response.output = outputs
        final_response.usage = (
            ResponseUsage(
                input_tokens=usage.prompt_tokens,
                output_tokens=usage.completion_tokens,
                total_tokens=usage.total_tokens,
                output_tokens_details=OutputTokensDetails(
                    reasoning_tokens=(
                        usage.completion_tokens_details.reasoning_tokens
                        if usage.completion_tokens_details
                        and usage.completion_tokens_details.reasoning_tokens
                        else 0
                    )
                ),
                input_tokens_details=InputTokensDetails(
                    cached_tokens=(
                        usage.prompt_tokens_details.cached_tokens
                        if usage.prompt_tokens_details
                        and usage.prompt_tokens_details.cached_tokens
                        else 0
                    )
                ),
            )
            if usage
            else None
        )

        yield ResponseCompletedEvent(
            response=final_response,
            type="response.completed",
            sequence_number=sequence_number.get_and_increment(),
        )

    @classmethod
    def get_chat_completion_content(cls, completion: ChatCompletion) -> str:
        """Retrieve the final text from the ChatCompletion object by taking the content of the first choice

        NOTE(review): ``message.content`` is Optional in the OpenAI SDK (e.g. pure
        tool-call responses), so this can return None despite the ``str`` annotation —
        confirm callers tolerate that.
        """
        return completion.choices[0].message.content