types

Functional data classes to support functional components like Generator, Retriever, and Assistant.

Functions

get_model_args(model_type)

Get the required keys in model_kwargs for a specific model type.

Classes

AssistantResponse(response_str[, metadata])

CompletionUsage([completion_tokens, ...])

In sync with OpenAI completion usage api spec at openai/types/completion_usage.py

Conversation(id, name, user_id, ...)

A conversation manages the dialog turns in a whole conversation as a session.

DialogTurn(id, user_id, conversation_id, ...)

A turn consists of a user query and the assistant response.

Document(text, meta_data, ...)

A text container with optional metadata and vector representation.

EmbedderOutput(data, model, usage, error, ...)

Container to hold the response from an Embedder datacomponent for a single batch of input.

EmbedderOutputType

alias of EmbedderOutput

Embedding(embedding, index)

Container for a single embedding.

EventEncoder(*[, skipkeys, ensure_ascii, ...])

FinalOutputItem(id, type, data, error, timestamp)

Event emitted when the entire Runner execution has completed.

Function(id, thought, name, args, kwargs, ...)

The data modeling of a function call, including the name and keyword arguments.

FunctionDefinition(func_name, func_desc, ...)

The data modeling of a function definition, including the name, description, and parameters.

FunctionExpression(thought, action)

The data modeling of a function expression for a call, including the name and arguments.

FunctionOutput([name, input, parsed_input, ...])

The output of a tool, which could be a function, a class, or a module.

FunctionRequest(id, tool_name, tool, ...)

Event emitted when the Agent is about to execute a function/tool call.

GeneratorOutput([id, input, data, thinking, ...])

The output data class for the Generator component.

InputTokensDetails([cached_tokens])

Details about input tokens used in a response

ModelClientType()

A quick way to access all model clients in the ModelClient module.

ModelType(value[, names, module, qualname, ...])

The type of the model, including Embedder, LLM, Reranker.

OutputTokensDetails([reasoning_tokens])

Details about output tokens used in a response

QueueCompleteSentinel()

Sentinel to indicate queue completion.

QueueSentinel([type])

Special sentinel object to mark the end of a stream when using asyncio.Queue for stream processing.

RawResponsesStreamEvent([input, data, type, ...])

Streaming event for storing the raw responses from the LLM.

ResponseUsage(input_tokens, output_tokens, ...)

Usage information for a response, including token counts, in sync with OpenAI response usage api spec at openai/types/response_usage.py

RetrieverOutput(id, doc_indices, doc_scores, ...)

Save the output of a single query in retrievers.

RunItem(id, type, data, error, timestamp)

Base class for streaming execution events in the Runner system.

RunItemStreamEvent(name, item[, type])

Wrapper for streaming RunItem events during Runner execution.

RunnerResult(step_history, answer, error, ctx)

RunnerStreamingResult(_event_queue, ...)

Container for runner streaming results that provides access to the event queue and allows users to consume streaming events.

StepOutput([step, planner_prompt, action, ...])

The output of a single step in the agent.

StepRunItem(id, type, data, error, timestamp)

Event emitted when a complete execution step has finished.

TokenLogProb(token, logprob)

similar to openai.ChatCompletionTokenLogprob

ToolCallActivityRunItem(id, type, data, ...)

Event emitted during the execution of a function/tool call.

ToolCallPermissionRequest(id, type, data, ...)

Event emitted when the Agent is about to execute a function/tool call.

ToolCallRunItem(id, type, data, error, timestamp)

Event emitted when the Agent is about to execute a function/tool call.

ToolOutput([output, observation, display, ...])

Using ToolOutput means a completed tool call even if it has error

ToolOutputRunItem(id, type, data, error, ...)

Event emitted after a function/tool call has been executed.

Usage(prompt_tokens, total_tokens)

In sync with OpenAI embedding api spec, same as openai/types/create_embedding_response.py

UserQuery(query_str[, metadata])

class ModelType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

The type of the model, including Embedder, LLM, Reranker. It helps ModelClient identify the model type required to correctly call the model.

EMBEDDER = 1
LLM = 2
LLM_REASONING = 3
RERANKER = 4
IMAGE_GENERATION = 5
UNDEFINED = 6
class ModelClientType[source]

Bases: object

A quick way to access all model clients in the ModelClient module.

From this:

from adalflow.components.model_client import CohereAPIClient, TransformersClient, AnthropicAPIClient, GroqAPIClient, OpenAIClient

