agent

Submodules


class ReActAgent(tools: ~typing.List[~typing.Callable | ~typing.Callable[[...], ~typing.Awaitable[~typing.Any]] | ~adalflow.core.func_tool.FunctionTool] = [], max_steps: int = 10, add_llm_as_fallback: bool = True, examples: ~typing.List[~adalflow.core.types.Function] | ~typing.List[str] | None = None, *, model_client: ~adalflow.core.model_client.ModelClient, model_kwargs: ~typing.Dict = {}, template: str | None = None, role_desc: str | None = 'You are an excellent task planner.', context_variables: ~typing.Dict | None = None, is_thinking_model: bool = False, use_cache: bool = True, debug: bool = False, answer_data_type: ~typing.Type[~components.agent.react.T] = <class 'str'>)[source]

Bases: Component

ReActAgent uses generator as a planner that runs multiple and sequential functional call steps to generate the final response. The planner will generate a Function data class as action for each step that includes a “thought” field. The execution result is stored in the “observation” field of the StepOutput data class. If the execution failed, it will store the error message in the “observation” field so that we can auto-optimize it to correct the error.

The final answer can be different in training and eval mode: - Training: the final answer will be Users need to set up: - tools: a list of tools to use to complete the task. Each tool is a function or a function tool. - max_steps: the maximum number of steps the agent can take to complete the task. - add_llm_as_fallback: a boolean to decide whether to use an additional LLM model as a fallback tool to answer the query. - model_client: the model client to use to generate the response. - model_kwargs: the model kwargs to use to generate the response. - template: the template to use to generate the prompt. Default is DEFAULT_REACT_AGENT_SYSTEM_PROMPT. - context_variables: the context variables to use in the prompt. - use_cache: a boolean to decide whether to use the cache to store the generated responses for the planner. - debug: a boolean to decide whether to print debug information.

For the generator, the default arguments are: (1) default prompt: DEFAULT_REACT_AGENT_SYSTEM_PROMPT (2) default output_processors: JsonParser

There are examples which is optional, a list of string examples in the prompt.

Example:

from core.openai_client import OpenAIClient
from components.agent.react import ReActAgent
from core.func_tool import FunctionTool
# define the tools
def multiply(a: int, b: int) -> int:
    '''Multiply two numbers.'''
    return a * b
def add(a: int, b: int) -> int:
    '''Add two numbers.'''
    return a + b
agent = ReActAgent(
    tools=[multiply, add],
    model_client=OpenAIClient(),
    model_kwargs={"model": "gpt-3.5-turbo"},
)

# Using examples:

