Streaming¶
Streaming allows you to receive real-time updates as your agent executes steps, calls tools, and generates responses, so you can build responsive user interfaces and monitor agent progress as it happens.
Overview¶
AdalFlow’s streaming architecture provides two types of real-time events:
RawResponsesStreamEvent: Token-level updates from the language model
RunItemStreamEvent: High-level agent execution progress (tool calls, step completion, etc.)
Both event types can be consumed simultaneously, giving you fine-grained control over how you handle streaming data.
Basic Streaming¶
The simplest way to stream agent execution is the Runner.astream() method, which returns a RunnerStreamingResult object. Consume events by calling stream_events() on that object; internally, it drains an asyncio queue.
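Conceptually, stream_events() just drains that internal queue until the run signals completion. The following is a minimal, library-free sketch of the pattern; the class name, sentinel, and methods here are illustrative, not AdalFlow's actual implementation:

```python
import asyncio

# Illustrative stand-in for a streaming result backed by an asyncio queue.
class QueueBackedStream:
    def __init__(self):
        self._queue = asyncio.Queue()
        self._sentinel = object()  # marks the end of the stream

    def put(self, event):
        self._queue.put_nowait(event)

    def close(self):
        self._queue.put_nowait(self._sentinel)

    async def stream_events(self):
        # Yield events until the sentinel signals completion.
        while True:
            event = await self._queue.get()
            if event is self._sentinel:
                break
            yield event

async def collect():
    stream = QueueBackedStream()
    for token in ["Hel", "lo"]:
        stream.put(token)
    stream.close()
    return [e async for e in stream.stream_events()]

print(asyncio.run(collect()))  # ['Hel', 'lo']
```

The sentinel object lets the producer signal "no more events" without a separate flag, which is why the consumer loop can simply await the queue.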
import asyncio

from adalflow.components.agent import Agent, Runner
from adalflow.components.model_client import OpenAIClient
from adalflow.core.func_tool import FunctionTool

# Define a simple tool
def calculator(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

# Create agent and runner
agent = Agent(
    name="StreamingAgent",
    tools=[FunctionTool(calculator)],
    model_client=OpenAIClient(),
    model_kwargs={"model": "gpt-4o", "temperature": 0.3},
    max_steps=5
)
runner = Runner(agent=agent)

async def stream_example():
    # Start streaming execution
    streaming_result = runner.astream(
        prompt_kwargs={"input_str": "What is 15 * 7 + 23?"},
        model_kwargs={"stream": True}
    )
    # Process streaming events
    async for event in streaming_result.stream_events():
        print(f"Event: {event}")

# Run the example
asyncio.run(stream_example())
Raw Response Stream Events¶
Raw response stream events come directly from the language model and contain the streaming chunks as they're generated:
import asyncio

from adalflow.components.agent import Agent, Runner
from adalflow.components.model_client import OpenAIClient
from adalflow.core.func_tool import FunctionTool
from adalflow.core.types import RawResponsesStreamEvent

# Define a simple tool
def calculator(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

# Create agent and runner
agent = Agent(
    name="StreamingAgent",
    tools=[FunctionTool(calculator)],
    model_client=OpenAIClient(),
    model_kwargs={"model": "gpt-4o", "temperature": 0.3},
    max_steps=5
)
runner = Runner(agent=agent)

async def handle_raw_responses():
    streaming_result = runner.astream(
        prompt_kwargs={"input_str": "Calculate 25 * 4 and explain the result"},
        model_kwargs={"stream": True}
    )
    async for event in streaming_result.stream_events():
        if isinstance(event, RawResponsesStreamEvent):
            # Process raw model output
            if hasattr(event.data, 'choices') and event.data.choices:
                delta = event.data.choices[0].delta
                if hasattr(delta, 'content') and delta.content:
                    print(delta.content, end='', flush=True)

asyncio.run(handle_raw_responses())
Streaming Event Reference¶
Event Types¶
Event Type | Description | When Emitted
---|---|---
RawResponsesStreamEvent | Raw model output chunks | During model generation
RunItemStreamEvent (ToolCallRunItem) | Tool about to be executed | Before tool execution
RunItemStreamEvent (ToolOutputRunItem) | Tool execution result | After tool execution
RunItemStreamEvent (StepRunItem) | Agent step completed | After each reasoning step
RunItemStreamEvent (FinalOutputItem) | Final execution result | At completion
Event Names¶
Event Name | Description
---|---
agent.tool_call_start | Tool execution starting
agent.tool_call_activity | Tool intermediate activity and progress updates
agent.tool_call_complete | Tool execution completed
agent.step_complete | Reasoning step completed
agent.final_output | Final processed output available
agent.execution_complete | Entire execution finished
agent.tool_permission_request | Tool permission request before execution
This comprehensive streaming system enables you to build responsive, real-time applications with AdalFlow agents while maintaining full control over the execution flow and user experience.
Raw Response Event Structure¶
Raw response stream events carry the streaming data in their data field, which comes directly from the model client:
# Example raw response event data
{
    "type": "raw_response_event",
    "data": {
        "choices": [{
            "delta": {
                "content": "Quantum computing is a revolutionary technology..."
            },
            "index": 0
        }]
    }
}
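Chunks shaped like the event above can be stitched back into the full response text. A small self-contained sketch; the event dicts below are hand-built to mirror the shape shown, not produced by AdalFlow:

```python
def accumulate_content(events):
    """Concatenate delta content from raw response event payloads."""
    parts = []
    for event in events:
        for choice in event.get("data", {}).get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                parts.append(content)
    return "".join(parts)

chunks = [
    {"type": "raw_response_event",
     "data": {"choices": [{"delta": {"content": "Quantum "}, "index": 0}]}},
    {"type": "raw_response_event",
     "data": {"choices": [{"delta": {"content": "computing"}, "index": 0}]}},
]
print(accumulate_content(chunks))  # Quantum computing
```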
Run Item Events¶
Run item events provide high-level updates about agent execution progress. These events tell you when tools are called, when steps complete, and when the final answer is ready:
import asyncio

from adalflow.components.agent import Agent, Runner
from adalflow.components.model_client import OpenAIClient
from adalflow.core.func_tool import FunctionTool
from adalflow.core.types import (
    RunItemStreamEvent,
    ToolCallRunItem,
    ToolOutputRunItem,
    FinalOutputItem,
)

# Define a simple tool
def calculator(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

# Create agent and runner
agent = Agent(
    name="StreamingAgent",
    tools=[FunctionTool(calculator)],
    model_client=OpenAIClient(),
    model_kwargs={"model": "gpt-4o", "temperature": 0.3},
    max_steps=5
)
runner = Runner(agent=agent)

async def handle_agent_events():
    streaming_result = runner.astream(
        prompt_kwargs={"input_str": "Calculate 15 * 7 + 23 and explain the steps"},
        model_kwargs={"stream": True}
    )
    async for event in streaming_result.stream_events():
        if isinstance(event, RunItemStreamEvent):
            if isinstance(event.item, ToolCallRunItem):
                print(f"🔧 Calling tool: {event.item.data.name}")
            elif isinstance(event.item, ToolOutputRunItem):
                print(f"✅ Tool completed: {event.item.data.output}")
            elif isinstance(event.item, FinalOutputItem):
                print(f"🎯 Final answer: {event.item.data.answer}")

asyncio.run(handle_agent_events())
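Because both event types arrive on the same stream, a single loop can branch on isinstance to drive a UI: token text from raw response events, progress updates from run item events. The following library-free sketch uses stand-in dataclasses with the same names as AdalFlow's event classes; only the dispatch pattern is the point here, not the real class definitions:

```python
from dataclasses import dataclass

# Stand-ins for AdalFlow's event classes, for illustration only.
@dataclass
class RawResponsesStreamEvent:
    data: str

@dataclass
class RunItemStreamEvent:
    name: str

def dispatch(events):
    """Route a mixed event stream to token and progress handlers."""
    tokens, progress = [], []
    for event in events:
        if isinstance(event, RawResponsesStreamEvent):
            tokens.append(event.data)       # token-level UI update
        elif isinstance(event, RunItemStreamEvent):
            progress.append(event.name)     # high-level progress update
    return "".join(tokens), progress

text, steps = dispatch([
    RunItemStreamEvent(name="tool_call_start"),
    RawResponsesStreamEvent(data="42"),
    RunItemStreamEvent(name="execution_complete"),
])
print(text, steps)  # 42 ['tool_call_start', 'execution_complete']
```

In a real handler, the same branching runs inside `async for event in streaming_result.stream_events():` rather than over a list.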
Streaming with Different Model Clients¶
OpenAI Streaming¶
from adalflow.components.agent import Agent
from adalflow.components.model_client import OpenAIClient
from adalflow.core.func_tool import FunctionTool

def calculator(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

openai_agent = Agent(
    name="OpenAIAgent",
    tools=[FunctionTool(calculator)],
    model_client=OpenAIClient(),
    model_kwargs={"model": "gpt-4o", "stream": True},
    max_steps=5
)
Anthropic Streaming¶
from adalflow.components.agent import Agent
from adalflow.components.model_client import AnthropicAPIClient
from adalflow.core.func_tool import FunctionTool

def calculator(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

anthropic_agent = Agent(
    name="AnthropicAgent",
    tools=[FunctionTool(calculator)],
    model_client=AnthropicAPIClient(),
    model_kwargs={"model": "claude-3-5-haiku-20241022", "stream": True, "temperature": 0.8},
    max_steps=5
)