agent¶
Submodules¶
- agent
- prompts
- react
- runner
Runner
Runner.agent
Runner.max_steps
Runner.answer_data_type
Runner.step_history
Runner.ctx
Runner.set_permission_manager()
Runner.is_cancelled()
Runner.reset_cancellation()
Runner.get_token_consumption()
Runner.register_cancel_callback()
Runner.cancel()
Runner.create_response_span()
Runner.call()
Runner.acall()
Runner.astream()
Runner.impl_astream()
Runner.stream_tool_execution()
- class ReActAgent(tools: List[Callable | Callable[[...], Awaitable[Any]] | FunctionTool] = [], max_steps: int = 10, add_llm_as_fallback: bool = True, examples: List[Function] | List[str] | None = None, *, model_client: ModelClient, model_kwargs: Dict = {}, template: str | None = None, role_desc: str | None = 'You are an excellent task planner.', context_variables: Dict | None = None, is_thinking_model: bool = False, use_cache: bool = True, debug: bool = False, answer_data_type: Type[T] = <class 'str'>)[source]¶
Bases:
Component
ReActAgent uses a generator as a planner that runs multiple sequential function-call steps to generate the final response. At each step, the planner generates a Function data class as the action, including a “thought” field. The execution result is stored in the “observation” field of the StepOutput data class. If execution fails, the error message is stored in the “observation” field instead, so that it can be auto-optimized to correct the error.
The final answer can differ between training and eval mode.
Users need to set up:
- tools: a list of tools used to complete the task. Each tool is a function or a FunctionTool.
- max_steps: the maximum number of steps the agent can take to complete the task.
- add_llm_as_fallback: whether to add an additional LLM model as a fallback tool to answer the query.
- model_client: the model client used to generate the response.
- model_kwargs: the model kwargs used to generate the response.
- template: the template used to build the prompt. Defaults to DEFAULT_REACT_AGENT_SYSTEM_PROMPT.
- context_variables: the context variables used in the prompt.
- use_cache: whether to cache the planner’s generated responses.
- debug: whether to print debug information.
For the generator, the defaults are:
(1) prompt: DEFAULT_REACT_AGENT_SYSTEM_PROMPT
(2) output_processors: JsonParser
The examples argument is optional: a list of example strings (or Function examples) to include in the prompt.
Example:
    from core.openai_client import OpenAIClient
    from components.agent.react import ReActAgent
    from core.func_tool import FunctionTool
    from core.types import FunctionExpression

    # define the tools
    def multiply(a: int, b: int) -> int:
        '''Multiply two numbers.'''
        return a * b

    def add(a: int, b: int) -> int:
        '''Add two numbers.'''
        return a + b

    agent = ReActAgent(
        tools=[multiply, add],
        model_client=OpenAIClient(),
        model_kwargs={"model": "gpt-3.5-turbo"},
    )

    # Using examples:
    call_multiply = FunctionExpression.from_function(
        thought="I want to multiply 3 and 4.",
        func=multiply,
        a=3,
        b=4,
    )
    agent = ReActAgent(
        tools=[multiply, add],
        model_client=OpenAIClient(),
        model_kwargs={"model": "gpt-3.5-turbo"},
        examples=[call_multiply],
    )
Reference: [1] ReAct: Synergizing Reasoning and Acting in Language Models, https://arxiv.org/abs/2210.03629 (published Oct 2022, last revised Mar 2023).
- call(*args, **kwargs) ReActOutput [source]¶
Users must override this method for the inference scenario if bicall is not defined.
- class Agent(name: str, tools: List[Any] | None = None, context_variables: Dict | None = None, add_llm_as_fallback: bool | None = False, model_client: ModelClient | None = None, model_kwargs: Dict[str, Any] | None = {}, model_type: ModelType | None = ModelType.LLM, template: str | None = None, role_desc: str | Prompt | None = None, cache_path: str | None = None, use_cache: bool | None = True, answer_data_type: Type[T] | None = <class 'str'>, max_steps: int | None = 10, is_thinking_model: bool | None = False, tool_manager: ToolManager | None = None, planner: Generator | None = None, **kwargs)[source]¶
Bases:
Component
A high-level agentic component that orchestrates AI planning and tool execution.
The Agent combines a Generator-based planner for task decomposition with a ToolManager for function calling. It uses a ReAct (Reasoning and Acting) architecture to iteratively plan steps and execute tools to solve complex tasks.
The Agent comes with default prompt templates for agentic reasoning, automatic tool definition integration, and step history tracking. It includes built-in helper tools:
- llm_tool: a fallback tool that uses the LLM’s world knowledge to answer simple queries
- Architecture:
Agent contains two main components:
1. Planner (Generator): plans and reasons about next actions using an LLM
2. ToolManager: manages and executes available tools/functions
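To make the two-component split concrete, here is a stdlib-only sketch of a tool manager executing a planner-produced action. This is not the adalflow API: Function, SimpleToolManager, and multiply below are hypothetical stand-ins for illustration only.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

@dataclass
class Function:
    # A planner action: which tool to call and with what arguments.
    name: str
    kwargs: Dict[str, Any] = field(default_factory=dict)

class SimpleToolManager:
    # Minimal stand-in for a tool manager: registers callables by name
    # and executes a planner-produced Function.
    def __init__(self, tools: List[Callable]) -> None:
        self._tools: Dict[str, Callable] = {t.__name__: t for t in tools}

    def execute(self, fn: Function) -> Any:
        if fn.name not in self._tools:
            raise KeyError(f"unknown tool: {fn.name}")
        return self._tools[fn.name](**fn.kwargs)

def multiply(a: int, b: int) -> int:
    return a * b

tm = SimpleToolManager([multiply])
result = tm.execute(Function(name="multiply", kwargs={"a": 3, "b": 4}))  # 12
```

The planner's job is then reduced to emitting Function objects; everything about dispatch and argument binding lives in the tool manager.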
- name¶
Unique identifier for the agent instance
- Type:
str
- tool_manager¶
Manages available tools and their execution
- Type:
ToolManager
- answer_data_type¶
Expected type for the final answer output
- Type:
Type
- max_steps¶
Maximum number of planning steps allowed
- Type:
int
- is_thinking_model¶
Whether the underlying model supports chain-of-thought
- Type:
bool
- class Runner(agent: Agent, ctx: Dict | None = None, max_steps: int | None = None, permission_manager: PermissionManager | None = None, conversation_memory: ConversationMemory | None = None, **kwargs)[source]¶
Bases:
Component
Executes Agent instances with multi-step iterative planning and tool execution.
The Runner orchestrates the execution of an Agent through multiple reasoning and action cycles. It manages the step-by-step execution loop where the Agent’s planner generates Function calls that get executed by the ToolManager, with results fed back into the planning context for the next iteration.
- Execution Flow:
Initialize step history and prompt context
For each step (up to max_steps):
a. Call the Agent’s planner to get the next Function
b. Execute the Function using the ToolManager
c. Add the step result to the history
d. Check whether the Function is “finish” and, if so, terminate
Process final answer to expected output type
The Runner supports both synchronous and asynchronous execution modes, as well as streaming execution with real-time event emission. It includes comprehensive tracing and error handling throughout the execution pipeline.
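The execution flow above can be sketched in plain Python. This is a simplified stand-in, not the Runner implementation: run, planner, Function, and StepOutput here are hypothetical names, and the "planner" is a scripted function rather than an LLM.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

@dataclass
class Function:
    name: str
    kwargs: Dict[str, Any] = field(default_factory=dict)

@dataclass
class StepOutput:
    step: int
    action: Function
    observation: Any = None

def run(planner: Callable[[List[StepOutput]], Function],
        tools: Dict[str, Callable],
        max_steps: int = 10) -> Any:
    # 1. Initialize step history.
    history: List[StepOutput] = []
    # 2. Iterate up to max_steps: plan -> execute -> record -> check finish.
    for step in range(max_steps):
        action = planner(history)                      # a. ask planner for next Function
        if action.name == "finish":                    # d. "finish" terminates the loop
            return action.kwargs.get("answer")
        obs = tools[action.name](**action.kwargs)      # b. execute via the tool map
        history.append(StepOutput(step, action, obs))  # c. record the result
    return None  # step budget exhausted without a final answer

# A scripted planner: multiply first, then finish with the last observation.
def planner(history):
    if not history:
        return Function("multiply", {"a": 3, "b": 4})
    return Function("finish", {"answer": history[-1].observation})

answer = run(planner, {"multiply": lambda a, b: a * b})  # 12
```

Feeding each observation back through the history is what lets the next planning step react to earlier results.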
- max_steps¶
Maximum number of execution steps allowed
- Type:
int
- answer_data_type¶
Expected type for final answer processing
- Type:
Type
- step_history¶
History of all execution steps
- Type:
List[StepOutput]
- ctx¶
Additional context passed to tools
- Type:
Optional[Dict]
- set_permission_manager(permission_manager: PermissionManager | None) None [source]¶
Set or update the permission manager after runner initialization.
- Parameters:
permission_manager – The permission manager instance to use for tool approval
- get_token_consumption() Dict[str, Any] [source]¶
Get the current token consumption statistics.
- Returns:
total_prompt_tokens: Total tokens consumed across all steps
current_step_tokens: Tokens from the most recent step
steps_token_history: List of token counts per step
- Return type:
Dict containing token consumption data
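The shape of the returned statistics can be pictured with the following stdlib sketch. TokenTracker is a hypothetical stand-in, not the Runner's internals; only the three documented keys are assumed.

```python
from typing import Any, Dict, List

class TokenTracker:
    # Sketch of per-step token accounting, mirroring the keys documented
    # for get_token_consumption().
    def __init__(self) -> None:
        self.steps_token_history: List[int] = []

    def record_step(self, prompt_tokens: int) -> None:
        self.steps_token_history.append(prompt_tokens)

    def get_token_consumption(self) -> Dict[str, Any]:
        return {
            "total_prompt_tokens": sum(self.steps_token_history),
            "current_step_tokens": (
                self.steps_token_history[-1] if self.steps_token_history else 0
            ),
            "steps_token_history": list(self.steps_token_history),
        }

tracker = TokenTracker()
tracker.record_step(120)
tracker.record_step(85)
stats = tracker.get_token_consumption()
```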
- register_cancel_callback(callback) None [source]¶
Register a callback to be called when execution is cancelled.
- async cancel() None [source]¶
Cancel the current execution.
This will stop the current execution but preserve state like memory.
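One way such cooperative cancellation can work is sketched below with stdlib asyncio: cancel() sets a flag and fires registered callbacks, the loop checks the flag between steps and exits, and accumulated state survives. CancellableLoop is a hypothetical stand-in, not the Runner implementation.

```python
import asyncio

class CancellableLoop:
    # Sketch of cooperative cancellation: cancel() sets a flag and fires
    # registered callbacks; the loop stops, but its history is preserved.
    def __init__(self) -> None:
        self._cancelled = asyncio.Event()
        self._callbacks = []
        self.step_history = []

    def register_cancel_callback(self, callback) -> None:
        self._callbacks.append(callback)

    def is_cancelled(self) -> bool:
        return self._cancelled.is_set()

    async def cancel(self) -> None:
        self._cancelled.set()
        for cb in self._callbacks:
            cb()

    async def run(self) -> None:
        step = 0
        while not self.is_cancelled():
            self.step_history.append(step)
            step += 1
            await asyncio.sleep(0)  # yield control so cancel() can run

async def main():
    loop = CancellableLoop()
    fired = []
    loop.register_cancel_callback(lambda: fired.append(True))
    task = asyncio.create_task(loop.run())
    await asyncio.sleep(0.01)  # let a few steps execute
    await loop.cancel()
    await task
    return loop, fired

loop, fired = asyncio.run(main())
```

The key design point is that cancellation is a request checked at step boundaries, not a hard task kill, which is what allows memory and step history to remain intact.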
- create_response_span(runner_result, step_count: int, streaming_result: RunnerStreamingResult, runner_span_instance, workflow_status: str = 'stream_completed')[source]¶
- call(prompt_kwargs: Dict[str, Any], model_kwargs: Dict[str, Any] | None = None, use_cache: bool | None = None, id: str | None = None) RunnerResult [source]¶
Execute the planner synchronously for multiple steps with function calling support.
At the last step, the action should be set to “finish”, which terminates the sequence.
- Parameters:
prompt_kwargs – Dictionary of prompt arguments for the generator
model_kwargs – Optional model parameters to override defaults
use_cache – Whether to use cached results if available
id – Optional unique identifier for the request
- Returns:
RunnerResult containing step history and final processed output
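As one way to picture the "final processed output" step, the raw "finish" payload (often a string) can be coerced into the configured answer_data_type. This is a sketch under that assumption, not the actual Runner logic; process_final_answer is a hypothetical helper.

```python
from typing import Any, Type, TypeVar

T = TypeVar("T")

def process_final_answer(raw: Any, answer_data_type: Type[T]) -> T:
    # Coerce the planner's raw final payload into the expected type.
    if isinstance(raw, answer_data_type):
        return raw  # already the right type, pass through
    return answer_data_type(raw)  # e.g. str -> int, str -> float

final = process_final_answer("42", int)  # 42
```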
- async acall(prompt_kwargs: Dict[str, Any], model_kwargs: Dict[str, Any] | None = None, use_cache: bool | None = None, id: str | None = None) RunnerResult | None [source]¶
Execute the planner asynchronously for multiple steps with function calling support.
At the last step, the action should be set to “finish”, which terminates the sequence.
- Parameters:
prompt_kwargs – Dictionary of prompt arguments for the generator
model_kwargs – Optional model parameters to override defaults
use_cache – Whether to use cached results if available
id – Optional unique identifier for the request
- Returns:
RunnerResult containing step history and final processed output
- astream(prompt_kwargs: Dict[str, Any], model_kwargs: Dict[str, Any] | None = None, use_cache: bool | None = None, id: str | None = None) RunnerStreamingResult [source]¶
Execute the runner asynchronously with streaming support.
- Returns:
A streaming result object with a stream_events() method
- Return type:
RunnerStreamingResult
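A queue-backed streaming result of this kind might be sketched as follows with stdlib asyncio. This is not the adalflow RunnerStreamingResult: Event, StreamingResult, and the "final_output" sentinel name are hypothetical; only the "agent.step_complete" event name and the stream_events() method come from this documentation.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator

@dataclass
class Event:
    name: str           # e.g. "agent.step_complete" or "final_output"
    payload: Any = None

class StreamingResult:
    # Sketch of a streaming result backed by an asyncio.Queue: the runner
    # puts events as steps complete; consumers iterate stream_events().
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def put(self, event: Event) -> None:
        self._queue.put_nowait(event)

    async def stream_events(self) -> AsyncIterator[Event]:
        while True:
            event = await self._queue.get()
            yield event
            if event.name == "final_output":
                return  # final answer (or error) closes the stream

async def main():
    result = StreamingResult()

    async def produce():
        for step in range(2):
            result.put(Event("agent.step_complete", payload=step))
        result.put(Event("final_output", payload="done"))

    names = []
    producer = asyncio.create_task(produce())
    async for event in result.stream_events():
        names.append(event.name)
    await producer
    return names

names = asyncio.run(main())
```

Returning the streaming object immediately, while a background task fills the queue, is what lets callers consume events in real time instead of waiting for the run to finish.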
- async impl_astream(prompt_kwargs: Dict[str, Any], model_kwargs: Dict[str, Any] | None = None, use_cache: bool | None = None, id: str | None = None, streaming_result: RunnerStreamingResult | None = None) None [source]¶
Behaves exactly the same as acall but with streaming support.
GeneratorOutput will be emitted as RawResponsesStreamEvent
StepOutput will be emitted as RunItemStreamEvent with name “agent.step_complete”.
Finally, there will be a FinalOutputItem with the final answer or error.
Execute the planner asynchronously for multiple steps with function calling support.
At the last step, the action should be set to “finish”, which terminates the sequence.
- Parameters:
prompt_kwargs – Dictionary of prompt arguments for the generator
model_kwargs – Optional model parameters to override defaults
use_cache – Whether to use cached results if available
id – Optional unique identifier for the request
- async stream_tool_execution(function: Function, tool_call_id: str, tool_call_name: str, streaming_result: RunnerStreamingResult) tuple[Any, Any, Any] [source]¶
Execute a tool/function call with streaming support and proper event handling.
This method handles:
- Tool span creation for tracing
- Async generator support for streaming results
- Tool activity events
- Tool completion events
- Error handling and observation extraction
- Parameters:
function – The Function object to execute
tool_call_id – Unique identifier for this tool call
tool_call_name – Name of the tool being called
streaming_result – Queue for streaming events
- Returns:
(function_output, function_output_observation)
- Return type:
tuple
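The async-generator handling described above can be sketched as follows. This is a simplified stand-in, not the Runner method: it assumes a tool that is either a coroutine function or an async generator function, forwards each generator chunk as an activity event, keeps the last chunk as the output, and returns a two-element (output, observation) pair; all names are hypothetical.

```python
import asyncio
from typing import Any, AsyncIterator, List, Tuple

async def stream_tool_sketch(tool, events: List[Any]) -> Tuple[Any, Any]:
    # If the tool yields an async generator, forward each chunk as a
    # tool-activity event and keep the last chunk as the output;
    # otherwise the awaited value is the output directly.
    output = tool()
    if hasattr(output, "__anext__"):  # async generator tool
        last = None
        async for chunk in output:
            events.append(("tool_activity", chunk))
            last = chunk
        output = last
    else:
        output = await output  # plain coroutine tool
    events.append(("tool_complete", output))
    observation = str(output)  # observation fed back to the planner
    return output, observation

async def counter() -> AsyncIterator[int]:
    # A streaming tool that emits three chunks.
    for i in range(3):
        yield i

events: list = []
output, observation = asyncio.run(stream_tool_sketch(counter, events))
```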