model_client = OpenAIClient()

To this:

from adalflow.core.types import ModelClientType

model_client = ModelClientType.OPENAI
get_model_args(model_type: ModelType) List[str][source]

Get the required keys in model_kwargs for a specific model type.

note: If your model inference sdk uses different keys, you need to convert them to the standard keys here in their specifc ModelClient.

Parameters:

model_type (ModelType) – The model type

Returns:

The required keys in model_kwargs

Return type:

List[str]

class Embedding(embedding: List[float], index: int | None)[source]

Bases: object

Container for a single embedding.

In sync with api spec, same as openai/types/embedding.py

embedding: List[float]
index: int | None
class Usage(prompt_tokens: int, total_tokens: int)[source]

Bases: object

In sync with OpenAI embedding api spec, same as openai/types/create_embedding_response.py

prompt_tokens: int
total_tokens: int
class EmbedderOutput(data: ~typing.List[~core.types.Embedding] = <factory>, model: str | None = None, usage: ~core.types.Usage | None = None, error: str | None = None, raw_response: ~typing.Any | None = None, input: ~typing.List[str] | None = None)[source]

Bases: DataClass

Container to hold the response from an Embedder datacomponent for a single batch of input.

Data standard for Embedder model output to interact with other components. Batch processing is often available, thus we need a list of Embedding objects.

data: List[Embedding]
model: str | None = None
usage: Usage | None = None
error: str | None = None
raw_response: Any | None = None
input: List[str] | None = None
property length: int
property embedding_dim: int

The dimension of the embedding, assuming all embeddings have the same dimension.

Returns:

The dimension of the embedding, -1 if no embedding is available

Return type:

int

property is_normalized: bool

Check if the embeddings are normalized to unit vectors.

Returns:

True if the embeddings are normalized, False otherwise

Return type:

bool

EmbedderOutputType

alias of EmbedderOutput

class TokenLogProb(token: str, logprob: float)[source]

Bases: object

similar to openai.ChatCompletionTokenLogprob

token: str
logprob: float
class CompletionUsage(completion_tokens: int | None = None, prompt_tokens: int | None = None, total_tokens: int | None = None)[source]

Bases: object

In sync with OpenAI completion usage api spec at openai/types/completion_usage.py

completion_tokens: int | None = None
prompt_tokens: int | None = None
total_tokens: int | None = None
class InputTokensDetails(cached_tokens: int | None = 0)[source]

Bases: object

Details about input tokens used in a response

cached_tokens: int | None = 0
class OutputTokensDetails(reasoning_tokens: int | None = 0)[source]

Bases: object

Details about output tokens used in a response

reasoning_tokens: int | None = 0
class ResponseUsage(input_tokens: int, output_tokens: int, total_tokens: int, input_tokens_details: ~core.types.InputTokensDetails = <factory>, output_tokens_details: ~core.types.OutputTokensDetails = <factory>)[source]

Bases: object

Usage information for a response, including token counts, in sync with OpenAI response usage api spec at openai/types/response_usage.py

input_tokens: int
output_tokens: int
total_tokens: int
input_tokens_details: InputTokensDetails
output_tokens_details: OutputTokensDetails
class RetrieverOutput(id: str = None, doc_indices: ~typing.List[int] = <function required_field>, doc_scores: ~typing.List[float] = None, query: ~core.types.RetrieverQueryType = None, documents: ~typing.List[~core.types.RetrieverDocumentType] = None)[source]

Bases: DataClass

Save the output of a single query in retrievers.

It is up to the subclass of Retriever to specify the type of query and document.

id: str = None
doc_indices() Callable[[], Any]

A factory function to create a required field in a dataclass. The returned callable raises a TypeError when invoked, indicating a required field was not provided.

Parameters:

name (Optional[str], optional) – The name of the required field. Defaults to None

Returns:

A callable that raises TypeError when called, indicating a missing required field.

Return type:

Callable[[], Any]

Example:

from dataclasses import dataclass
from adalflow.core.base_data_class import required_field, DataClass

@dataclass
class Person(DataClass):
    name: str = field(default=None)
    age: int = field(default_factory=required_field())# allow required field after optional field
doc_scores: List[float] = None
query: RetrieverQueryType = None
documents: List[RetrieverDocumentType] = None
class FunctionDefinition(func_name: str = <function required_field>, func_desc: str | None = None, func_parameters: ~typing.Dict[str, object] = <factory>)[source]

