types¶
Functional data classes to support functional components like Generator, Retriever, and Assistant.
Functions
|
Get the required keys in model_kwargs for a specific model type. |
Classes
|
|
|
In sync with OpenAI completion usage api spec at openai/types/completion_usage.py |
|
A conversation manages the dialog turns in a whole conversation as a session. |
|
A turn consists of a user query and the assistant response. |
|
A text container with optional metadata and vector representation. |
|
Container to hold the response from an Embedder datacomponent for a single batch of input. |
alias of |
|
|
Container for a single embedding. |
|
|
|
Event emitted when the entire Runner execution has completed. |
|
The data modeling of a function call, including the name and keyword arguments. |
|
The data modeling of a function definition, including the name, description, and parameters. |
|
The data modeling of a function expression for a call, including the name and arguments. |
|
The output of a tool, which could be a function, a class, or a module. |
|
Event emitted when the Agent is about to execute a function/tool call. |
|
The output data class for the Generator component. |
|
Details about input tokens used in a response |
A quick way to access all model clients in the ModelClient module. |
|
|
The type of the model, including Embedder, LLM, Reranker. |
|
Details about output tokens used in a response |
Sentinel to indicate queue completion. |
|
|
Special sentinel object to mark the end of a stream when using asyncio.Queue for stream processing. |
|
Streaming event for storing the raw responses from the LLM. |
|
Usage information for a response, including token counts, in sync with OpenAI response usage api spec at openai/types/response_usage.py |
|
Save the output of a single query in retrievers. |
|
Base class for streaming execution events in the Runner system. |
|
Wrapper for streaming RunItem events during Runner execution. |
|
|
|
Container for runner streaming results that provides access to the event queue and allows users to consume streaming events. |
|
The output of a single step in the agent. |
|
Event emitted when a complete execution step has finished. |
|
similar to openai.ChatCompletionTokenLogprob |
|
Event emitted during the execution of a function/tool call. |
|
Event emitted when the Agent is about to execute a function/tool call. |
|
Event emitted when the Agent is about to execute a function/tool call. |
|
Using ToolOutput means a completed tool call even if it has error |
|
Event emitted after a function/tool call has been executed. |
|
In sync with OpenAI embedding api spec, same as openai/types/create_embedding_response.py |
|
- class ModelType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases:
Enum
The type of the model, including Embedder, LLM, Reranker. It helps ModelClient identify the model type required to correctly call the model.
- EMBEDDER = 1¶
- LLM = 2¶
- LLM_REASONING = 3¶
- RERANKER = 4¶
- IMAGE_GENERATION = 5¶
- UNDEFINED = 6¶
- class ModelClientType[source]¶
Bases:
object
A quick way to access all model clients in the ModelClient module.
From this:
from adalflow.components.model_client import CohereAPIClient, TransformersClient, AnthropicAPIClient, GroqAPIClient, OpenAIClient
model_client = OpenAIClient()
To this:
from adalflow.core.types import ModelClientType
model_client = ModelClientType.OPENAI
- get_model_args(model_type: ModelType) List[str] [source]¶
Get the required keys in model_kwargs for a specific model type.
Note: If your model inference SDK uses different keys, you need to convert them to the standard keys here in its specific ModelClient.
- Parameters:
model_type (ModelType) – The model type
- Returns:
The required keys in model_kwargs
- Return type:
List[str]
- class Embedding(embedding: List[float], index: int | None)[source]¶
Bases:
object
Container for a single embedding.
In sync with api spec, same as openai/types/embedding.py
- embedding: List[float]¶
- index: int | None¶
- class Usage(prompt_tokens: int, total_tokens: int)[source]¶
Bases:
object
In sync with OpenAI embedding api spec, same as openai/types/create_embedding_response.py
- prompt_tokens: int¶
- total_tokens: int¶
- class EmbedderOutput(data: ~typing.List[~core.types.Embedding] = <factory>, model: str | None = None, usage: ~core.types.Usage | None = None, error: str | None = None, raw_response: ~typing.Any | None = None, input: ~typing.List[str] | None = None)[source]¶
Bases:
DataClass
Container to hold the response from an Embedder datacomponent for a single batch of input.
Data standard for Embedder model output to interact with other components. Batch processing is often available, thus we need a list of Embedding objects.
- model: str | None = None¶
- error: str | None = None¶
- raw_response: Any | None = None¶
- input: List[str] | None = None¶
- property length: int¶
- property embedding_dim: int¶
The dimension of the embedding, assuming all embeddings have the same dimension.
- Returns:
The dimension of the embedding, -1 if no embedding is available
- Return type:
int
- property is_normalized: bool¶
Check if the embeddings are normalized to unit vectors.
- Returns:
True if the embeddings are normalized, False otherwise
- Return type:
bool
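The embedding_dim and is_normalized properties can be approximated with a small standard-library sketch. The two helper functions below operate on plain lists and are stand-ins, not the library's implementation; the tolerance of 1e-6 is an assumption.

```python
import math
from typing import List


def embedding_dim(vectors: List[List[float]]) -> int:
    """Return the dimension of the first vector, or -1 if there are none."""
    return len(vectors[0]) if vectors else -1


def is_normalized(vectors: List[List[float]], tol: float = 1e-6) -> bool:
    """Return True when every vector has an L2 norm within `tol` of 1."""
    return all(
        abs(math.sqrt(sum(x * x for x in v)) - 1.0) <= tol for v in vectors
    )


unit = [[0.6, 0.8], [1.0, 0.0]]  # both rows have unit length
scaled = [[3.0, 4.0]]            # length 5, so not normalized
```

Checking only the first vector for the dimension mirrors the stated assumption that all embeddings share one dimension.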
- EmbedderOutputType¶
alias of
EmbedderOutput
- class TokenLogProb(token: str, logprob: float)[source]¶
Bases:
object
similar to openai.ChatCompletionTokenLogprob
- token: str¶
- logprob: float¶
- class CompletionUsage(completion_tokens: int | None = None, prompt_tokens: int | None = None, total_tokens: int | None = None)[source]¶
Bases:
object
In sync with OpenAI completion usage api spec at openai/types/completion_usage.py
- completion_tokens: int | None = None¶
- prompt_tokens: int | None = None¶
- total_tokens: int | None = None¶
- class InputTokensDetails(cached_tokens: int | None = 0)[source]¶
Bases:
object
Details about input tokens used in a response
- cached_tokens: int | None = 0¶
- class OutputTokensDetails(reasoning_tokens: int | None = 0)[source]¶
Bases:
object
Details about output tokens used in a response
- reasoning_tokens: int | None = 0¶
- class ResponseUsage(input_tokens: int, output_tokens: int, total_tokens: int, input_tokens_details: ~core.types.InputTokensDetails = <factory>, output_tokens_details: ~core.types.OutputTokensDetails = <factory>)[source]¶
Bases:
object
Usage information for a response, including token counts, in sync with OpenAI response usage api spec at openai/types/response_usage.py
- input_tokens: int¶
- output_tokens: int¶
- total_tokens: int¶
- input_tokens_details: InputTokensDetails¶
- output_tokens_details: OutputTokensDetails¶
- class RetrieverOutput(id: str = None, doc_indices: ~typing.List[int] = <function required_field>, doc_scores: ~typing.List[float] = None, query: ~core.types.RetrieverQueryType = None, documents: ~typing.List[~core.types.RetrieverDocumentType] = None)[source]¶
Bases:
DataClass
Save the output of a single query in retrievers.
It is up to the subclass of Retriever to specify the type of query and document.
- id: str = None¶
- doc_indices: List[int]¶
Required field, declared with required_field(): a factory function that creates a required field in a dataclass. The returned callable raises a TypeError when invoked, indicating that a required field was not provided.
- Parameters:
name (Optional[str], optional) – The name of the required field. Defaults to None
- Returns:
A callable that raises TypeError when called, indicating a missing required field.
- Return type:
Callable[[], Any]
Example:
from dataclasses import dataclass, field
from adalflow.core.base_data_class import required_field, DataClass

@dataclass
class Person(DataClass):
    name: str = field(default=None)
    age: int = field(default_factory=required_field())  # allow required field after optional field
- doc_scores: List[float] = None¶
- query: RetrieverQueryType = None¶
- documents: List[RetrieverDocumentType] = None¶
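The required-field pattern used by doc_indices can be sketched with the standard library alone. The required_field function and the RetrieverResult class below are hypothetical stand-ins written for illustration, not the library's code; the error message text is also an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional


def required_field(name: Optional[str] = None) -> Callable[[], Any]:
    """Hypothetical stand-in: return a factory that raises TypeError when called."""
    def _raise() -> Any:
        label = f" '{name}'" if name else ""
        raise TypeError(f"Missing required field{label}")
    return _raise


@dataclass
class RetrieverResult:  # hypothetical stand-in for RetrieverOutput
    id: Optional[str] = None
    # A required field declared after an optional one.
    doc_indices: List[int] = field(default_factory=required_field("doc_indices"))


ok = RetrieverResult(doc_indices=[0, 2])

try:
    RetrieverResult()  # the default factory runs and raises
    missing_error = None
except TypeError as exc:
    missing_error = str(exc)
```

Because the factory is only called when no value is supplied, the dataclass accepts the field after defaulted ones while still failing fast when it is omitted.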
- class FunctionDefinition(func_name: str = <function required_field>, func_desc: str | None = None, func_parameters: ~typing.Dict[str, object] = <factory>)[source]¶
Bases:
DataClass
The data modeling of a function definition, including the name, description, and parameters.
- func_name: str¶
Required field, declared with required_field(): a factory function that creates a required field in a dataclass. The returned callable raises a TypeError when invoked, indicating that a required field was not provided.
- Parameters:
name (Optional[str], optional) – The name of the required field. Defaults to None
- Returns:
A callable that raises TypeError when called, indicating a missing required field.
- Return type:
Callable[[], Any]
Example:
from dataclasses import dataclass, field
from adalflow.core.base_data_class import required_field, DataClass

@dataclass
class Person(DataClass):
    name: str = field(default=None)
    age: int = field(default_factory=required_field())  # allow required field after optional field
- func_desc: str | None = None¶
- func_parameters: Dict[str, object]¶
- class Function(id: str | None = None, thought: str | None = None, name: str = '', args: ~typing.List[object] | None = <factory>, kwargs: ~typing.Dict[str, object] | None = <factory>, _is_answer_final: bool | None = None, _answer: ~typing.Any | None = None)[source]¶
Bases:
DataClass
The data modeling of a function call, including the name and keyword arguments.
You can use exclude in to_json() and to_yaml() to exclude the thought field if you do not want to use the chain-of-thought pattern.
Example:
# assume the function is added in a context_map
# context_map = {"add": add}
def add(a, b):
    return a + b

# call function add with arguments 1 and 2
fun = Function(name="add", kwargs={"a": 1, "b": 2})
# evaluate the function
result = context_map[fun.name](**fun.kwargs)

# or call with positional arguments
fun = Function(name="add", args=[1, 2])
result = context_map[fun.name](*fun.args)
- id: str | None = None¶
- thought: str | None = None¶
- name: str = ''¶
- args: List[object] | None¶
- kwargs: Dict[str, object] | None¶
- classmethod from_function(func: Callable[[...], Any] | Callable[[...], Awaitable[Any]], thought: str | None = None, *args, **kwargs) Function [source]¶
Create a Function object from a function.
- Parameters:
func (Union[Callable[..., Any], AsyncCallable]) – The function to be converted
- Returns:
The Function object
- Return type:
Function
Usage:
1. Create a Function object from a function call.
2. Use to_json() and to_yaml() to get the schema in JSON or YAML format.
3. This will be used as an example in the prompt, showing the LLM how to call the function.
Example:
from adalflow.core.types import Function

def add(a, b):
    return a + b

# create a function call object with positional arguments
# (thought is passed positionally so that 1 and 2 can follow it)
fun = Function.from_function(add, "Add two numbers", 1, 2)
print(fun)
# output
# Function(thought='Add two numbers', name='add', args=[1, 2])
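The context_map dispatch described for Function can be sketched without the library. FunctionCall below is a hypothetical stand-in that keeps only the name, args, and kwargs fields, and execute is an illustrative helper, not part of the documented API.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class FunctionCall:  # hypothetical stand-in for Function
    name: str = ""
    args: List[Any] = field(default_factory=list)
    kwargs: Dict[str, Any] = field(default_factory=dict)


def add(a, b):
    return a + b


# Registry of callables the model is allowed to invoke.
context_map: Dict[str, Callable[..., Any]] = {"add": add}


def execute(call: FunctionCall) -> Any:
    """Look up the callable by name and apply positional and keyword arguments."""
    if call.name not in context_map:
        raise KeyError(f"Unknown function: {call.name}")
    return context_map[call.name](*call.args, **call.kwargs)


by_keyword = execute(FunctionCall(name="add", kwargs={"a": 1, "b": 2}))
by_position = execute(FunctionCall(name="add", args=[1, 2]))
```

Restricting execution to names registered in the map keeps a model-generated call from reaching arbitrary callables.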
- class QueueSentinel(type: Literal['queue_sentinel'] = 'queue_sentinel')[source]¶
Bases:
object
Special sentinel object to mark the end of a stream when using asyncio.Queue for stream processing.
- type: Literal['queue_sentinel'] = 'queue_sentinel'¶
Type discriminator for the sentinel.
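The end-of-stream pattern this sentinel supports can be sketched with asyncio.Queue. EndOfStream below is a hypothetical stand-in for QueueSentinel, and the producer and consumer coroutines are illustrative only.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class EndOfStream:  # hypothetical stand-in for QueueSentinel
    type: str = "queue_sentinel"


async def producer(queue: asyncio.Queue) -> None:
    for chunk in ["Hel", "lo", "!"]:
        await queue.put(chunk)
    await queue.put(EndOfStream())  # signal that no more items will arrive


async def consumer(queue: asyncio.Queue) -> List[str]:
    received = []
    while True:
        item = await queue.get()
        if isinstance(item, EndOfStream):
            break  # stop reading once the sentinel is seen
        received.append(item)
    return received


async def main() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    _, chunks = await asyncio.gather(producer(queue), consumer(queue))
    return chunks


chunks = asyncio.run(main())
```

A dedicated sentinel type avoids ambiguity with legitimate payloads such as None or an empty string.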
- class RawResponsesStreamEvent(input: Any | None = None, data: Any | None = None, type: Literal['raw_response_event'] = 'raw_response_event', error: str | None = None)[source]¶
Bases:
DataClass
Streaming event for storing the raw responses from the LLM. These are ‘raw’ events, i.e. they are directly passed through from the LLM.
- input: Any | None = None¶
The input to the LLM.
- data: Any | None = None¶
The raw responses streaming event from the LLM.
- type: Literal['raw_response_event'] = 'raw_response_event'¶
The type of the event.
- error: str | None = None¶
The error message if any.
- class GeneratorOutput(id: str | None = None, input: Any | None = None, data: T_co = None, thinking: str | None = None, tool_use: Function | None = None, images: str | List[str] | None = None, error: str | None = None, usage: CompletionUsage | None = None, raw_response: str | AsyncIterable[T_co] | Iterable[T_co] | None = None, api_response: Any | None = None, metadata: Dict[str, object] | None = None)[source]¶
Bases:
DataClass, Generic[T_co]
The output data class for the Generator component. We cannot fully control the model's output, so we use this class to track the error message and allow the raw string output to be passed through.
(1) When both the model prediction and the output processors run without error, data holds the final output and error is None.
(2) When either the model prediction or the output processors fail, data is None and error holds the error message.
raw_response depends on the model prediction.
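The data/error contract can be illustrated with a minimal sketch. Output is a hypothetical stand-in for GeneratorOutput with only three fields, and process plays the role of an output processor that parses JSON; neither is the library's implementation.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Output:  # hypothetical stand-in for GeneratorOutput
    data: Any = None
    error: Optional[str] = None
    raw_response: Optional[str] = None


def process(raw: str) -> Output:
    """Parse the raw model text as JSON; keep the raw text even when parsing fails."""
    try:
        return Output(data=json.loads(raw), raw_response=raw)
    except json.JSONDecodeError as exc:
        return Output(error=f"Output processing failed: {exc}", raw_response=raw)


good = process('{"answer": 42}')
bad = process("not json")
```

Callers can then branch on error while still having raw_response available for logging or a retry.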
- id: str | None = None¶
- input: Any | None = None¶
- data: T_co = None¶
- thinking: str | None = None¶
- images: str | List[str] | None = None¶
- error: str | None = None¶
- usage: CompletionUsage | None = None¶
- raw_response: str | AsyncIterable[T_co] | Iterable[T_co] | None = None¶
- api_response: Any | None = None¶
- metadata: Dict[str, object] | None = None¶
- async stream_events() AsyncIterator[T_co] [source]¶
Stream raw events from the Generator’s raw response which has the processed version of api_response. If the raw_response has already been consumed, yield from the data field
- Returns:
An async iterator that yields events stored in raw_response
- Return type:
AsyncIterator[T_co]
- save_images(directory: str = '.', prefix: str = 'generated', format: Literal['png', 'jpg', 'jpeg', 'webp', 'gif', 'bmp'] = 'png', decode_base64: bool = True, return_paths: bool = True) List[str] | None [source]¶
Save generated images to disk with automatic format conversion.
- Parameters:
directory – Directory to save images to (default: current directory)
prefix – Filename prefix for saved images (default: “generated”)
format – Image format to save as (png, jpg, jpeg, webp, gif, bmp)
decode_base64 – Whether to decode base64 encoded images (default: True)
return_paths – Whether to return the saved file paths (default: True)
- Returns:
If return_paths is True: a List[str] of paths to the saved images (always a list, even for a single image), or None if there are no images to save. Otherwise returns None.
- Return type:
List[str] | None
Examples
>>> # Save single image as PNG (returns list with one element)
>>> response.save_images()
['generated_0.png']
>>> # Save multiple images as JPEG with custom prefix
>>> response.save_images(prefix="cat", format="jpg")
['cat_0.jpg', 'cat_1.jpg']
>>> # Save to specific directory
>>> response.save_images(directory="/tmp/images", format="webp")
['/tmp/images/generated_0.webp']
- class FunctionExpression(thought: str = None, action: str = <factory>)[source]¶
Bases:
DataClass
The data modeling of a function expression for a call, including the name and arguments.
Example:
def add(a, b):
    return a + b

# call function add with positional arguments 1 and 2
fun_expr = FunctionExpression(action="add(1, 2)")
# evaluate the expression
result = eval(fun_expr.action)
print(result)
# Output: 3

# call function add with keyword arguments
fun_expr = FunctionExpression(action="add(a=1, b=2)")
result = eval(fun_expr.action)
print(result)
# Output: 3
Why ask the LLM to generate a function expression (code snippet) for a function call?
- It is a more efficient/compact way to call a function.
- It is more flexible:
  - It allows for the full range of Python expressions, including arithmetic operations, nested function calls, and more.
  - It allows passing variables as arguments.
- It is easy to parse using the ast module.
The benefit is fewer failed function calls.
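As an illustration of the ast-based parsing mentioned above, the sketch below splits a call expression into its name and arguments without executing it. parse_call is a hypothetical helper, and restricting arguments to literals is a simplifying assumption that is stricter than the flexibility described above.

```python
import ast
from typing import Any, Dict, List, Tuple


def parse_call(expr: str) -> Tuple[str, List[Any], Dict[str, Any]]:
    """Split a call expression into name, positional args, and keyword args.

    Only literal arguments are accepted, so nothing is executed while parsing.
    """
    node = ast.parse(expr, mode="eval").body
    if not isinstance(node, ast.Call) or not isinstance(node.func, ast.Name):
        raise ValueError(f"Not a simple function call: {expr!r}")
    args = [ast.literal_eval(arg) for arg in node.args]
    kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in node.keywords}
    return node.func.id, args, kwargs


def add(a, b):
    return a + b


name, args, kwargs = parse_call("add(1, b=2)")
result = {"add": add}[name](*args, **kwargs)
```

Parsing first and dispatching through a known mapping is a more conservative alternative to calling eval on model-generated text.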
- thought: str = None¶
- action: str¶
- classmethod from_function(func: Callable[[...], Any] | Callable[[...], Awaitable[Any]], thought: str | None = None, *args, **kwargs) FunctionExpression [source]¶
Create a FunctionExpression object from a function.
- Parameters:
func (Union[Callable[..., Any], AsyncCallable]) – The function to be converted
- Returns:
The FunctionExpression object
- Return type:
FunctionExpression
Usage:
1. Create a FunctionExpression object from a function call.
2. Use to_json() and to_yaml() to get the schema in JSON or YAML format.
3. This will be used as an example in the prompt, showing the LLM how to call the function.
Example:
from adalflow.core.types import FunctionExpression

def add(a, b):
    return a + b

# create an expression for the function call using keyword arguments
fun_expr = FunctionExpression.from_function(
    add, thought="Add two numbers", a=1, b=2
)
print(fun_expr)
# output
# FunctionExpression(thought='Add two numbers', action='add(a=1, b=2)')
- class FunctionOutput(name: str | None = None, input: Function | FunctionExpression | None = None, parsed_input: Function | None = None, output: Any | Generator[Any, Any, Any] | AsyncGenerator[Any, Any] | Coroutine[Any, Any, Any] | None = None, error: str | None = None)[source]¶
Bases:
DataClass
The output of a tool, which could be a function, a class, or a module.
- name: str | None = None¶
- input: Function | FunctionExpression | None = None¶
- output: Any | Generator[Any, Any, Any] | AsyncGenerator[Any, Any] | Coroutine[Any, Any, Any] | None = None¶
- error: str | None = None¶
- class ToolOutput(output: Any = None, observation: str | None = None, display: str | None = None, is_streaming: bool | None = False, metadata: Dict[str, Any] | None = None, status: Literal['success', 'cancelled', 'error'] = 'success')[source]¶
Bases:
DataClass
Using ToolOutput means a completed tool call even if it has error
- output: Any = None¶
- observation: str | None = None¶
- display: str | None = None¶
- is_streaming: bool | None = False¶
- metadata: Dict[str, Any] | None = None¶
- status: Literal['success', 'cancelled', 'error'] = 'success'¶
- class StepOutput(step: int = 0, planner_prompt: str | None = None, action: T = None, function: Function | None = None, observation: str | None = None, ctx: Dict[str, Any] | None = None)[source]¶
Bases:
DataClass, Generic[T]
The output of a single step in the agent. Suited to serial planning agents such as ReAct.
- step: int = 0¶
- planner_prompt: str | None = None¶
- action: T = None¶
- observation: str | None = None¶
- ctx: Dict[str, Any] | None = None¶
- class Document(text: str, meta_data: ~typing.Dict[str, ~typing.Any] | None = None, vector: ~typing.List[float] = <factory>, id: str | None = <factory>, order: int | None = None, score: float | None = None, parent_doc_id: str | ~uuid.UUID | None = None, estimated_num_tokens: int | None = None)[source]¶
Bases:
DataClass
A text container with optional metadata and vector representation.
It is the data structure to support functions like Retriever, DocumentSplitter, and used with LocalDB.
- text: str¶
- meta_data: Dict[str, Any] | None = None¶
- vector: List[float]¶
- id: str | None¶
- order: int | None = None¶
- score: float | None = None¶
- parent_doc_id: str | UUID | None = None¶
- estimated_num_tokens: int | None = None¶
- class UserQuery(query_str: str, metadata: Dict[str, Any] | None = None)[source]¶
Bases:
object
- query_str: str¶
- metadata: Dict[str, Any] | None = None¶
- class AssistantResponse(response_str: str, metadata: Dict[str, Any] | None = None)[source]¶
Bases:
object
- response_str: str¶
- metadata: Dict[str, Any] | None = None¶
- class DialogTurn(id: str = <factory>, user_id: str | None = None, conversation_id: str | None = None, order: int | None = None, user_query: ~core.types.UserQuery | None = None, assistant_response: ~core.types.AssistantResponse | None = None, user_query_timestamp: ~datetime.datetime | None = <factory>, assistant_response_timestamp: ~datetime.datetime | None = <factory>, metadata: ~typing.Dict[str, ~typing.Any] | None = None, vector: ~typing.List[float] | None = None)[source]¶
Bases:
DataClass
A turn consists of a user query and the assistant response.
The data format is designed to fit into a relational database, where each turn is a row. Use conversation_id to group the turns into a dialog session, and use the order field together with user_query_timestamp and assistant_response_timestamp to order the turns.
- Parameters:
id (str) – The unique id of the turn.
user_id (str, optional) – The unique id of the user.
conversation_id (str, optional) – The unique id of the dialog session.
order (int, optional) – The order of the turn in the dialog session, starts from 0.
user_query (UserQuery, optional) – The user query in the turn.
assistant_response (AssistantResponse, optional) – The assistant response in the turn.
user_query_timestamp (datetime, optional) – The timestamp of the user query.
assistant_response_timestamp (datetime, optional) – The timestamp of the assistant response.
metadata (Dict[str, Any], optional) – Additional metadata.
Examples
User: Hi, how are you?
Assistant: Doing great!
DialogTurn(id=uuid4(), user_query=UserQuery("Hi, how are you?"), assistant_response=AssistantResponse("Doing great!"))
- id: str¶
- user_id: str | None = None¶
- conversation_id: str | None = None¶
- order: int | None = None¶
- assistant_response: AssistantResponse | None = None¶
- user_query_timestamp: datetime | None¶
- assistant_response_timestamp: datetime | None¶
- metadata: Dict[str, Any] | None = None¶
- vector: List[float] | None = None¶
- set_assistant_response(assistant_response: AssistantResponse, assistant_response_timestamp: datetime | None = None)[source]¶
- class Conversation(id: str = <factory>, name: str | None = None, user_id: str | None = None, dialog_turns: ~collections.OrderedDict[int, ~core.types.DialogTurn] = <factory>, metadata: ~typing.Dict[str, ~typing.Any] | None = None, created_at: ~datetime.datetime | None = <factory>, dialog_turns_input: dataclasses.InitVar[typing.Union[collections.OrderedDict[int, core.types.DialogTurn], typing.List[core.types.DialogTurn], NoneType]] = None)[source]¶
Bases:
object
A conversation manages the dialog turns in a whole conversation as a session.
This class is mainly used in-memory for the dialog system/app to manage active conversations. You won’t need this class for past conversations which have already been persisted in a database as a form of record or history.
- id: str¶
- name: str | None = None¶
- user_id: str | None = None¶
- dialog_turns: OrderedDict[int, DialogTurn]¶
- metadata: Dict[str, Any] | None = None¶
- created_at: datetime | None¶
- dialog_turns_input: dataclasses.InitVar[Union[collections.OrderedDict[int, core.types.DialogTurn], List[core.types.DialogTurn], NoneType]] = None¶
- append_dialog_turn(dialog_turn: DialogTurn)[source]¶
- get_dialog_turns() OrderedDict[int, DialogTurn] [source]¶
- update_dialog_turn(order: int, dialog_turn: DialogTurn)[source]¶
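One way to picture an in-memory conversation keyed by turn order is the sketch below. Turn and Session are hypothetical stand-ins for DialogTurn and Conversation, and assigning a 0-based order on append is an assumption made for illustration, not documented behavior of append_dialog_turn.

```python
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Turn:  # hypothetical stand-in for DialogTurn
    user_query: str
    assistant_response: Optional[str] = None
    order: Optional[int] = None


@dataclass
class Session:  # hypothetical stand-in for Conversation
    turns: "OrderedDict[int, Turn]" = field(default_factory=OrderedDict)

    def append(self, turn: Turn) -> None:
        # Assumption: turns are keyed by a 0-based order assigned on append.
        turn.order = len(self.turns)
        self.turns[turn.order] = turn


session = Session()
session.append(Turn("Hi, how are you?", "Doing great!"))
session.append(Turn("What can you do?"))  # response not yet available
orders = list(session.turns.keys())
```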
- class RunItem(id: str = <factory>, type: str = 'base', data: ~typing.Any | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]¶
Bases:
DataClass
Base class for streaming execution events in the Runner system.
RunItems represent discrete events that occur during the execution of an Agent through the Runner. These items are used for streaming real-time updates about the execution progress, allowing consumers to monitor and react to different phases of agent execution.
- id¶
Unique identifier for tracking this specific event instance
- Type:
str
- type¶
String identifier for the event type (used for event filtering/routing)
- Type:
str
- data¶
Optional generic data payload (deprecated, prefer specific fields in subclasses)
- Type:
Any | None
- timestamp¶
When this event was created (for debugging and monitoring)
- Type:
datetime.datetime
- Usage:
This is an abstract base class. Use specific subclasses for different event types.
Example
```python
# Don't instantiate directly - use subclasses
tool_call_event = ToolCallRunItem(data=my_function)
```
- id: str¶
- type: str = 'base'¶
- data: Any | None = None¶
- error: str | None = None¶
- timestamp: datetime¶
- class ToolCallRunItem(id: str = <factory>, type: str = 'tool_call_start', data: ~core.types.Function | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]¶
Bases:
RunItem
Event emitted when the Agent is about to execute a function/tool call.
This event is generated after the planner LLM has decided on a function to call but before the function is actually executed. It allows consumers to monitor what tools are being invoked and potentially intervene or log the calls.
- data¶
The Function object containing the tool call details (name, args, kwargs)
- Type:
core.types.Function | None
- Event Flow Position:
Planner generates Function → ToolCallRunItem → Function execution → ToolOutputRunItem
- Usage:
```python
# Listen for tool calls in streaming
async for event in runner.astream(prompt_kwargs).stream_events():
    if isinstance(event, RunItemStreamEvent) and event.name == "agent.tool_call_start":
        tool_call_item = event.item
        print(f"About to call: {tool_call_item.data.name}")
```
- type: str = 'tool_call_start'¶
- class ToolCallActivityRunItem(id: str = <factory>, type: str = 'tool_call_activity', data: ~typing.Any | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]¶
Bases:
RunItem
Event emitted during the execution of a function/tool call.
This event provides intermediate updates on the progress of a function/tool call, such as when it starts, completes, or fails. It’s paired with ToolCallRunItem to provide before/after notification to the caller.
- data¶
Any data containing the progress of the tool call
- Type:
Any | None
- Event Flow Position:
ToolCallRunItem → ToolCallActivityRunItem → ToolOutputRunItem
- Usage:
```python
# Monitor function execution progress
async for event in runner.astream(prompt_kwargs).stream_events():
    if isinstance(event, RunItemStreamEvent) and event.name == "agent.tool_call_activity":
        activity_item = event.item
        print(f"Function progress: {activity_item.data}")
```
- type: str = 'tool_call_activity'¶
- data: Any | None = None¶
- class FunctionRequest(id: str = <factory>, tool_name: str = None, tool: ~core.types.Function | None = None, confirmation_details: ~typing.Any | None = None)[source]¶
Bases:
DataClass
Event emitted when the Agent is about to execute a function/tool call.
- id: str¶
- tool_name: str = None¶
- confirmation_details: Any | None = None¶
- class ToolCallPermissionRequest(id: str = <factory>, type: str = 'tool_call_permission_request', data: ~core.types.FunctionRequest | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]¶
Bases:
RunItem
Event emitted when the Agent is about to execute a function/tool call.
This event is generated after the planner LLM has decided on a function to call but before the function is actually executed. It allows consumers to monitor what tools are being invoked and potentially intervene or log the calls.
- data¶
The FunctionRequest object containing the tool call details (tool name, the Function to call, and confirmation details)
- Type:
core.types.FunctionRequest | None
- Event Flow Position:
Planner generates Function → ToolCallPermissionRequest → ToolCallPermissionResponse → ToolCallRunItem; or, when the user rejects the tool call, ToolCallPermissionRequest → ToolCallPermissionResponse → ToolOutputRunItem
- Usage:
```python
# Listen for tool permission requests in streaming
async for event in runner.astream(prompt_kwargs).stream_events():
    if isinstance(event, RunItemStreamEvent) and event.name == "agent.tool_permission_request":
        tool_call_item = event.item
        print(f"About to call: {tool_call_item.data.tool_name}")
```
- type: str = 'tool_call_permission_request'¶
- data: FunctionRequest | None = None¶
- class ToolOutputRunItem(id: str = <factory>, type: str = 'tool_output', data: ~core.types.FunctionOutput | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]¶
Bases:
RunItem
Event emitted after a function/tool call has been executed.
This event contains the complete execution result, including any outputs or errors from the function call. It’s paired with ToolCallRunItem to provide before/after notification to the caller.
- data¶
Complete FunctionOutput containing execution results, errors, etc.
- Type:
core.types.FunctionOutput | None
- Event Flow Position:
ToolCallRunItem → Function execution → ToolOutputRunItem → StepRunItem
- Usage:
```python
# Monitor function execution results
async for event in runner.astream(prompt_kwargs).stream_events():
    if isinstance(event, RunItemStreamEvent) and event.name == "agent.tool_call_complete":
        output_item = event.item
        if output_item.data.error:
            print(f"Function failed: {output_item.data.error}")
        else:
            print(f"Function result: {output_item.data.output}")
```
- type: str = 'tool_output'¶
- data: FunctionOutput | None = None¶
- class StepRunItem(id: str = <factory>, type: str = 'step', data: ~core.types.StepOutput | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]¶
Bases:
RunItem
Event emitted when a complete execution step has finished.
A “step” represents one complete cycle of: planning → tool selection → tool execution. This event marks the completion of that cycle and contains the full step information including the action taken and the observation (result).
- data¶
Complete StepOutput containing step number, action, and observation
- Type:
core.types.StepOutput | None
- Event Flow Position:
ToolOutputRunItem → StepRunItem → (next step or completion)
- Usage:
```python
# Track step completion
async for event in runner.astream(prompt_kwargs).stream_events():
    if isinstance(event, RunItemStreamEvent) and event.name == "agent.step_complete":
        step_item = event.item
        print(f"Completed step {step_item.data.step}")
```
- type: str = 'step'¶
- data: StepOutput | None = None¶
- class RunnerResult(step_history: List[core.types.StepOutput] = <factory>, answer: Optional[str] = None, error: Optional[str] = None, ctx: Optional[Dict] = None)[source]¶
Bases:
object
- step_history: List[StepOutput]¶
- answer: str | None = None¶
- error: str | None = None¶
- ctx: Dict | None = None¶
- class FinalOutputItem(id: str = <factory>, type: str = 'final_output', data: ~core.types.RunnerResult | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]¶
Bases:
RunItem
Event emitted when the entire Runner execution has completed.
This event signals the end of the execution sequence and contains the final processed result. It’s emitted regardless of whether execution completed successfully or with an error.
- data¶
The final RunnerResult containing the complete execution result
- Type:
core.types.RunnerResult | None
- Event Flow Position:
Final step → FinalOutputItem (execution complete)
- Usage:
```python
# Get final results
async for event in runner.astream(prompt_kwargs).stream_events():
    if isinstance(event, RunItemStreamEvent) and event.name == "agent.execution_complete":
        final_item = event.item
        if final_item.data.error:
            print(f"Execution failed: {final_item.data.error}")
        else:
            print(f"Final answer: {final_item.data.answer}")
```
- type: str = 'final_output'¶
- data: RunnerResult | None = None¶
- class RunItemStreamEvent(name: Literal['agent.tool_call_start', 'agent.tool_call_activity', 'agent.tool_call_complete', 'agent.step_complete', 'agent.final_output', 'agent.execution_complete', 'agent.tool_permission_request'], item: RunItem, type: Literal['run_item_stream_event'] = 'run_item_stream_event')[source]¶
Bases:
DataClass
Wrapper for streaming RunItem events during Runner execution.
This class wraps RunItem instances with event metadata to create a streaming event system. Each event has a name that indicates what type of execution event occurred, and contains the associated RunItem with the event data.
The streaming system allows consumers to react to different phases of agent execution in real-time, such as when tools are called, when steps complete, or when execution finishes.
- name¶
The specific event type that occurred (see event name literals)
- Type:
Literal[‘agent.tool_call_start’, ‘agent.tool_call_activity’, ‘agent.tool_call_complete’, ‘agent.step_complete’, ‘agent.final_output’, ‘agent.execution_complete’, ‘agent.tool_permission_request’]
- item¶
The RunItem containing the event-specific data
- Type:
RunItem
- type¶
Always “run_item_stream_event” for type discrimination
- Type:
Literal[‘run_item_stream_event’]
- Event Types:
“agent.final_output”: Final output from the agent (FinalOutputItem)
“agent.tool_permission_request”: Tool permission request before execution
“agent.tool_call_start”: Agent is about to execute a tool (ToolCallRunItem)
“agent.tool_call_activity”: Intermediate progress update during tool execution (ToolCallActivityRunItem)
“agent.tool_call_complete”: Tool execution completed (ToolOutputRunItem)
“agent.step_complete”: Full execution step finished (StepRunItem)
“agent.execution_complete”: Entire execution completed (FinalOutputItem)
- name: Literal['agent.tool_call_start', 'agent.tool_call_activity', 'agent.tool_call_complete', 'agent.step_complete', 'agent.final_output', 'agent.execution_complete', 'agent.tool_permission_request']¶
The name identifying the specific type of execution event that occurred.
- type: Literal['run_item_stream_event'] = 'run_item_stream_event'¶
Type discriminator for the streaming event system.
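Consumers typically branch on the event name. The sketch below routes events to handlers through a dictionary; StreamEvent is a hypothetical stand-in for RunItemStreamEvent, the handlers are illustrative, and only the event names are taken from the Literal above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class StreamEvent:  # hypothetical stand-in for RunItemStreamEvent
    name: str
    item: Any
    type: str = "run_item_stream_event"


log: List[str] = []

# Map each event name of interest to a handler.
handlers: Dict[str, Callable[[Any], None]] = {
    "agent.tool_call_start": lambda item: log.append(f"calling {item}"),
    "agent.tool_call_complete": lambda item: log.append(f"result {item}"),
    "agent.execution_complete": lambda item: log.append(f"done {item}"),
}


def dispatch(event: StreamEvent) -> None:
    """Route an event to its handler; ignore names without one."""
    handler = handlers.get(event.name)
    if handler is not None:
        handler(event.item)


for ev in [
    StreamEvent("agent.tool_call_start", "add"),
    StreamEvent("agent.step_complete", 0),  # no handler registered, skipped
    StreamEvent("agent.tool_call_complete", 3),
    StreamEvent("agent.execution_complete", "3"),
]:
    dispatch(ev)
```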
- class EventEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]¶
Bases:
JSONEncoder
- default(obj)[source]¶
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
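For event objects, a JSONEncoder subclass might handle dataclasses and datetimes. The sketch below is an assumption about what such an encoder could look like; SketchEncoder and Item are hypothetical names, and this is not the library's EventEncoder.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from datetime import datetime


class SketchEncoder(json.JSONEncoder):  # hypothetical; not the library's EventEncoder
    def default(self, o):
        # Dataclass instances become plain dicts.
        if is_dataclass(o) and not isinstance(o, type):
            return asdict(o)
        # Datetimes become ISO 8601 strings.
        if isinstance(o, datetime):
            return o.isoformat()
        # Anything else falls through to the base class, which raises TypeError.
        return super().default(o)


@dataclass
class Item:  # hypothetical event payload
    type: str
    timestamp: datetime


encoded = json.dumps(Item("step", datetime(2024, 1, 2, 3, 4, 5)), cls=SketchEncoder)
```

The encoder is passed through the cls argument of json.dumps, so nested values returned by default are encoded recursively.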
- class RunnerStreamingResult(_event_queue: ~asyncio.queues.Queue = <factory>, _run_task: ~_asyncio.Task | None = None, _exception: Exception | None = None, answer: ~typing.Any | None = None, step_history: ~typing.List[~typing.Any] = <factory>, _is_complete: bool = False)[source]¶
Bases:
object
Container for runner streaming results that provides access to the event queue and allows users to consume streaming events.
- answer: Any | None = None¶
- step_history: List[Any]¶
- property is_complete: bool¶
Check if the workflow execution is complete.
- put_nowait(item: RawResponsesStreamEvent | RunItemStreamEvent)[source]¶
- async stream_events() AsyncIterator[RawResponsesStreamEvent | RunItemStreamEvent] [source]¶
Stream events from the runner execution.
- Returns:
An async iterator that yields stream events
- Return type:
AsyncIterator[StreamEvent]
Example
```python
result = runner.astream(prompt_kwargs)
async for event in result.stream_events():
    if isinstance(event, RawResponsesStreamEvent):
        print(f"Raw event: {event.data}")
    elif isinstance(event, RunItemStreamEvent):
        print(f"Run item: {event.name} - {event.item}")
```
- async stream_to_json(file_name: str = 'agent_events_stream.json') AsyncIterator[RawResponsesStreamEvent | RunItemStreamEvent] [source]¶
Stream events to a JSON file in real-time while also yielding them.
This method writes events to a JSON file as they arrive, giving a live streaming effect. The JSON file is updated incrementally.
- Parameters:
file_name – The output file name for saving events
- Yields:
StreamEvent – Each event as it arrives
Example
```python
result = runner.astream(prompt_kwargs)
async for event in result.stream_to_json("live_events.json"):
    # Process event while it's also being written to file
    print(f"Event: {event}")
```
- stream_to_json_sync(file_name: str = 'agent_events_stream.json')[source]¶
Synchronous wrapper for stream_to_json that returns an iterator.
This allows users to use the streaming JSON functionality in a sync context.
- Parameters:
file_name – The output file name for saving events
- Returns:
Iterator of events
Example
```python
result = runner.astream(prompt_kwargs)
for event in result.stream_to_json_sync("live_events.json"):
    print(f"Event: {event}")
```
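A synchronous wrapper over an async iterator can be built by driving the iterator on a private event loop. The sketch below shows one such approach under that assumption; events and iterate_sync are hypothetical names, and this is not necessarily how stream_to_json_sync is implemented.

```python
import asyncio
from typing import AsyncIterator, Iterator


async def events() -> AsyncIterator[str]:  # hypothetical async event source
    for name in ["start", "step", "done"]:
        await asyncio.sleep(0)
        yield name


def iterate_sync(agen: AsyncIterator[str]) -> Iterator[str]:
    """Drive an async iterator from synchronous code on a private event loop."""
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                # Run the loop just long enough to fetch the next item.
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()


collected = list(iterate_sync(events()))
```

This approach only works when no event loop is already running in the calling thread.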