# Agent Streaming Architecture in the OpenAI Agents SDK
The OpenAI Agents SDK implements a sophisticated streaming architecture that standardizes around the OpenAI Responses API format while providing compatibility with the Chat Completions API through a bridging mechanism. This document explains how the streaming system works, its key components, and data flow.
## Overview
The streaming system is built on three principles:

- **Responses API format is the standard**: All streaming events follow the `TResponseStreamEvent` type from OpenAI's Responses API.
- **Chat Completions API compatibility**: The `OpenAIChatCompletionsModel` acts as a bridge, converting Chat Completions chunks into Responses API events.
- **Unified streaming interface**: Both APIs produce the same `AsyncIterator[TResponseStreamEvent]` output.
## Key Components
### 1. Core Streaming Types
Location: `/src/agents/items.py`

```python
TResponseStreamEvent = ResponseStreamEvent
```
This type alias represents the standardized streaming event from OpenAI's Responses API, which includes events such as:

- `ResponseCreatedEvent`
- `ResponseTextDeltaEvent`
- `ResponseContentPartAddedEvent`
- `ResponseFunctionCallArgumentsDeltaEvent`
- `ResponseCompletedEvent`
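Since every streamed event is a plain Responses API type, consumers can dispatch on the concrete event classes. A minimal sketch, assuming the event classes exported by the `openai` package and the `TResponseStreamEvent` alias from `/src/agents/items.py`:

```python
from collections.abc import AsyncIterator

from openai.types.responses import ResponseCompletedEvent, ResponseTextDeltaEvent

from agents.items import TResponseStreamEvent


async def print_stream(events: AsyncIterator[TResponseStreamEvent]) -> None:
    """Print text deltas as they arrive and note when the response finishes."""
    async for event in events:
        if isinstance(event, ResponseTextDeltaEvent):
            print(event.delta, end="", flush=True)
        elif isinstance(event, ResponseCompletedEvent):
            print("\n[response complete]")
```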
### 2. Model Interface
Location: `/src/agents/models/interface.py`

```python
async def stream_response(
    self,
    system_instructions: str | None,
    input: str | list[TResponseInputItem],
    model_settings: ModelSettings,
    tools: list[Tool],
    output_schema: AgentOutputSchema | None,
    handoffs: list[Handoff],
    tracing: ModelTracing,
) -> AsyncIterator[TResponseStreamEvent]:
    ...
```
All model implementations must implement this unified streaming interface.
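Because the contract is just this one async generator, alternative backends can be dropped in. A hedged sketch of a wrapper model (assuming `inner` is any object implementing the signature above; a real implementation would subclass the `Model` ABC in `interface.py`):

```python
from collections.abc import AsyncIterator

from agents.items import TResponseStreamEvent


class LoggingModel:
    """Sketch: wrap another model and log every streamed event.

    `inner` is assumed to implement the same stream_response() signature.
    """

    def __init__(self, inner):
        self.inner = inner

    async def stream_response(self, *args, **kwargs) -> AsyncIterator[TResponseStreamEvent]:
        async for event in self.inner.stream_response(*args, **kwargs):
            # Every Responses API stream event carries a `type` discriminator
            print(f"event: {event.type}")
            yield event
```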
### 3. Responses API Implementation
Location: `/src/agents/models/openai_responses.py`

The `OpenAIResponsesModel` provides native Responses API support:
```python
async def stream_response(self, ...) -> AsyncIterator[TResponseStreamEvent]:
    response_stream = await self._fetch_response(..., stream=True)
    async for event in response_stream:
        yield event  # Direct pass-through - no conversion needed
```
Key characteristics:

- **Direct streaming**: Events are passed through without conversion.
- **Native format**: Events are already in `TResponseStreamEvent` format.
- **Full feature support**: All Responses API features (file search, computer use, etc.) are available.
### 4. Chat Completions Bridge Implementation
Location: `/src/agents/models/openai_chatcompletions.py`

The `OpenAIChatCompletionsModel` acts as a streaming adapter that converts Chat Completions chunks into Responses API events.
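As a usage sketch, the bridge lets an agent run against any Chat Completions-compatible endpoint. The constructor arguments below are assumptions based on the SDK's public examples; verify them against `openai_chatcompletions.py`:

```python
from openai import AsyncOpenAI

from agents import Agent
from agents.models.openai_chatcompletions import OpenAIChatCompletionsModel

# Any Chat Completions-compatible backend works; URL and key are placeholders
client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="sk-local")

agent = Agent(
    name="assistant",
    instructions="You are a helpful assistant.",
    # Chat Completions chunks from this backend are converted into
    # Responses API events by the bridge.
    model=OpenAIChatCompletionsModel(model="my-local-model", openai_client=client),
)
```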
#### Key Data Structures
```python
from dataclasses import dataclass, field

from openai.types.responses import (
    ResponseFunctionToolCall,
    ResponseOutputRefusal,
    ResponseOutputText,
)


@dataclass
class _StreamingState:
    started: bool = False
    text_content_index_and_output: tuple[int, ResponseOutputText] | None = None
    refusal_content_index_and_output: tuple[int, ResponseOutputRefusal] | None = None
    function_calls: dict[int, ResponseFunctionToolCall] = field(default_factory=dict)
```
This state tracker accumulates Chat Completions deltas and reconstructs them as Responses API events.
#### Streaming Conversion Process
```mermaid
graph TD
    A[Chat Completions Stream] --> B[ChatCompletionChunk]
    B --> C{Chunk Type}
    C -->|Text Content| D[ResponseTextDeltaEvent]
    C -->|Tool Call| E[Accumulate in _StreamingState]
    C -->|Refusal| F[ResponseRefusalDeltaEvent]
    D --> G[ResponseCompletedEvent]
    E --> H[ResponseFunctionCallArgumentsDeltaEvent]
    F --> G
    H --> G
    G --> I[TResponseStreamEvent Output]
```
#### Step-by-Step Conversion Flow
1. **Initialize state**: Create a `_StreamingState` to track streaming progress.
2. **Process chunks**: For each `ChatCompletionChunk`:
   - **Text content**: Convert to `ResponseTextDeltaEvent`.
   - **Tool calls**: Accumulate in state (names and arguments arrive in fragments).
   - **Refusals**: Convert to `ResponseRefusalDeltaEvent`.
3. **Emit structured events** (a worked example of the resulting sequence follows this list):
   - `ResponseCreatedEvent` (start)
   - `ResponseContentPartAddedEvent` (new content part)
   - `ResponseTextDeltaEvent` (text deltas)
   - `ResponseOutputItemAddedEvent` (tool calls)
   - `ResponseContentPartDoneEvent` (content completion)
   - `ResponseCompletedEvent` (final)
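For a plain text reply (no tools, no refusals), the emitted sequence looks roughly like this trace (event `type` strings per the Responses API; delta contents are illustrative):

```text
ResponseCreatedEvent           type="response.created"
ResponseContentPartAddedEvent  type="response.content_part.added"   (output_text part)
ResponseTextDeltaEvent         type="response.output_text.delta"    delta="Hel"
ResponseTextDeltaEvent         type="response.output_text.delta"    delta="lo!"
ResponseContentPartDoneEvent   type="response.content_part.done"
ResponseCompletedEvent         type="response.completed"            (carries the full Response)
```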
#### Key Conversion Challenges
**Problem**: Chat Completions tool calls stream in fragments:

```python
# Chunk 1: {"tool_calls": [{"index": 0, "function": {"name": "get_weath"}}]}
# Chunk 2: {"tool_calls": [{"index": 0, "function": {"name": "er"}}]}
# Chunk 3: {"tool_calls": [{"index": 0, "function": {"arguments": "{\"city\""}}]}
```
**Solution**: Accumulate fragments in `_StreamingState.function_calls`:
```python
for tc_delta in delta.tool_calls:
    if tc_delta.index not in state.function_calls:
        state.function_calls[tc_delta.index] = ResponseFunctionToolCall(...)
    tc_function = tc_delta.function
    # Accumulate name and arguments as fragments arrive
    state.function_calls[tc_delta.index].name += tc_function.name or ""
    state.function_calls[tc_delta.index].arguments += tc_function.arguments or ""
```
## Streaming Event Flow
### High-Level Architecture
```mermaid
graph TB
    subgraph "Agent Runtime"
        A[Runner.run_streamed] --> B[Model.stream_response]
    end
    subgraph "Model Layer"
        B --> C{Model Type}
        C -->|Responses API| D[OpenAIResponsesModel]
        C -->|Chat Completions| E[OpenAIChatCompletionsModel]
    end
    subgraph "OpenAI APIs"
        D --> F[Responses API Stream]
        E --> G[Chat Completions Stream]
    end
    subgraph "Event Processing"
        F --> H[Direct Pass-through]
        G --> I[ChatCompletionStreamHandler]
        I --> J[Convert to ResponseStreamEvent]
        H --> K[TResponseStreamEvent]
        J --> K
    end
    subgraph "Agent Stream Events"
        K --> L[RawResponsesStreamEvent]
        L --> M[RunItemStreamEvent]
        M --> N[AgentUpdatedStreamEvent]
    end
```
### Event Types Hierarchy
Location: `/src/agents/stream_events.py`

```python
StreamEvent = Union[RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent]
```

- `RawResponsesStreamEvent`: Raw LLM events (direct from the model)
- `RunItemStreamEvent`: Processed agent events (messages, tool calls, handoffs)
- `AgentUpdatedStreamEvent`: Agent handoff notifications
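A short sketch of narrowing the union, using the attributes each event type exposes (import path per the location above):

```python
from agents.stream_events import (
    AgentUpdatedStreamEvent,
    RawResponsesStreamEvent,
    RunItemStreamEvent,
    StreamEvent,
)


def describe(event: StreamEvent) -> str:
    """Return a one-line description for any member of the StreamEvent union."""
    if isinstance(event, RawResponsesStreamEvent):
        return f"raw model event: {event.data.type}"
    if isinstance(event, RunItemStreamEvent):
        return f"agent item: {event.name}"
    if isinstance(event, AgentUpdatedStreamEvent):
        return f"handoff to: {event.new_agent.name}"
    return "unknown event"
```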
### Processing Pipeline
```mermaid
sequenceDiagram
    participant Client
    participant Runner
    participant Model
    participant OpenAI
    participant EventQueue
    Client->>Runner: run_streamed()
    Runner->>Model: stream_response()
    Model->>OpenAI: Chat/Responses API call
    loop For each chunk
        OpenAI->>Model: Stream chunk
        Model->>Model: Convert to TResponseStreamEvent
        Model->>EventQueue: Emit RawResponsesStreamEvent
        EventQueue->>Runner: Process event
        Runner->>Runner: Generate RunItemStreamEvent
        Runner->>EventQueue: Emit processed event
        EventQueue->>Client: Stream event to consumer
    end
    Model->>EventQueue: Emit final ResponseCompletedEvent
    Runner->>EventQueue: Complete streaming
```
## Key Functions and Methods
### OpenAI Responses Model (`openai_responses.py`)

#### `stream_response()`
- **Purpose**: Native Responses API streaming
- **Flow**: Direct pass-through of OpenAI Responses events
- **Location**: Lines 125-171
```python
async def stream_response(self, ...) -> AsyncIterator[TResponseStreamEvent]:
    response_stream = await self._fetch_response(..., stream=True)
    async for event in response_stream:
        # Direct yield - no conversion needed
        yield event
```
### OpenAI Chat Completions Model (`openai_chatcompletions.py`)

#### `stream_response()`
- **Purpose**: Convert Chat Completions chunks to Responses API format
- **Flow**: Chunk → State Accumulation → Event Emission
- **Location**: Lines 160-421
#### `_StreamingState` Class

- **Purpose**: Track partial streaming data across chunks
- **Fields**:
  - `text_content_index_and_output`: Text content tracking
  - `function_calls`: Tool call accumulation
  - `refusal_content_index_and_output`: Refusal tracking
### Streaming State Management
```python
# Initialize tracking state
state = _StreamingState()

# Process each chunk
async for chunk in stream:
    delta = chunk.choices[0].delta

    # Handle text content
    if delta.content:
        if not state.text_content_index_and_output:
            # Initialize a new content part the first time text arrives
            # (`index` is the running output index, elided here)
            state.text_content_index_and_output = (index, ResponseOutputText(...))
            yield ResponseContentPartAddedEvent(...)
        # Emit the text delta and accumulate the full text for the final event
        yield ResponseTextDeltaEvent(delta=delta.content, ...)
        state.text_content_index_and_output[1].text += delta.content
```
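Once the chunk loop finishes, the accumulated state is flushed as done/completed events, matching the lifecycle diagram later in this document. A hedged sketch of that finalization (constructor fields elided with `...` because the real code assembles full `Response` and content-part objects):

```python
# Sketch of post-loop finalization, after the async for loop above ends.
if state.text_content_index_and_output:
    # Close the text content part that was opened on the first delta
    yield ResponseContentPartDoneEvent(...)

for function_call in state.function_calls.values():
    # Each fully-accumulated tool call becomes a complete output item
    yield ResponseOutputItemAddedEvent(...)
    yield ResponseOutputItemDoneEvent(...)

# Finally, emit one ResponseCompletedEvent carrying the assembled Response
yield ResponseCompletedEvent(...)
```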
## Data Structure Mapping
### Chat Completions → Responses API Conversion
| Chat Completions | Responses API | Notes |
|---|---|---|
| `ChatCompletionChunk` | `Response` | Container type |
| `delta.content` | `ResponseTextDeltaEvent` | Text streaming |
| `delta.tool_calls` | `ResponseFunctionToolCall` | Tool calls (accumulated) |
| `delta.refusal` | `ResponseRefusalDeltaEvent` | Model refusals |
| `usage` | `ResponseUsage` | Final usage stats |
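As a worked example of the last row, a hedged sketch of the usage conversion. Field names follow recent `openai` package types; the `*_tokens_details` arguments are assumptions that vary by package version:

```python
from openai.types.completion_usage import CompletionUsage
from openai.types.responses.response_usage import (
    InputTokensDetails,
    OutputTokensDetails,
    ResponseUsage,
)


def convert_usage(usage: CompletionUsage) -> ResponseUsage:
    """Map Chat Completions usage onto the Responses API usage shape."""
    return ResponseUsage(
        input_tokens=usage.prompt_tokens,
        output_tokens=usage.completion_tokens,
        total_tokens=usage.total_tokens,
        # Detail breakdowns are not available from Chat Completions chunks
        input_tokens_details=InputTokensDetails(cached_tokens=0),
        output_tokens_details=OutputTokensDetails(reasoning_tokens=0),
    )
```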
## Event Lifecycle
```mermaid
stateDiagram-v2
    [*] --> StreamStart
    StreamStart --> ResponseCreated : ResponseCreatedEvent
    ResponseCreated --> ContentAdded : ResponseContentPartAddedEvent
    ContentAdded --> TextDelta : ResponseTextDeltaEvent
    TextDelta --> TextDelta : More text chunks
    TextDelta --> ContentDone : ResponseContentPartDoneEvent
    ContentDone --> ToolCall : ResponseOutputItemAddedEvent
    ToolCall --> ToolArgs : ResponseFunctionCallArgumentsDeltaEvent
    ToolArgs --> ToolDone : ResponseOutputItemDoneEvent
    ToolDone --> StreamComplete : ResponseCompletedEvent
    StreamComplete --> [*]
```
## Integration Points
### Runner Integration
Location: `/src/agents/run.py`

The `Runner` class orchestrates streaming by:

1. **Creating the stream**: `run_streamed()` returns a `RunResultStreaming` object.
2. **Event processing**: Raw model events are converted into semantic agent events.
3. **Queue management**: An `asyncio.Queue` distributes events to the consumer.
```python
# Create streaming result container
streamed_result = RunResultStreaming(...)

# Process model stream events
async for raw_event in model.stream_response(...):
    # Emit raw event
    queue.put_nowait(RawResponsesStreamEvent(data=raw_event))

    # Process and emit semantic events
    if isinstance(raw_event, ResponseCompletedEvent):
        # Generate RunItemStreamEvent for new messages/tools
        processed_items = process_response(raw_event.response)
        for item in processed_items:
            queue.put_nowait(RunItemStreamEvent(item=item, name=item.type))
```
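On the consumer side, `stream_events()` drains this queue until the run finishes. A simplified sketch, assuming a sentinel object marks completion (the sentinel name here is hypothetical):

```python
import asyncio

# Hypothetical sentinel pushed by the Runner when streaming completes
QUEUE_COMPLETE = object()


async def stream_events(queue: asyncio.Queue):
    """Yield events until the Runner signals completion."""
    while True:
        event = await queue.get()
        if event is QUEUE_COMPLETE:
            break
        yield event
```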
## Usage Examples
### Basic Streaming
```python
agent = Agent(...)
result = Runner.run_streamed(agent, "Hello")

async for event in result.stream_events():
    if event.type == "raw_response_event":
        # Raw LLM event
        llm_event = event.data
        if llm_event.type == "response.output_text.delta":
            print(llm_event.delta, end="")
    elif event.type == "run_item_stream_event":
        # Processed agent event
        if event.name == "message_output_created":
            print(f"New message: {event.item}")
```
### Event Filtering
```python
async for event in result.stream_events():
    match event:
        case RawResponsesStreamEvent(data=ResponseTextDeltaEvent(delta=text)):
            print(text, end="")
        case RunItemStreamEvent(name="tool_called", item=tool_call):
            print(f"Calling tool: {tool_call.name}")
        case AgentUpdatedStreamEvent(new_agent=agent):
            print(f"Switched to agent: {agent.name}")
```
## Benefits of This Architecture
- **API unification**: Both Chat Completions and Responses APIs produce identical streaming output.
- **Future proofing**: New OpenAI APIs can be integrated by implementing the same interface.
- **Rich event types**: Text, tool calls, refusals, handoffs, and custom events are all supported.
- **Backward compatibility**: Existing Chat Completions models work seamlessly.
- **Performance**: Minimal overhead for the native Responses API, and efficient conversion for Chat Completions.
## Implementation Notes
### Fake IDs
The Chat Completions bridge uses `FAKE_RESPONSES_ID` to populate the response ID on the events it synthesizes, since the Chat Completions API doesn't provide response IDs the way the Responses API does.
### Error Handling
Stream errors are propagated through the async iterator, maintaining consistency with OpenAI’s streaming patterns.
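Concretely, consumers can wrap the iteration in an ordinary `try`/`except`; a minimal sketch, with `consume` and its handling purely illustrative:

```python
from openai import APIError


async def consume(result) -> None:
    try:
        async for event in result.stream_events():
            ...  # handle events as in the examples above
    except APIError as exc:
        # Errors raised inside the model stream surface here, at the consumer
        print(f"stream failed: {exc}")
```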
### Memory Management
The `_StreamingState` is designed for a minimal memory footprint, accumulating only the data needed for event generation.
This architecture demonstrates how the OpenAI Agents SDK successfully bridges different API formats while maintaining a consistent, powerful streaming interface for developers.