Bases: DataClass

The data modeling of a function definition, including the name, description, and parameters.

func_name() Callable[[], Any]

A factory function to create a required field in a dataclass. The returned callable raises a TypeError when invoked, indicating a required field was not provided.

Parameters:

name (Optional[str], optional) – The name of the required field. Defaults to None

Returns:

A callable that raises TypeError when called, indicating a missing required field.

Return type:

Callable[[], Any]

Example:

from dataclasses import dataclass
from adalflow.core.base_data_class import required_field, DataClass

@dataclass
class Person(DataClass):
    name: str = field(default=None)
    age: int = field(default_factory=required_field())# allow required field after optional field
func_desc: str | None = None
func_parameters: Dict[str, object]
fn_schema_str(type: Literal['json', 'yaml'] = 'json') str[source]

Get the function definition str to be used in the prompt.

You should also directly use to_json() and to_yaml() to get the schema in JSON or YAML format.

class Function(id: str | None = None, thought: str | None = None, name: str = '', args: ~typing.List[object] | None = <factory>, kwargs: ~typing.Dict[str, object] | None = <factory>, _is_answer_final: bool | None = None, _answer: ~typing.Any | None = None)[source]

Bases: DataClass

The data modeling of a function call, including the name and keyword arguments.

You can use the exclude in to_json() and to_yaml() to exclude the thought field if you do not want to use chain-of-thought pattern.

Example:

# assume the function is added in a context_map
# context_map = {"add": add}

def add(a, b):
    return a + b

# call function add with arguments 1 and 2
fun = Function(name="add", kwargs={"a": 1, "b": 2})
# evaluate the function
result = context_map[fun.name](**fun.kwargs)

# or call with positional arguments
fun = Function(name="add", args=[1, 2])
result = context_map[fun.name](*fun.args)
id: str | None = None
thought: str | None = None
name: str = ''
args: List[object] | None
kwargs: Dict[str, object] | None
classmethod from_function(func: Callable[[...], Any] | Callable[[...], Awaitable[Any]], thought: str | None = None, *args, **kwargs) Function[source]

Create a Function object from a function.

Parameters:

fun (Union[Callable[..., Any], AsyncCallable]) – The function to be converted

Returns:

The Function object

Return type:

Function

Usage: 1. Create a Function object from a function call: 2. use to_json() and to_yaml() to get the schema in JSON or YAML format. 3. This will be used as an example in prompt showing LLM how to call the function.

Example:

from adalflow.core.types import Function

def add(a, b):
    return a + b

# create a function call object with positional arguments
fun = Function.from_function(add, thought="Add two numbers", 1, 2)
print(fun)

# output
# Function(thought='Add two numbers', name='add', args=[1, 2])
class QueueSentinel(type: Literal['queue_sentinel'] = 'queue_sentinel')[source]

Bases: object

Special sentinel object to mark the end of a stream when using asyncio.Queue for stream processing.

type: Literal['queue_sentinel'] = 'queue_sentinel'

Type discriminator for the sentinel.

class RawResponsesStreamEvent(input: Any | None = None, data: Any | None = None, type: Literal['raw_response_event'] = 'raw_response_event', error: str | None = None)[source]

Bases: DataClass

Streaming event for storing the raw responses from the LLM. These are ‘raw’ events, i.e. they are directly passed through from the LLM.

input: Any | None = None

The input to the LLM.

data: Any | None = None

The raw responses streaming event from the LLM.

type: Literal['raw_response_event'] = 'raw_response_event'

The type of the event.

error: str | None = None

The error message if any.

class GeneratorOutput(id: str | None = None, input: Any | None = None, data: T_co = None, thinking: str | None = None, tool_use: Function | None = None, images: str | List[str] | None = None, error: str | None = None, usage: CompletionUsage | None = None, raw_response: str | AsyncIterable[T_co] | Iterable[T_co] | None = None, api_response: Any | None = None, metadata: Dict[str, object] | None = None)[source]

Bases: DataClass, Generic[T_co]

The output data class for the Generator component. We can not control its output 100%, so we use this to track the error_message and allow the raw string output to be passed through.

(1) When model predict and output processors are both without error, we have data as the final output, error as None. (2) When either model predict or output processors have error, we have data as None, error as the error message.

Raw_response will depends on the model predict.

