Agent Streaming Architecture in OpenAI Agents SDK¶
The OpenAI Agents SDK implements a streaming architecture that standardizes on the OpenAI Responses API format while remaining compatible with the Chat Completions API through a bridging mechanism. This document explains how the streaming system works, what its key components are, and how data flows between them.
Overview¶
The streaming system is designed around three principles:

1. Responses API format is the standard: all streaming events follow the TResponseStreamEvent type from OpenAI's Responses API.
2. Chat Completions API compatibility: the OpenAIChatCompletionsModel acts as a bridge, converting Chat Completions chunks into Responses API events.
3. Unified streaming interface: both APIs produce the same AsyncIterator[TResponseStreamEvent] output (see the sketch below).
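Because both models emit the same event type, downstream code is model-agnostic. A minimal sketch (the consume helper is illustrative, not part of the SDK):

from collections.abc import AsyncIterator

from agents.items import TResponseStreamEvent

# Works unchanged whether the stream comes from OpenAIResponsesModel or
# OpenAIChatCompletionsModel: both yield the same event type.
async def consume(stream: AsyncIterator[TResponseStreamEvent]) -> None:
    async for event in stream:
        # Every Responses API event carries a `type` discriminator,
        # e.g. "response.output_text.delta" or "response.completed".
        print(event.type)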
Key Components¶
1. Core Streaming Types¶
Location: /src/agents/items.py
from openai.types.responses import ResponseStreamEvent

TResponseStreamEvent = ResponseStreamEvent
This type alias represents the standardized streaming event from OpenAI’s Responses API, which includes events like:
- ResponseCreatedEvent
- ResponseTextDeltaEvent
- ResponseContentPartAddedEvent
- ResponseFunctionCallArgumentsDeltaEvent
- ResponseCompletedEvent
2. Model Interface¶
Location: /src/agents/models/interface.py
async def stream_response(
    self,
    system_instructions: str | None,
    input: str | list[TResponseInputItem],
    model_settings: ModelSettings,
    tools: list[Tool],
    output_schema: AgentOutputSchema | None,
    handoffs: list[Handoff],
    tracing: ModelTracing,
) -> AsyncIterator[TResponseStreamEvent]:
All model implementations must implement this unified streaming interface.
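To add a new provider, a model class implements this method and yields Responses API events. A minimal sketch, assuming the import paths match the file locations listed in this document; _call_provider and _to_response_event are hypothetical helpers standing in for provider-specific logic:

from collections.abc import AsyncIterator

from agents.items import TResponseStreamEvent
from agents.models.interface import Model

class MyProviderModel(Model):
    async def stream_response(
        self, system_instructions, input, model_settings,
        tools, output_schema, handoffs, tracing,
    ) -> AsyncIterator[TResponseStreamEvent]:
        # Hypothetical: fetch the provider's native stream, then translate
        # each native chunk into a Responses API event before yielding it.
        async for chunk in self._call_provider(input, stream=True):
            yield self._to_response_event(chunk)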
3. Responses API Implementation¶
Location: /src/agents/models/openai_responses.py
The OpenAIResponsesModel provides native Responses API support:
async def stream_response(self, ...) -> AsyncIterator[TResponseStreamEvent]:
    response_stream = await self._fetch_response(..., stream=True)
    async for event in response_stream:
        yield event  # direct pass-through; no conversion needed
Key characteristics:

- Direct streaming: events are passed through without conversion.
- Native format: events already arrive in TResponseStreamEvent format.
- Full feature support: all Responses API features work (file search, computer use, etc.).
4. Chat Completions Bridge Implementation¶
Location: /src/agents/models/openai_chatcompletions.py
The OpenAIChatCompletionsModel acts as a streaming adapter that converts Chat Completions chunks into Responses API events:
Key Data Structures¶
from dataclasses import dataclass, field

@dataclass
class _StreamingState:
    started: bool = False
    text_content_index_and_output: tuple[int, ResponseOutputText] | None = None
    refusal_content_index_and_output: tuple[int, ResponseOutputRefusal] | None = None
    function_calls: dict[int, ResponseFunctionToolCall] = field(default_factory=dict)
This state tracker accumulates Chat Completions deltas and reconstructs them as Responses API events.
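Freshly constructed, the state is empty; its fields fill in as chunks arrive:

state = _StreamingState()
assert state.started is False
assert state.text_content_index_and_output is None
assert state.function_calls == {}  # populated as tool-call fragments arrive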
Streaming Conversion Process¶
graph TD
A[Chat Completions Stream] --> B[ChatCompletionChunk]
B --> C{Chunk Type}
C -->|Text Content| D[ResponseTextDeltaEvent]
C -->|Tool Call| E[Accumulate in _StreamingState]
C -->|Refusal| F[ResponseRefusalDeltaEvent]
D --> G[ResponseCompletedEvent]
E --> H[ResponseFunctionCallArgumentsDeltaEvent]
F --> G
H --> G
G --> I[TResponseStreamEvent Output]
Step-by-Step Conversion Flow¶
1. Initialize state: create a _StreamingState to track streaming progress.
2. Process chunks: for each ChatCompletionChunk:
   - Text content: convert to ResponseTextDeltaEvent.
   - Tool calls: accumulate in state (names and arguments arrive in fragments).
   - Refusals: convert to ResponseRefusalDeltaEvent.
3. Emit structured events, in order (see the sketch after this list):
   - ResponseCreatedEvent (start)
   - ResponseContentPartAddedEvent (new content part)
   - ResponseTextDeltaEvent (text deltas)
   - ResponseOutputItemAddedEvent (tool calls)
   - ResponseContentPartDoneEvent (content completion)
   - ResponseCompletedEvent (final)
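The ordering matters: a content part must be opened before any deltas and closed before the response completes. The toy function below is illustrative only, using Responses API event-type strings rather than real event objects, and covers the text-only case:

def emission_order(num_text_chunks: int) -> list[str]:
    # Event-type strings in the order the bridge emits them for a
    # simple text-only completion.
    events = ["response.created", "response.content_part.added"]
    events += ["response.output_text.delta"] * num_text_chunks
    events += ["response.content_part.done", "response.completed"]
    return events

print(emission_order(2))
# ['response.created', 'response.content_part.added',
#  'response.output_text.delta', 'response.output_text.delta',
#  'response.content_part.done', 'response.completed']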
Key Conversion Challenges¶
Problem: Chat Completions tool calls stream in fragments:
# Chunk 1: {"tool_calls": [{"index": 0, "function": {"name": "get_weath"}}]}
# Chunk 2: {"tool_calls": [{"index": 0, "function": {"name": "er"}}]}
# Chunk 3: {"tool_calls": [{"index": 0, "function": {"arguments": "{\"city\""}}]}
Solution: Accumulate fragments in _StreamingState.function_calls:
for tc_delta in delta.tool_calls or []:
    if tc_delta.index not in state.function_calls:
        state.function_calls[tc_delta.index] = ResponseFunctionToolCall(...)
    if tc_delta.function:
        # Accumulate name and argument fragments across chunks
        state.function_calls[tc_delta.index].name += tc_delta.function.name or ""
        state.function_calls[tc_delta.index].arguments += tc_delta.function.arguments or ""
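The same idea in a self-contained, runnable form, with plain dicts standing in for the SDK types (the fragment values mirror the chunks shown above):

# Toy illustration: tool-call fragments arrive per chunk and are merged
# by their "index" key.
fragments = [
    {"index": 0, "name": "get_weath", "arguments": ""},
    {"index": 0, "name": "er", "arguments": ""},
    {"index": 0, "name": "", "arguments": '{"city"'},
    {"index": 0, "name": "", "arguments": ': "Paris"}'},
]

calls: dict[int, dict[str, str]] = {}
for frag in fragments:
    call = calls.setdefault(frag["index"], {"name": "", "arguments": ""})
    call["name"] += frag["name"]            # name fragments concatenate
    call["arguments"] += frag["arguments"]  # so do argument fragments

assert calls[0] == {"name": "get_weather", "arguments": '{"city": "Paris"}'}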
Streaming Event Flow¶
High-Level Architecture¶
graph TB
subgraph "Agent Runtime"
A[Runner.run_streamed] --> B[Model.stream_response]
end
subgraph "Model Layer"
B --> C{Model Type}
C -->|Responses API| D[OpenAIResponsesModel]
C -->|Chat Completions| E[OpenAIChatCompletionsModel]
end
subgraph "OpenAI APIs"
D --> F[Responses API Stream]
E --> G[Chat Completions Stream]
end
subgraph "Event Processing"
F --> H[Direct Pass-through]
G --> I[ChatCompletionStreamHandler]
I --> J[Convert to ResponseStreamEvent]
H --> K[TResponseStreamEvent]
J --> K
end
subgraph "Agent Stream Events"
K --> L[RawResponsesStreamEvent]
L --> M[RunItemStreamEvent]
M --> N[AgentUpdatedStreamEvent]
end
Event Types Hierarchy¶
Location: /src/agents/stream_events.py
StreamEvent = Union[RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent]
- RawResponsesStreamEvent: raw LLM events, passed directly from the model (filtered in the sketch below)
- RunItemStreamEvent: processed agent events (messages, tool calls, handoffs)
- AgentUpdatedStreamEvent: agent handoff notifications
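A consumer can narrow the union with ordinary isinstance checks; a small sketch (the generator helper is illustrative, not part of the SDK):

from agents.stream_events import RawResponsesStreamEvent

async def only_raw_llm_events(events):
    # Reduce a mixed StreamEvent stream to the raw TResponseStreamEvent
    # payloads carried by RawResponsesStreamEvent values.
    async for event in events:
        if isinstance(event, RawResponsesStreamEvent):
            yield event.data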
Processing Pipeline¶
sequenceDiagram
participant Client
participant Runner
participant Model
participant OpenAI
participant EventQueue
Client->>Runner: run_streamed()
Runner->>Model: stream_response()
Model->>OpenAI: Chat/Responses API call
loop For each chunk
OpenAI->>Model: Stream chunk
Model->>Model: Convert to TResponseStreamEvent
Model->>EventQueue: Emit RawResponsesStreamEvent
EventQueue->>Runner: Process event
Runner->>Runner: Generate RunItemStreamEvent
Runner->>EventQueue: Emit processed event
EventQueue->>Client: Stream event to consumer
end
Model->>EventQueue: Emit final ResponseCompletedEvent
Runner->>EventQueue: Complete streaming
Key Functions and Methods¶
OpenAI Responses Model (openai_responses.py)¶
stream_response()¶
Purpose: Native Responses API streaming
Flow: Direct pass-through of OpenAI Responses events
Location: Lines 125-171
async def stream_response(self, ...) -> AsyncIterator[TResponseStreamEvent]:
    response_stream = await self._fetch_response(..., stream=True)
    async for event in response_stream:
        # Direct yield - no conversion needed
        yield event
OpenAI Chat Completions Model (openai_chatcompletions.py)¶
stream_response()¶
Purpose: Convert Chat Completions chunks to Responses API format
Flow: Chunk → State Accumulation → Event Emission
Location: Lines 160-421
_StreamingState Class¶
Purpose: Track partial streaming data across chunks
Fields:

- text_content_index_and_output: text content tracking
- function_calls: tool call accumulation
- refusal_content_index_and_output: refusal tracking
Streaming State Management¶
# Initialize tracking state
state = _StreamingState()

# Process each chunk
async for chunk in stream:
    delta = chunk.choices[0].delta

    # Handle text content
    if delta.content:
        if not state.text_content_index_and_output:
            # Initialize a new content part; `index` is the position of
            # this part within the output item
            state.text_content_index_and_output = (index, ResponseOutputText(...))
            yield ResponseContentPartAddedEvent(...)
        # Emit the text delta, then fold it into the accumulated output
        yield ResponseTextDeltaEvent(delta=delta.content, ...)
        state.text_content_index_and_output[1].text += delta.content
Data Structure Mapping¶
Chat Completions → Responses API Conversion¶
| Chat Completions | Responses API | Notes |
|---|---|---|
| ChatCompletionChunk | Response | Container type |
| delta.content | ResponseTextDeltaEvent | Text streaming |
| delta.tool_calls | ResponseFunctionToolCall | Tool calls (accumulated) |
| delta.refusal | ResponseRefusalDeltaEvent | Model refusals |
| usage | ResponseUsage | Final usage stats |
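As a rough illustration of the first two columns, a toy classifier over plain-dict deltas (the strings are Responses API event-type names; the function itself is not SDK code):

def classify_delta(delta: dict) -> str | None:
    # Map a Chat Completions delta to the Responses API event type
    # it becomes, per the table above.
    if delta.get("content"):
        return "response.output_text.delta"
    if delta.get("refusal"):
        return "response.refusal.delta"
    if delta.get("tool_calls"):
        return "response.function_call_arguments.delta"
    return None

assert classify_delta({"content": "Hi"}) == "response.output_text.delta"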
Event Lifecycle¶
stateDiagram-v2
[*] --> StreamStart
StreamStart --> ResponseCreated : ResponseCreatedEvent
ResponseCreated --> ContentAdded : ResponseContentPartAddedEvent
ContentAdded --> TextDelta : ResponseTextDeltaEvent
TextDelta --> TextDelta : More text chunks
TextDelta --> ContentDone : ResponseContentPartDoneEvent
ContentDone --> ToolCall : ResponseOutputItemAddedEvent
ToolCall --> ToolArgs : ResponseFunctionCallArgumentsDeltaEvent
ToolArgs --> ToolDone : ResponseOutputItemDoneEvent
ToolDone --> StreamComplete : ResponseCompletedEvent
StreamComplete --> [*]
Integration Points¶
Runner Integration¶
Location: /src/agents/run.py
The Runner class orchestrates streaming by:
1. Creating the stream: run_streamed() returns a RunResultStreaming
2. Event processing: converts raw model events to semantic agent events
3. Queue management: uses an asyncio.Queue for event distribution
# Create streaming result container
streamed_result = RunResultStreaming(...)

# Process model stream events
async for raw_event in model.stream_response(...):
    # Emit the raw event
    queue.put_nowait(RawResponsesStreamEvent(data=raw_event))

    # Process and emit semantic events
    if isinstance(raw_event, ResponseCompletedEvent):
        # Generate RunItemStreamEvent for new messages/tools
        processed_items = process_response(raw_event.response)
        for item in processed_items:
            queue.put_nowait(RunItemStreamEvent(item=item, name=item.type))
Usage Examples¶
Basic Streaming¶
import asyncio

from agents import Agent, Runner

async def main() -> None:
    agent = Agent(...)
    result = Runner.run_streamed(agent, "Hello")

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            # Raw LLM event
            llm_event = event.data
            if llm_event.type == "response.output_text.delta":
                print(llm_event.delta, end="")
        elif event.type == "run_item_stream_event":
            # Processed agent event
            if event.name == "message_output_created":
                print(f"New message: {event.item}")

asyncio.run(main())
Event Filtering¶
async for event in result.stream_events():
    match event:
        case RawResponsesStreamEvent(data=ResponseTextDeltaEvent(delta=text)):
            print(text, end="")
        case RunItemStreamEvent(name="tool_called", item=tool_call):
            # tool_call is a run item; the underlying call sits on raw_item
            print(f"Calling tool: {tool_call.raw_item.name}")
        case AgentUpdatedStreamEvent(new_agent=agent):
            print(f"Switched to agent: {agent.name}")
Benefits of This Architecture¶
- API unification: both Chat Completions and Responses APIs produce identical streaming output
- Future-proofing: new OpenAI APIs can be integrated by implementing the same interface
- Rich event types: text, tool calls, refusals, handoffs, and custom events are all supported
- Backward compatibility: existing Chat Completions models work seamlessly
- Performance: minimal overhead for native Responses API streams, efficient conversion for Chat Completions
Implementation Notes¶
Fake IDs¶
The Chat Completions bridge stamps the events it synthesizes with FAKE_RESPONSES_ID, since the Chat Completions API does not return response IDs the way the Responses API does.
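A quick look at the placeholder (the import path is an assumption inferred from the SDK's module layout):

# Assumption: FAKE_RESPONSES_ID lives in the SDK's model internals.
from agents.models.fake_id import FAKE_RESPONSES_ID

# Every synthesized event reuses this single placeholder in place of a
# server-generated response ID.
print(FAKE_RESPONSES_ID)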
Error Handling¶
Stream errors are propagated through the async iterator, maintaining consistency with OpenAI’s streaming patterns.
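Continuing the Basic Streaming example above, ordinary exception handling around the loop is therefore enough; APIError is shown as one example of what the openai library can raise:

import openai

try:
    async for event in result.stream_events():
        ...  # handle events as before
except openai.APIError as exc:
    # Errors from the underlying stream surface here, at the point where
    # the async iterator is consumed.
    print(f"Stream failed: {exc}")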
Memory Management¶
The _StreamingState is designed to keep a minimal memory footprint, accumulating only the data needed to generate events.
This architecture demonstrates how the OpenAI Agents SDK bridges different API formats while presenting developers with a single, consistent streaming interface.