call_multiply = FunctionExpression.from_function(
    thought="I want to multiply 3 and 4.",

Reference: [1] https://arxiv.org/abs/2210.03629, published in Mar, 2023.

call(*args, **kwargs) ReActOutput[source]

User must override this for the inference scenario if bicall is not defined.

forward(*args, **kwargs) Parameter[source]

User must override this for the training scenario if bicall is not defined.

bicall(input: str, promt_kwargs: Dict | None = {}, model_kwargs: Dict | None = {}, id: str | None = None) Parameter | ReActOutput[source]

prompt_kwargs: additional prompt kwargs to either replace or add to the preset prompt kwargs.

class Agent(name: str, tools: ~typing.List[~typing.Any] | None = None, context_variables: ~typing.Dict | None = None, add_llm_as_fallback: bool | None = False, model_client: ~adalflow.core.model_client.ModelClient | None = None, model_kwargs: ~typing.Dict[str, ~typing.Any] | None = {}, model_type: ~adalflow.core.types.ModelType | None = ModelType.LLM, template: str | None = None, role_desc: str | ~adalflow.core.prompt_builder.Prompt | None = None, cache_path: str | None = None, use_cache: bool | None = True, answer_data_type: ~typing.Type[~components.agent.agent.T] | None = <class 'str'>, max_steps: int | None = 10, is_thinking_model: bool | None = False, tool_manager: ~adalflow.core.tool_manager.ToolManager | None = None, planner: ~adalflow.core.generator.Generator | None = None, **kwargs)[source]

Bases: Component

A high-level agentic component that orchestrates AI planning and tool execution.

The Agent combines a Generator-based planner for task decomposition with a ToolManager for function calling. It uses a ReAct (Reasoning and Acting) architecture to iteratively plan steps and execute tools to solve complex tasks.

The Agent comes with default prompt templates for agentic reasoning, automatic tool definition integration, and step history tracking. It includes built-in helper tools: - llm_tool: Fallback tool using LLM world knowledge for simple queries

Architecture:

Agent contains two main components: 1. Planner (Generator): Plans and reasons about next actions using an LLM 2. ToolManager: Manages and executes available tools/functions

name

Unique identifier for the agent instance

Type:

str

tool_manager

Manages available tools and their execution

Type:

ToolManager

planner

LLM-based planner for task decomposition and reasoning

Type:

Generator

answer_data_type

Expected type for the final answer output

Type:

Type

max_steps

Maximum number of planning steps allowed

Type:

int

is_thinking_model

Whether the underlying model supports chain-of-thought

Type:

bool

flip_thinking_model()[source]

Toggle the thinking model state.

is_training() bool[source]
get_prompt(**kwargs) str[source]

Get formatted prompt using generator’s prompt template.

Parameters:

**kwargs – Additional arguments to pass to the generator’s get_prompt

Returns:

Formatted prompt string

class Runner(agent: Agent, ctx: Dict | None = None, max_steps: int | None = None, permission_manager: PermissionManager | None = None, conversation_memory: ConversationMemory | None = None, **kwargs)[source]

Bases: Component

Executes Agent instances with multi-step iterative planning and tool execution.

The Runner orchestrates the execution of an Agent through multiple reasoning and action cycles. It manages the step-by-step execution loop where the Agent’s planner generates Function calls that get executed by the ToolManager, with results fed back into the planning context for the next iteration.

Execution Flow:
  1. Initialize step history and prompt context

  2. For each step (up to max_steps): a. Call Agent’s planner to get next Function b. Execute the Function using ToolManager c. Add step result to history d. Check if Function is “finish” to terminate

  3. Process final answer to expected output type

The Runner supports both synchronous and asynchronous execution modes, as well as streaming execution with real-time event emission. It includes comprehensive tracing and error handling throughout the execution pipeline.

agent

The Agent instance to execute

Type:

Agent

max_steps

Maximum number of execution steps allowed

Type:

int

answer_data_type

Expected type for final answer processing

Type:

Type

step_history

History of all execution steps

Type:

List[StepOutput]

ctx

Additional context passed to tools

Type:

Optional[Dict]

set_permission_manager(permission_manager: PermissionManager | None) None[source]

Set or update the permission manager after runner initialization.

Parameters:

permission_manager – The permission manager instance to use for tool approval

is_cancelled() bool[source]

Check if execution has been cancelled.

reset_cancellation() None[source]

Reset the cancellation flag for a new execution.

get_token_consumption() Dict[str, Any][source]

Get the current token consumption statistics.

Returns:

  • total_prompt_tokens: Total tokens consumed across all steps

  • current_step_tokens: Tokens from the most recent step

  • steps_token_history: List of token counts per step

Return type:

Dict containing token consumption data

register_cancel_callback(callback) None[source]

Register a callback to be called when execution is cancelled.

async cancel() None[source]

Cancel the current execution.

This will stop the current execution but preserve state like memory.

create_response_span(runner_result, step_count: int, streaming_result: RunnerStreamingResult, runner_span_instance, workflow_status: str = 'stream_completed')[source]
call(prompt_kwargs: Dict[str, Any], model_kwargs: Dict[str, Any] | None = None, use_cache: bool | None = None, id: str | None = None) RunnerResult[source]

Execute the planner synchronously for multiple steps with function calling support.

At the last step the action should be set to “finish” instead which terminates the sequence

Parameters:
  • prompt_kwargs – Dictionary of prompt arguments for the generator

  • model_kwargs – Optional model parameters to override defaults

  • use_cache – Whether to use cached results if available

  • id – Optional unique identifier for the request

Returns:

RunnerResult containing step history and final processed output

async acall(prompt_kwargs: Dict[str, Any], model_kwargs: Dict[str, Any] | None = None, use_cache: bool | None = None, id: str | None = None) RunnerResult | None[source]

Execute the planner asynchronously for multiple steps with function calling support.

At the last step the action should be set to “finish” instead which terminates the sequence

Parameters:
  • prompt_kwargs – Dictionary of prompt arguments for the generator

  • model_kwargs – Optional model parameters to override defaults

  • use_cache – Whether to use cached results if available

  • id – Optional unique identifier for the request

Returns:

RunnerResponse containing step history and final processed output

astream(prompt_kwargs: Dict[str, Any], model_kwargs: Dict[str, Any] | None = None, use_cache: bool | None = None, id: str | None = None) RunnerStreamingResult[source]

Execute the runner asynchronously with streaming support.

Returns:

A streaming result object with stream_events() method

Return type:

RunnerStreamingResult

async impl_astream(prompt_kwargs: Dict[str, Any], model_kwargs: Dict[str, Any] | None = None, use_cache: bool | None = None, id: str | None = None, streaming_result: RunnerStreamingResult | None = None) None[source]

Behave exactly the same as acall but with streaming support.

  • GeneratorOutput will be emitted as RawResponsesStreamEvent

  • StepOutput will be emitted as RunItemStreamEvent with name “agent.step_complete”.

  • Finally, there will be a FinalOutputItem with the final answer or error.

Execute the planner asynchronously for multiple steps with function calling support.

At the last step the action should be set to “finish” instead which terminates the sequence

Parameters:
  • prompt_kwargs – Dictionary of prompt arguments for the generator

  • model_kwargs – Optional model parameters to override defaults

  • use_cache – Whether to use cached results if available

  • id – Optional unique identifier for the request

async stream_tool_execution(function: Function, tool_call_id: str, tool_call_name: str, streaming_result: RunnerStreamingResult) tuple[Any, Any, Any][source]

Execute a tool/function call with streaming support and proper event handling.

This method handles: - Tool span creation for tracing - Async generator support for streaming results - Tool activity events - Tool completion events - Error handling and observation extraction

Parameters:
  • function – The Function object to execute

  • tool_call_id – Unique identifier for this tool call

  • tool_call_name – Name of the tool being called

  • streaming_result – Queue for streaming events

Returns:

(function_output, function_output_observation)

Return type:

tuple