id: str | None = None
input: Any | None = None
data: T_co = None
thinking: str | None = None
tool_use: Function | None = None
images: str | List[str] | None = None
error: str | None = None
usage: CompletionUsage | None = None
raw_response: str | AsyncIterable[T_co] | Iterable[T_co] | None = None
api_response: Any | None = None
metadata: Dict[str, object] | None = None
async stream_events() AsyncIterator[T_co][source]

Stream raw events from the Generator’s raw response which has the processed version of api_response. If the raw_response has already been consumed, yield from the data field

Returns:

An async iterator that yields events stored in raw_response

Return type:

AsyncIterator[T_co]

save_images(directory: str = '.', prefix: str = 'generated', format: Literal['png', 'jpg', 'jpeg', 'webp', 'gif', 'bmp'] = 'png', decode_base64: bool = True, return_paths: bool = True) List[str] | None[source]

Save generated images to disk with automatic format conversion.

Parameters:
  • directory – Directory to save images to (default: current directory)

  • prefix – Filename prefix for saved images (default: “generated”)

  • format – Image format to save as (png, jpg, jpeg, webp, gif, bmp)

  • decode_base64 – Whether to decode base64 encoded images (default: True)

  • return_paths – Whether to return the saved file paths (default: True)

Returns:

  • List[str]: Paths to saved images (always returns a list, even for single image)
    • None: If no images to save

Otherwise returns None

Return type:

If return_paths is True

Examples

>>> # Save single image as PNG (returns list with one element)
>>> response.save_images()
['generated_0.png']
>>> # Save multiple images as JPEG with custom prefix
>>> response.save_images(prefix="cat", format="jpg")
['cat_0.jpg', 'cat_1.jpg']
>>> # Save to specific directory
>>> response.save_images(directory="/tmp/images", format="webp")
['/tmp/images/generated_0.webp']
class FunctionExpression(thought: str = None, action: str = <factory>)[source]

Bases: DataClass

The data modeling of a function expression for a call, including the name and arguments.

Example:

def add(a, b):
    return a + b

# call function add with positional arguments 1 and 2
fun_expr = FunctionExpression(action="add(1, 2)")
# evaluate the expression
result = eval(fun_expr.action)
print(result)
# Output: 3

# call function add with keyword arguments
fun_expr = FunctionExpression(action="add(a=1, b=2)")
result = eval(fun_expr.action)
print(result)
# Output: 3

Why asking LLM to generate function expression (code snippet) for a function call? - It is more efficient/compact to call a function. - It is more flexible.

  1. for the full range of Python expressions, including arithmetic operations, nested function calls, and more.

  2. allow to pass variables as arguments.

  • Ease of parsing using ast module.

The benefits are less failed function calls.

thought: str = None
action: str
classmethod from_function(func: Callable[[...], Any] | Callable[[...], Awaitable[Any]], thought: str | None = None, *args, **kwargs) FunctionExpression[source]

Create a FunctionExpression object from a function.

Parameters:

fun (Union[Callable[..., Any], AsyncCallable]) – The function to be converted

Returns:

The FunctionExpression object

Return type:

FunctionExpression

Usage: 1. Create a FunctionExpression object from a function call: 2. use to_json() and to_yaml() to get the schema in JSON or YAML format. 3. This will be used as an example in prompt showing LLM how to call the function.

Example:

from adalflow.core.types import FunctionExpression

def add(a, b):
    return a + b

# create an expression for the function call and using keyword arguments
fun_expr = FunctionExpression.from_function(
    add, thought="Add two numbers", a=1, b=2
)
print(fun_expr)

# output
# FunctionExpression(thought='Add two numbers', action='add(a=1, b=2)')
class FunctionOutput(name: str | None = None, input: Function | FunctionExpression | None = None, parsed_input: Function | None = None, output: Any | Generator[Any, Any, Any] | AsyncGenerator[Any, Any] | Coroutine[Any, Any, Any] | None = None, error: str | None = None)[source]

Bases: DataClass

The output of a tool, which could be a function, a class, or a module.

name: str | None = None
input: Function | FunctionExpression | None = None
parsed_input: Function | None = None
output: Any | Generator[Any, Any, Any] | AsyncGenerator[Any, Any] | Coroutine[Any, Any, Any] | None = None
error: str | None = None
class ToolOutput(output: Any = None, observation: str | None = None, display: str | None = None, is_streaming: bool | None = False, metadata: Dict[str, Any] | None = None, status: Literal['success', 'cancelled', 'error'] = 'success')[source]

