Source code for components.agent.react

"""Implementation and optimization of React agent."""

from typing import List, Union, Callable, Optional, Any, Dict
from dataclasses import dataclass, field
from adalflow.core.base_data_class import DataClass
import logging
import traceback


from adalflow.core.generator import Generator
from adalflow.optim.grad_component import GradComponent
from adalflow.optim.parameter import Parameter, ParameterType
from adalflow.core.func_tool import FunctionTool, AsyncCallable
from adalflow.core.tool_manager import ToolManager
from adalflow.core.component import Component
from adalflow.components.output_parsers import JsonOutputParser
from adalflow.core.types import (
    StepOutput,
    GeneratorOutput,
    Function,
    FunctionOutput,
)
from adalflow.optim.grad_component import fun_to_grad_component
from adalflow.core.model_client import ModelClient
from adalflow.utils.logger import printc


log = logging.getLogger(__name__)

__all__ = ["DEFAULT_REACT_AGENT_SYSTEM_PROMPT", "ReActAgent"]


react_agent_task_desc = r"""
<START_OF_TASK_SPEC>
You are an excellent task planner.
Answer the input query using the tools provided below with maximum accuracy.

At each step, you will read the previous Thought, Action(name, kwargs), and Observation (the execution result of the action), and then provide the next Thought and Action.

Follow each function's docstring to best call the tool.
- For simple queries: Directly call the ``finish`` action and provide the answer.
- For complex queries:
    - Step 1: Read the user query and divide it into multiple steps. Start with the first tool/subquery.
    - Call one tool at a time to solve each subquery/subquestion.
    - At step 'finish', give the final answer based on all previous steps.
REMEMBER:
- Action MUST call one of the tools. It CANNOT be empty.
- You will ALWAYS END WITH the 'finish' tool to finish the task directly with an answer or a failure message.
- When the tool is a class method and a class_instance exists, call <class_instance_value>.<func_name> instead (NOT the CLASS NAME).
<END_OF_TASK_SPEC>
"""

# - In this case, you are working as a multi-hop retriever and your answer in finish MUST be verbatim short factoid responses from retrieved context.
# - Answer with only the exact answer phrase, not a full sentence or paragraph.

DEFAULT_REACT_AGENT_SYSTEM_PROMPT = r"""<START_OF_SYSTEM_PROMPT>
{{react_agent_task_desc}}
- You can't use more than {{max_steps}} steps. At step {{max_steps}}, you must finish with an answer.

{# Tools #}
{% if tools %}
<START_OF_TOOLS>
Tools and instructions:
{% for tool in tools %}
{{ loop.index }}.
{{tool}}
------------------------
{% endfor %}
<END_OF_TOOLS>
{% endif %}
{# Context Variables #}
{% if context_variables is not none %}
<START_OF_CONTEXT>
You have access to context_variables with the following keys:
{% for key, value in context_variables.items() %}
{{ key }}
------------------------
{% endfor %}
You can either pass context_variables or context_variables['key'] to the tools depending on the tool's requirements.
<END_OF_CONTEXT>
{% endif %}
{# output format and examples for output format #}
<START_OF_OUTPUT_FORMAT>
{{output_format_str}}
<END_OF_OUTPUT_FORMAT>
{% if examples %}
<START_OF_EXAMPLES>
Examples:
{% for example in examples %}
{{example}}
------------------------
{% endfor %}
<END_OF_EXAMPLES>
{% endif %}
<END_OF_SYSTEM_PROMPT>
-----------------
<START_OF_USER_QUERY>
Input query:
{{ input_str }}
_____________________
Current Step/Max Step: {{step_history|length + 1}} / {{max_steps}}
{# Step History #}
{% if step_history %}
<STEPS>
Your previous steps:
{% for history in step_history %}
Step {{ loop.index }}.
{% if history.action %}
"thought": "{{history.action.thought}}",
"name": "{{history.action.name}},
"kwargs": {{history.action.kwargs}}",
{% endif %}
"Observation": "{{history.observation}}"
------------------------
{% endfor %}
</STEPS>
{% endif %}
<END_OF_USER_QUERY>
"""


class CombineStepHistory(GradComponent):
    def __init__(self):
        super().__init__(desc="Extract the final answer from the step history.")

    def call(
        self,
        step_history: List[StepOutput],
        react_agent_task_desc: str,  # skip connection
        id: Optional[str] = None,
    ) -> str:
        if not step_history:
            return ""
        answer = step_history[-1].observation
        return answer
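
# Behavior sketch (hypothetical values): CombineStepHistory surfaces the observation of
# the final step. The react_agent_task_desc argument is unused in the forward pass; it
# exists only as a skip connection so that feedback can flow back to the task-description
# prompt during training.
#
#   combine = CombineStepHistory()
#   steps = [StepOutput(step=1, observation="12")]
#   combine.call(step_history=steps, react_agent_task_desc="...")  # -> "12"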


@dataclass
class ReActOutput(DataClass):
    r"""Similar to GeneratorOutput, but with additional step history and final answer."""

    id: Optional[str] = field(
        default=None, metadata={"desc": "The unique id of the output"}
    )
    step_history: List[StepOutput] = field(
        metadata={"desc": "The history of steps."}, default_factory=list
    )

    answer: Any = field(metadata={"desc": "The final answer."}, default=None)


class ReActAgent(Component):
    __doc__ = r"""ReActAgent uses a generator as a planner that runs multiple, sequential function-call steps to generate the final response.

    The planner generates a Function data class as the action for each step, including a "thought" field.
    The execution result is stored in the "observation" field of the StepOutput data class. If the
    execution fails, the error message is stored in the "observation" field instead, so that we can
    auto-optimize the prompt to correct the error.

    The final answer differs between training and eval mode:

    - Training: the final answer is a Parameter produced by CombineStepHistory, which carries the
      whole step history so that the backward engine can trace feedback back to the planner prompt.
    - Eval: the final answer is the observation of the last ('finish') step, wrapped in a ReActOutput.

    Users need to set up:

    - tools: a list of tools to use to complete the task. Each tool is a function or a function tool.
    - max_steps: the maximum number of steps the agent can take to complete the task.
    - add_llm_as_fallback: a boolean deciding whether to use an additional LLM as a fallback tool to answer the query.
    - model_client: the model client to use to generate the response.
    - model_kwargs: the model kwargs to use to generate the response.
    - template: the template to use to generate the prompt. Default is DEFAULT_REACT_AGENT_SYSTEM_PROMPT.
    - context_variables: the context variables to use in the prompt.
    - use_cache: a boolean deciding whether to cache the generated responses for the planner.
    - debug: a boolean deciding whether to print debug information.

    For the generator, the default arguments are:
    (1) default prompt: DEFAULT_REACT_AGENT_SYSTEM_PROMPT
    (2) default output_processors: JsonOutputParser

    There is an optional ``examples`` argument, a list of examples to include in the prompt.

    Example:

    .. code-block:: python

        from core.openai_client import OpenAIClient
        from components.agent.react import ReActAgent
        from core.func_tool import FunctionTool

        # define the tools
        def multiply(a: int, b: int) -> int:
            '''Multiply two numbers.'''
            return a * b

        def add(a: int, b: int) -> int:
            '''Add two numbers.'''
            return a + b

        agent = ReActAgent(
            tools=[multiply, add],
            model_client=OpenAIClient(),
            model_kwargs={"model": "gpt-3.5-turbo"},
        )

        # Using examples:
        call_multiply = FunctionExpression.from_function(
            thought="I want to multiply 3 and 4.",
            func=multiply,
            a=3,
            b=4,
        )
        agent = ReActAgent(
            tools=[multiply, add],
            examples=[call_multiply],
            model_client=OpenAIClient(),
            model_kwargs={"model": "gpt-3.5-turbo"},
        )

    Reference:

    [1] ReAct: Synergizing Reasoning and Acting in Language Models. https://arxiv.org/abs/2210.03629 (ICLR 2023).
    """

    # TODO: allow users to pass in a few examples. Need to be a list of FunctionExpression instances.
    def __init__(
        self,
        # added arguments specific to ReAct
        tools: List[Union[Callable, AsyncCallable, FunctionTool]] = [],
        max_steps: int = 10,
        add_llm_as_fallback: bool = True,
        # TODO: the examples only specify the output format, they are not end-to-end input-output examples; needs further optimization
        examples: Union[List[Function], List[str]] = [],
        *,
        # the following arguments are mainly for the planner
        model_client: ModelClient,
        model_kwargs: Dict = {},
        # template for the planner
        template: Optional[str] = None,  # allow users to customize the template
        context_variables: Optional[Dict] = None,  # context variables
        use_cache: bool = True,
        debug: bool = False,
    ):
        super().__init__()
        template = template or DEFAULT_REACT_AGENT_SYSTEM_PROMPT

        self.max_steps = max_steps
        self.add_llm_as_fallback = add_llm_as_fallback
        self.context_variables = context_variables
        self.debug = debug
        self.use_cache = use_cache

        processed_tools = self._init_tools(tools, model_client, model_kwargs)
        self.tool_manager: ToolManager = ToolManager(
            tools=processed_tools,
            additional_context={"context_variables": self.context_variables},
        )

        output_data_class = Function
        example = Function(
            thought="Based on all the subtasks, I am able to answer the question. Following the finish docstring, and ....",
            name="finish",
            kwargs={"answer": "final answer"},
        )
        self._examples = examples + [example]

        output_parser = JsonOutputParser(
            data_class=output_data_class,
            examples=self._examples,
            return_data_class=True,
            include_fields=[
                "thought",
                "name",
                "kwargs",
            ],
        )
        prompt_kwargs = {
            "tools": self.tool_manager.yaml_definitions,
            "output_format_str": output_parser.format_instructions(),
            "react_agent_task_desc": Parameter(
                name="react_agent_task_desc",
                data=react_agent_task_desc,
                # data="You are an excellent task planner. Answer the input query using the tools provided below with maximum accuracy.\n\nEach step you will read the previous thought, Action(name, kwargs), and Observation(execution result of the action) and then provide the next Thought and Action.\n\n<START_OF_TASK_SPEC>\nFollow function docstring to best call the tool.\n- For simple queries: Directly call the 'finish' action and answer with a concise 'yes' or 'no' when it fits.\n- For complex queries:\n - Step 1: Understand the main subject(s) and context of the user query accurately.\n - Step 2: Break down the query into multisteps, starting with the first tool/subquery.\n - Ensure each step accurately reflects the subjects under consideration.\n - Continuously verify your extracted information and logic for factual accuracy using concise comparisons.\n - At step 'finish', conclude with a precise final answer.\nREMEMBER:\n- Action MUST call one of the tools. It CANNOT be empty.\n- You will ALWAYS END WITH 'finish' tool to conclude the task directly with an answer or failure message.\n- When the tool is a class method and when class_instance exists, use <class_instance_value>.<func_name> to call instead (NOT the CLASS NAME).\n<END_OF_TASK_SPEC>",
                role_desc="Task instruction for the agent to plan steps to solve a question sequentially, in multiple steps, to get the final answer. For optimizer: you need to adapt this to the current specific task.",
                param_type=ParameterType.PROMPT,
                requires_opt=True,
            ),
            # "examples": Parameter(
            #     name="examples",
            #     data=None,
            #     role_desc="Examples for the ReAct agent.",
            #     param_type=ParameterType.DEMOS,
            #     requires_opt=True,
            # ),
            "context_variables": self.context_variables,
            "max_steps": self.max_steps,
        }
        self.planner = Generator(
            template=template,
            prompt_kwargs=prompt_kwargs,
            output_processors=output_parser,
            model_client=model_client,
            model_kwargs=model_kwargs,
            use_cache=use_cache,
        )
        # besides forming the final output, this adds a skip connection to the planner's task-description prompt
        self.combine_step_history = CombineStepHistory()

    def _init_tools(
        self,
        tools: List[Union[Callable, AsyncCallable, FunctionTool]],
        model_client: ModelClient,
        model_kwargs: Dict,
    ):
        r"""Initialize the tools.

        We keep references to the tools: with copy or deepcopy we could not set the
        training/eval mode for each tool.
        """
        processed_tools = []
        _additional_llm_tool = (
            Generator(model_client=model_client, model_kwargs=model_kwargs)
            if self.add_llm_as_fallback
            else None
        )

        def llm_tool(input: str, **kwargs) -> str:
            """I answer any input query with llm's world knowledge. Use me as a fallback tool or when the query is simple."""
            try:
                output: GeneratorOutput = _additional_llm_tool(
                    prompt_kwargs={"input_str": input}
                )
                response = output.data if output else None
                return response
            except Exception as e:
                log.error(f"Error using the llm_tool: {e}")
                print(f"Error using the llm_tool: {e}")
                return None

        # always add **kwargs so that we can track the id; __doc__ serves as the predecessor description.
        @fun_to_grad_component(
            desc="Finish",
            doc_string=Parameter(
                data="Finish the task with the final answer in the kwargs.",
                param_type=ParameterType.PROMPT,
                requires_opt=True,
                role_desc="Instruct the agent on how to create the final answer from the step history.",
                name="doc_string",
            ),
        )
        def finish(answer: str, **kwargs) -> str:
            return answer

        self._finish = FunctionTool(fn=finish, component=finish)

        processed_tools = tools.copy()
        if self.add_llm_as_fallback:
            processed_tools.append(llm_tool)
        processed_tools.append(self._finish)
        return processed_tools

    def _execute_action(
        self,
        step_output: StepOutput,
        response: Union[Parameter, GeneratorOutput],
        id: Optional[str] = None,
    ) -> Optional[StepOutput]:
        """Parse the action string to a function call and execute it.

        Update the step_output with the result.
        """
        def handle_error(response: Parameter, e: str):

            @fun_to_grad_component()
            def set_step_output_with_error(
                step_output: StepOutput, error: str, response: Any
            ):
                """Set the step_output with error."""
                step_output.observation = f"error: {error} at {response.data}"
                return step_output

            response.add_successor_map_fn(
                successor=set_step_output_with_error, map_fn=lambda x: x.data
            )
            return set_step_output_with_error.forward(step_output, e, response)

        step = step_output.step

        if isinstance(response, Parameter):
            try:
                step_output.action = response.data.data
                if self.debug:
                    printc(
                        f"Step test train: {step}: {step_output.action}", color="blue"
                    )
                if isinstance(response.data.data, Function):
                    response.data.data.kwargs.update({"id": id})

                result: Union[Parameter, str] = self.tool_manager(
                    expr_or_fun=response,
                    step="execute",
                    map_fn=lambda x: x.data.data,  # Function
                )
                if isinstance(result, str):
                    # the tool manager returned an error string instead of a Parameter

                    @fun_to_grad_component()
                    def set_step_output_with_error(step_output: StepOutput, data: str):
                        """Set the step_output with error."""
                        step_output.observation = f"Error {data} in executing action."
                        return step_output

                    response.add_successor_map_fn(
                        successor=set_step_output_with_error,
                        map_fn=lambda x: x.data.data,
                    )
                    step_output = set_step_output_with_error.forward(
                        step_output, response
                    )
                    return step_output

            except Exception as e:
                e = f"{e} Error executing action: {response.data}"
                return handle_error(response, e)

            try:
                step_output.step = step
                step_output.observation = result.data.output
                # update the execution result in the step_output to be consistent with the eval version
                result.data = step_output
                result.role_desc = "The result of the action execution, observation is the final answer"
                result.param_type = ParameterType.OUTPUT
                return result
            except Exception as e:
                e = f"{e} Error converting function output to step output: {result.data}"
                return handle_error(response, e)
        else:
            return self._execute_action_eval_mode(
                x=response,
                step_output=step_output,
                step=step,
                id=id,
            )

    def _execute_action_eval_mode(
        self,
        x: GeneratorOutput,
        step_output: StepOutput,
        step: int,
        id=None,
    ) -> StepOutput:
        """Execute the action and update the step_output."""
        if x.error or not x.data:
            error_msg = f"Error planning step {step}: {x.error}"
            step_output.observation = error_msg
            step_output.action = None
            log.error(error_msg)
            return step_output
        else:
            try:
                fun_expr: Function = x.data
                step_output.action = fun_expr
                # add the id to the function call
                fun_expr.kwargs.update({"id": id})

                if step_output and step_output.action:
                    result: FunctionOutput = self.tool_manager(
                        expr_or_fun=x.data,  # Function
                        step="execute",
                    )
                    step_output.observation = result.output
                    if self.debug:
                        printc(f"Step {step}: \n{step_output}\n_______\n", color="blue")
                    return step_output
                else:
                    if self.debug:
                        printc(f"Failed to parse response for step {step}", color="red")
                    log.error(f"Failed to parse response for step {step}")
                    return step_output
            except Exception as e:
                error_msg = f"Error parsing response for step {step}: {e}"
                step_output.observation = error_msg
                log.error(error_msg)
                if self.debug:
                    printc(error_msg, color="red")
                return step_output

    def _run_one_step(
        self,
        step: int,
        prompt_kwargs: Dict,
        model_kwargs: Dict,
        id: Optional[str] = None,
        step_history: List[StepOutput] = [],
    ) -> Union[Parameter, StepOutput]:
        """Run one step of the agent: plan and execute the action for the step.

        Needs to handle both train and eval mode on self.planner.
        """
""" if self.debug: printc(f"step: {step}", color="yellow") step_history_value = [] for step_output in step_history: if isinstance(step_output, Parameter): step_history_value.append(step_output.data) else: step_history_value.append(step_output) prompt_kwargs["step_history"] = step_history_value for data in step_history_value: if not data: raise ValueError( f"Expected StepOutput, but got {type(data)}, all steps: {step_history_value}" ) if not isinstance(data, StepOutput): raise ValueError( f"Expected StepOutput, but got {type(data)}, all steps: {step_history_value}" ) log.debug( f"Running step {step} with prompt: {self.planner.prompt(**prompt_kwargs)}" ) try: response: Union[GeneratorOutput, Parameter] = self.planner( prompt_kwargs=prompt_kwargs, model_kwargs=model_kwargs, id=id ) # prompt_str = self.planner.get_prompt(**prompt_kwargs) # printc(f"Prompt: {prompt_str}", color="yellow") except Exception as e: error_msg = f"Error happened in planner response at step {step}: {e}.\n" error_msg += ( f"Prompt kwargs: {prompt_kwargs}\nModel kwargs: {model_kwargs}\n" ) error_msg += f"Traceback:\n{traceback.format_exc()}" raise RuntimeError(error_msg) step_output: StepOutput = StepOutput(step=step) try: if self.training and isinstance(response, Parameter): if not isinstance(response.data, GeneratorOutput): raise ValueError( f"Expected GeneratorOutput, but got {type(response.data)}, value: {response.data}" ) # Detect planner parsing errors to FunctionExpression so that the prompt can be trained to self-correct if not isinstance(response.data.data, Function): @fun_to_grad_component() def set_step_output_with_error( step_output: StepOutput, data: GeneratorOutput ): """Set the step_output with error.""" step_output.observation = f"Error {data.error} in parsing response: {data.raw_response}, data type: {type(data.data)}" return step_output response.add_successor_map_fn( successor=set_step_output_with_error, map_fn=lambda x: x.data, ) step_output = set_step_output_with_error.forward( step_output, response ) else: step_output: Parameter = self._execute_action( step_output, response, id ) if not isinstance(step_output, Parameter): raise ValueError( f"Expected Parameter, but got {type(step_output)}, value: {step_output}" ) if self.debug: printc(f"step_output: {step_output.data}", color="red") if not isinstance(step_output, Parameter): raise ValueError( f"Ensure step_output to be Parameter at training mode. 
Got {type(step_output)}.\n\ Please check the observation for error details: {step_output}" ) return step_output else: step_output: StepOutput = self._execute_action( step_output=step_output, response=response, id=id ) if not step_output: raise RuntimeError( f"Error executing action at step {step}: {step_output}" ) if self.debug: printc(f"step_output: {step_output}", color="red") return step_output except Exception as e: error_msg = f"Error during execution at step {step}: {e}.\n" error_msg += f"Step output: {step_output}\nResponse: {response}\n" error_msg += f"Traceback:\n{traceback.format_exc()}" raise RuntimeError(error_msg) def _check_last_step( self, step_history: List[Union[StepOutput, Parameter]] ) -> bool: """Check if the last step is the finish step.""" if not step_history: return True last_step: Union[StepOutput, Parameter] = step_history[-1] if isinstance(last_step, Parameter): last_step = last_step.data if ( last_step and last_step.action and hasattr(last_step.action, "name") and last_step.action.name == "finish" ): return True return False def _get_answer( self, step_history: List[Union[StepOutput, Parameter]] ) -> Union[str, "Parameter"]: """Get the final answer from the step history. When in training mode, we pass the whole step_history to the backward engine to find the feedback """ if not step_history: return None last_step: Union[StepOutput, Parameter] = step_history[-1] if isinstance(last_step, Parameter): answer = self.combine_step_history( step_history=step_history, id=last_step.data_id, react_agent_task_desc=self.planner.prompt_kwargs[ "react_agent_task_desc" ], ) return answer else: return last_step.observation
    def call(self, *args, **kwargs) -> ReActOutput:
        output = self.bicall(*args, **kwargs)
        if not isinstance(output, ReActOutput) or not output:
            raise ValueError(f"Expected ReActOutput, but got {type(output)}")
        return output
    def forward(self, *args, **kwargs) -> Parameter:
        return self.bicall(*args, **kwargs)
    def _is_step_output_last_step(self, step_output: StepOutput) -> bool:
        """Check whether the step output is the last ('finish') step."""
        step_output_data = (
            step_output.data if isinstance(step_output, Parameter) else step_output
        )
        if (
            step_output_data
            and step_output_data.function
            and step_output_data.function.name == "finish"
        ):
            return True
        return False
    def bicall(
        self,
        input: str,
        prompt_kwargs: Optional[Dict] = {},
        model_kwargs: Optional[Dict] = {},
        id: Optional[str] = None,
    ) -> Union["Parameter", ReActOutput]:
        r"""prompt_kwargs: additional prompt kwargs to either replace or add to the preset prompt kwargs."""
        # set up the prompts; step_history is initialized in both training and eval mode
        prompt_kwargs = {
            **prompt_kwargs,
            "input_str": input,
        }

        step_history: List[Union[StepOutput, Parameter]] = []

        if self.debug:
            printc(f"input_query: {input}", color="red")

        for i in range(self.max_steps):
            step = i + 1
            try:
                step_output = self._run_one_step(
                    step, prompt_kwargs, model_kwargs, id, step_history
                )
                if isinstance(step_output, Parameter):
                    step_output.data_id = id
                step_history.append(step_output)

                if self._check_last_step(step_history):
                    break
            except Exception as e:
                log.error(f"Error running step {step}: {e}")
                printc(f"Error running step {step}: {e}", color="red")
                # the only place to raise the error, for debugging; in normal cases the agent should not raise an error
                raise e

        answer = self._get_answer(step_history)
        if self.training:
            return answer
        # wrap the output
        output = ReActOutput(step_history=step_history, id=id, answer=answer)
        if self.debug:
            printc(f"answer: {output}", color="yellow")
        return output
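
    # Usage sketch (hypothetical model setup; mirrors the __main__ demo below). In eval
    # mode, call()/bicall() return a ReActOutput; in train mode, forward() returns a
    # Parameter produced by CombineStepHistory:
    #
    #   agent = ReActAgent(
    #       tools=[multiply],
    #       model_client=OpenAIClient(),
    #       model_kwargs={"model": "gpt-3.5-turbo"},
    #   )
    #   agent.eval()
    #   out = agent.call("What is 3 * 4?", id="q1")
    #   print(out.answer, len(out.step_history))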
    def _extra_repr(self) -> str:
        s = f"max_steps={self.max_steps}, add_llm_as_fallback={self.add_llm_as_fallback}"
        return s
if __name__ == "__main__":
    from adalflow.components.model_client import OpenAIClient
    from adalflow.utils import setup_env
    from adalflow.core.func_tool import FunctionTool

    setup_env()

    class App(Component):
        def __init__(self):
            super().__init__()
            self.llm_tool = Generator(
                model_client=OpenAIClient(),
                model_kwargs={"model": "gpt-3.5-turbo"},
            )

            def llm_as_tool(input: str, id: Optional[str] = None) -> str:
                """Used as a calculator tool."""
                printc(f"llm_as_tool: {input}", color="yellow")
                return self.llm_tool(prompt_kwargs={"input_str": input}, id=id)

            self.react_agent = ReActAgent(
                tools=[FunctionTool(llm_as_tool, component=self.llm_tool)],
                max_steps=2,
                add_llm_as_fallback=False,
                model_client=OpenAIClient(),
                model_kwargs={"model": "gpt-3.5-turbo"},
            )

        def call(self, input: str, id: Optional[str] = None) -> Union[str, "Parameter"]:
            return self.react_agent(input, id=id)

        def forward(
            self, input: str, id: Optional[str] = None
        ) -> Union[str, "Parameter"]:
            return self.react_agent(input, id=id)

    # print(OutputParameter.__mro__)
    app = App()
    app.train()
    output = app("I want to multiply 3 and 4.", id="123")
    # print(output)
    printc(output, color="yellow")
    output.draw_graph()