Bases: DataClass

Using ToolOutput means a completed tool call even if it has error

output: Any = None
observation: str | None = None
display: str | None = None
is_streaming: bool | None = False
metadata: Dict[str, Any] | None = None
status: Literal['success', 'cancelled', 'error'] = 'success'
class StepOutput(step: int = 0, planner_prompt: str | None = None, action: T = None, function: Function | None = None, observation: str | None = None, ctx: Dict[str, Any] | None = None)[source]

Bases: DataClass, Generic[T]

The output of a single step in the agent. Suits for serial planning agent such as React

step: int = 0
planner_prompt: str | None = None
action: T = None
function: Function | None = None
observation: str | None = None
ctx: Dict[str, Any] | None = None
to_prompt_str() str[source]
class Document(text: str, meta_data: ~typing.Dict[str, ~typing.Any] | None = None, vector: ~typing.List[float] = <factory>, id: str | None = <factory>, order: int | None = None, score: float | None = None, parent_doc_id: str | ~uuid.UUID | None = None, estimated_num_tokens: int | None = None)[source]

Bases: DataClass

A text container with optional metadata and vector representation.

It is the data structure to support functions like Retriever, DocumentSplitter, and used with LocalDB.

text: str
meta_data: Dict[str, Any] | None = None
vector: List[float]
id: str | None
order: int | None = None
score: float | None = None
parent_doc_id: str | UUID | None = None
estimated_num_tokens: int | None = None
classmethod from_dict(doc: Dict)[source]

Create a Document object from a dictionary.

Example:

doc = Document.from_dict({
    "id": "123",
    "text": "Hello world",
    "meta_data": {"title": "Greeting"}
})
class UserQuery(query_str: str, metadata: Dict[str, Any] | None = None)[source]

Bases: object

query_str: str
metadata: Dict[str, Any] | None = None
class AssistantResponse(response_str: str, metadata: Dict[str, Any] | None = None)[source]

Bases: object

response_str: str
metadata: Dict[str, Any] | None = None
class DialogTurn(id: str = <factory>, user_id: str | None = None, conversation_id: str | None = None, order: int | None = None, user_query: ~core.types.UserQuery | None = None, assistant_response: ~core.types.AssistantResponse | None = None, user_query_timestamp: ~datetime.datetime | None = <factory>, assistant_response_timestamp: ~datetime.datetime | None = <factory>, metadata: ~typing.Dict[str, ~typing.Any] | None = None, vector: ~typing.List[float] | None = None)[source]

Bases: DataClass

A turn consists of a user query and the assistant response.

The dataformat is designed to fit into a relational database, where each turn is a row. Use session_id to group the turns into a dialog session with the order field and user_query_timestamp and assistant_response_timestamp to order the turns.

Parameters:
  • id (str) – The unique id of the turn.

  • user_id (str, optional) – The unique id of the user.

  • session_id (str, optional) – The unique id of the dialog session.

  • order (int, optional) – The order of the turn in the dialog session, starts from 0.

  • user_query (UserQuery, optional) – The user query in the turn.

  • assistant_response (AssistantResponse, optional) – The assistant response in the turn.

  • user_query_timestamp (datetime, optional) – The timestamp of the user query.

  • assistant_response_timestamp (datetime, optional) – The timestamp of the assistant response.

  • metadata (Dict[str, Any], optional) – Additional metadata.

Examples

  • User: Hi, how are you?

  • Assistant: Doing great!

DialogTurn(id=uuid4(), user_query=UserQuery(“Hi, how are you?”), assistant_response=AssistantResponse(“Doing great!”))

id: str
user_id: str | None = None
conversation_id: str | None = None
order: int | None = None
user_query: UserQuery | None = None
assistant_response: AssistantResponse | None = None
user_query_timestamp: datetime | None
assistant_response_timestamp: datetime | None
metadata: Dict[str, Any] | None = None
vector: List[float] | None = None
set_user_query(user_query: UserQuery, user_query_timestamp: datetime | None = None)[source]
set_assistant_response(assistant_response: AssistantResponse, assistant_response_timestamp: datetime | None = None)[source]
class Conversation(id: str = <factory>, name: str | None = None, user_id: str | None = None, dialog_turns: ~collections.OrderedDict[int, ~core.types.DialogTurn] = <factory>, metadata: ~typing.Dict[str, ~typing.Any] | None = None, created_at: ~datetime.datetime | None = <factory>, dialog_turns_input: dataclasses.InitVar[typing.Union[collections.OrderedDict[int, core.types.DialogTurn], typing.List[core.types.DialogTurn], NoneType]] = None)[source]

Bases: object

A conversation manages the dialog turns in a whole conversation as a session.

This class is mainly used in-memory for the dialog system/app to manage active conversations. You won’t need this class for past conversations which have already been persisted in a database as a form of record or history.

id: str
name: str | None = None
user_id: str | None = None
dialog_turns: OrderedDict[int, DialogTurn]
metadata: Dict[str, Any] | None = None
created_at: datetime | None
dialog_turns_input: dataclasses.InitVar[Union[collections.OrderedDict[int, core.types.DialogTurn], List[core.types.DialogTurn], NoneType]] = None
get_next_order()[source]
append_dialog_turn(dialog_turn: DialogTurn)[source]
get_dialog_turns() OrderedDict[int, DialogTurn][source]
get_chat_history_str() str[source]
delete_dialog_turn(order: int)[source]
update_dialog_turn(order: int, dialog_turn: DialogTurn)[source]
class RunItem(id: str = <factory>, type: str = 'base', data: ~typing.Any | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]

Bases: DataClass

Base class for streaming execution events in the Runner system.

RunItems represent discrete events that occur during the execution of an Agent through the Runner. These items are used for streaming real-time updates about the execution progress, allowing consumers to monitor and react to different phases of agent execution.

id

Unique identifier for tracking this specific event instance

Type:

str

type

String identifier for the event type (used for event filtering/routing)

Type:

str

data

Optional generic data payload (deprecated, prefer specific fields in subclasses)

Type:

Any | None

timestamp

When this event was created (for debugging and monitoring)

Type:

datetime.datetime

Usage:

This is an abstract base class. Use specific subclasses for different event types.

Example

`python # Don't instantiate directly - use subclasses tool_call_event = ToolCallRunItem(function=my_function) `

id: str
type: str = 'base'
data: Any | None = None
error: str | None = None
timestamp: datetime
class ToolCallRunItem(id: str = <factory>, type: str = 'tool_call_start', data: ~core.types.Function | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]

Bases: RunItem

Event emitted when the Agent is about to execute a function/tool call.

This event is generated after the planner LLM has decided on a function to call but before the function is actually executed. It allows consumers to monitor what tools are being invoked and potentially intervene or log the calls.

data

The Function object containing the tool call details (name, args, kwargs)

Type:

core.types.Function | None

Event Flow Position:
  1. Planner generates Function → ToolCallRunItem → Function execution → ToolOutputRunItem

Usage:

```python # Listen for tool calls in streaming async for event in runner.astream(prompt_kwargs).stream_events():

if isinstance(event, RunItemStreamEvent) and event.name == “tool_called”:

tool_call_item = event.item print(f”About to call: {tool_call_item.data.name}”)

```

type: str = 'tool_call_start'
data: Function | None = None
class ToolCallActivityRunItem(id: str = <factory>, type: str = 'tool_call_activity', data: ~typing.Any | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]

Bases: RunItem

Event emitted during the execution of a function/tool call.

This event provides intermediate updates on the progress of a function/tool call, such as when it starts, completes, or fails. It’s paired with ToolCallRunItem to provide before/after notification to the caller.

data

Any data containing the progress of the tool call

Type:

Any | None

Event Flow Position:

ToolCallRunItem → ToolCallActivityRunItem → ToolOutputRunItem

Usage:

```python # Monitor function execution progress async for event in runner.astream(prompt_kwargs).stream_events():

if isinstance(event, RunItemStreamEvent) and event.name == “tool_call_activity”:

activity_item = event.item print(f”Function progress: {activity_item.data}”)

```

type: str = 'tool_call_activity'
data: Any | None = None
class FunctionRequest(id: str = <factory>, tool_name: str = None, tool: ~core.types.Function | None = None, confirmation_details: ~typing.Any | None = None)[source]

Bases: DataClass

Event emitted when the Agent is about to execute a function/tool call.

id: str
tool_name: str = None
tool: Function | None = None
confirmation_details: Any | None = None
class ToolCallPermissionRequest(id: str = <factory>, type: str = 'tool_call_permission_request', data: ~core.types.FunctionRequest | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]

Bases: RunItem

Event emitted when the Agent is about to execute a function/tool call.

This event is generated after the planner LLM has decided on a function to call but before the function is actually executed. It allows consumers to monitor what tools are being invoked and potentially intervene or log the calls.

data

The Function object containing the tool call details (name, args, kwargs)

Type:

core.types.FunctionRequest | None

Event Flow Position:

1. Planner generates Function → ToolCallPermissionRequest → ToolCallPermissionResponse → ToolCallRunItem or when user rejects the tool call, ToolCallPermissionRequest → ToolCallPermissionResponse → ToolOutputRunItem

Usage:

```python # Listen for tool calls in streaming async for event in runner.astream(prompt_kwargs).stream_events():

if isinstance(event, RunItemStreamEvent) and event.name == “tool_call_permission_request”:

tool_call_item = event.item print(f”About to call: {tool_call_item.data.name}”)

```

type: str = 'tool_call_permission_request'
data: FunctionRequest | None = None
class ToolOutputRunItem(id: str = <factory>, type: str = 'tool_output', data: ~core.types.FunctionOutput | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]

Bases: RunItem

Event emitted after a function/tool call has been executed.

This event contains the complete execution result, including any outputs or errors from the function call. It’s paired with ToolCallRunItem to provide before/after notification to the caller.

data

Complete FunctionOutput containing execution results, errors, etc.

Type:

core.types.FunctionOutput | None

Event Flow Position:

ToolCallRunItem → Function execution → ToolOutputRunItem → StepRunItem

Usage:

```python # Monitor function execution results async for event in runner.astream(prompt_kwargs).stream_events():

if isinstance(event, RunItemStreamEvent) and event.name == “tool_output”:

output_item = event.item if output_item.data.error:

print(f”Function failed: {output_item.data.error}”)

else:

print(f”Function result: {output_item.data.output}”)

```

type: str = 'tool_output'
data: FunctionOutput | None = None
class StepRunItem(id: str = <factory>, type: str = 'step', data: ~core.types.StepOutput | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]

Bases: RunItem

Event emitted when a complete execution step has finished.

A “step” represents one complete cycle of: planning → tool selection → tool execution. This event marks the completion of that cycle and contains the full step information including the action taken and the observation (result).

data

Complete StepOutput containing step number, action, and observation

Type:

core.types.StepOutput | None

Event Flow Position:

ToolOutputRunItem → StepRunItem → (next step or completion)

Usage:

```python # Track step completion async for event in runner.astream(prompt_kwargs).stream_events():

if isinstance(event, RunItemStreamEvent) and event.name == “step_completed”:

step_item = event.item print(f”Completed step {step_item.data.step}”)

```

type: str = 'step'
data: StepOutput | None = None
class RunnerResult(step_history: List[core.types.StepOutput] = <factory>, answer: Optional[str] = None, error: Optional[str] = None, ctx: Optional[Dict] = None)[source]

Bases: object

step_history: List[StepOutput]
answer: str | None = None
error: str | None = None
ctx: Dict | None = None
class FinalOutputItem(id: str = <factory>, type: str = 'final_output', data: ~core.types.RunnerResult | None = None, error: str | None = None, timestamp: ~datetime.datetime = <factory>)[source]

Bases: RunItem

Event emitted when the entire Runner execution has completed.

This event signals the end of the execution sequence and contains the final processed result. It’s emitted regardless of whether execution completed successfully or with an error.

data

The final RunnerResponse containing the complete execution result

Type:

core.types.RunnerResult | None

Event Flow Position:

Final step → FinalOutputItem (execution complete)

Usage:

```python # Get final results async for event in runner.astream(prompt_kwargs).stream_events():

if isinstance(event, RunItemStreamEvent) and event.name == “runner_finished”:

final_item = event.item if final_item.data.error:

print(f”Execution failed: {final_item.data.error}”)

else:

print(f”Final answer: {final_item.data.answer}”)

```

type: str = 'final_output'
data: RunnerResult | None = None
class RunItemStreamEvent(name: Literal['agent.tool_call_start', 'agent.tool_call_activity', 'agent.tool_call_complete', 'agent.step_complete', 'agent.final_output', 'agent.execution_complete', 'agent.tool_permission_request'], item: RunItem, type: Literal['run_item_stream_event'] = 'run_item_stream_event')[source]

Bases: DataClass

Wrapper for streaming RunItem events during Runner execution.

This class wraps RunItem instances with event metadata to create a streaming event system. Each event has a name that indicates what type of execution event occurred, and contains the associated RunItem with the event data.

The streaming system allows consumers to react to different phases of agent execution in real-time, such as when tools are called, when steps complete, or when execution finishes.

name

The specific event type that occurred (see event name literals)

Type:

Literal[‘agent.tool_call_start’, ‘agent.tool_call_activity’, ‘agent.tool_call_complete’, ‘agent.step_complete’, ‘agent.final_output’, ‘agent.execution_complete’, ‘agent.tool_permission_request’]

item

The RunItem containing the event-specific data

Type:

core.types.RunItem

type

Always “run_item_stream_event” for type discrimination

Type:

Literal[‘run_item_stream_event’]

Event Types:
  • “agent.final_output”: Final output from the agent (FinalOutputItem)

  • “agent.tool_permission_request”: Tool permission request before execution

  • “agent.tool_call_start”: Agent is about to execute a tool (ToolCallRunItem)

  • “agent.tool_call_activity”: Agent is about to execute a tool (ToolCallActivityRunItem)

  • “agent.tool_call_complete”: Tool execution completed (ToolOutputRunItem)

  • “agent.step_complete”: Full execution step finished (StepRunItem)

  • “agent.execution_complete”: Entire execution completed (FinalOutputItem)

name: Literal['agent.tool_call_start', 'agent.tool_call_activity', 'agent.tool_call_complete', 'agent.step_complete', 'agent.final_output', 'agent.execution_complete', 'agent.tool_permission_request']

The name identifying the specific type of execution event that occurred.

item: RunItem

The RunItem instance containing the event-specific data and context.

type: Literal['run_item_stream_event'] = 'run_item_stream_event'

Type discriminator for the streaming event system.

class QueueCompleteSentinel[source]

Bases: object

Sentinel to indicate queue completion.

class EventEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: JSONEncoder

default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
class RunnerStreamingResult(_event_queue: ~asyncio.queues.Queue = <factory>, _run_task: ~_asyncio.Task | None = None, _exception: Exception | None = None, answer: ~typing.Any | None = None, step_history: ~typing.List[~typing.Any] = <factory>, _is_complete: bool = False)[source]

Bases: object

Container for runner streaming results that provides access to the event queue and allows users to consume streaming events.

answer: Any | None = None
step_history: List[Any]
property is_complete: bool

Check if the workflow execution is complete.

set_exception(exc: Any) None[source]

Set an exception, ensuring it’s a proper exception object.

put_nowait(item: RawResponsesStreamEvent | RunItemStreamEvent)[source]
async stream_events() AsyncIterator[RawResponsesStreamEvent | RunItemStreamEvent][source]

Stream events from the runner execution.w

Returns:

An async iterator that yields stream events

Return type:

AsyncIterator[StreamEvent]

Example

```python result = runner.astream(prompt_kwargs) async for event in result.stream_events():

if isinstance(event, RawResponsesStreamEvent):

print(f”Raw event: {event.data}”)

elif isinstance(event, RunItemStreamEvent):

print(f”Run item: {event.name} - {event.item}”)

```

async stream_to_json(file_name: str = 'agent_events_stream.json') AsyncIterator[RawResponsesStreamEvent | RunItemStreamEvent][source]

Stream events to a JSON file in real-time while also yielding them.

This method writes events to a JSON file as they arrive, giving a live streaming effect. The JSON file is updated incrementally.

Parameters:

file_name – The output file name for saving events

Yields:

StreamEvent – Each event as it arrives

Example

```python result = runner.astream(prompt_kwargs) async for event in result.stream_to_json(“live_events.json”):

# Process event while it’s also being written to file print(f”Event: {event}”)

```

stream_to_json_sync(file_name: str = 'agent_events_stream.json')[source]

Synchronous wrapper for stream_to_json that returns an iterator.

This allows users to use the streaming JSON functionality in a sync context.

Parameters:

file_name – The output file name for saving events

Returns:

Iterator of events

Example

```python result = runner.astream(prompt_kwargs) for event in result.stream_to_json_sync(“live_events.json”):

print(f”Event: {event}”)

```

cancel()[source]

Cancel the running task.

async wait_for_completion()[source]

Wait for the runner task to complete.