# Source code for core.runner



from pydantic import BaseModel
import logging
import json
import inspect
import asyncio
from dataclasses import dataclass

from typing import (
    Any,
    Dict,
    Generator as GeneratorType,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
)
from typing_extensions import TypeAlias
from pydantic import BaseModel

# Type aliases used in the Runner method signatures.
# BuiltInType: the plain Python value types (or None) that processed data may be.
BuiltInType: TypeAlias = Union[str, int, float, bool, list, dict, tuple, set, None]
# PydanticDataClass: a pydantic model *class* (a BaseModel subclass), not an instance.
PydanticDataClass: TypeAlias = Type[BaseModel]
# AdalflowDataClass: placeholder alias; as written it accepts any class.
AdalflowDataClass: TypeAlias = Type[
    Any
]  # TODO: replace with the actual Adalflow dataclass type if available

from adalflow.optim.parameter import Parameter
from adalflow.core.types import Function
from adalflow.utils import printc
from adalflow.core.component import Component
from adalflow.core.agent import Agent

from adalflow.core.types import GeneratorOutput, FunctionOutput, StepOutput, Function
import logging
from adalflow.core.base_data_class import DataClass
import ast


# Public API of this module: only Runner is exported via `import *`.
__all__ = ["Runner"]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Type variable used for the final answer in Runner's return annotations.
# NOTE(review): it is bound to pydantic's BaseModel, although Runner also returns
# built-in values and Adalflow dataclasses — confirm whether the bound is intended.
T = TypeVar("T", bound=BaseModel)


def _is_pydantic_dataclass(cls: Any) -> bool:
    # check whether cls is a pydantic dataclass
    return isinstance(cls, type) and issubclass(cls, BaseModel)


def _is_adalflow_dataclass(cls: Any) -> bool:
    # check whether cls is a adalflow dataclass
    return isinstance(cls, type) and issubclass(cls, DataClass)

@dataclass
class RunResultStreaming:
    """Container pairing a run's step history and output with streaming handles.

    NOTE(review): nothing in this module constructs this class; the private
    field names mirror the attributes set in ``Runner.astream`` — confirm the
    intended usage with callers.
    """

    # Steps recorded during the run.
    step_history: List[StepOutput]
    # Final output of the run.
    output: T
    # Presumably the asyncio task driving the streaming run — TODO confirm.
    _run_impl_task: asyncio.Task
    # Presumably the queue that streamed items are pushed onto — TODO confirm.
    _event_queue: asyncio.Queue

class Runner(Component):
    """Run an Agent's planner/tool loop for multiple steps.

    At every step the agent's planner is called and is expected to return a
    ``GeneratorOutput`` whose ``data`` field is a ``Function``. That function
    is executed through the agent's tool manager and the result is recorded
    as a ``StepOutput`` in ``step_history``. The loop runs for at most
    ``max_steps`` steps and stops early when the planner returns a function
    named "finish"; the observation of that step is then converted to
    ``answer_data_type`` (when one is set) and returned as the final answer.

    Attributes:
        agent (Agent): The agent whose planner and tool manager are used.
        max_steps (int): Maximum number of steps, read from ``agent.max_steps``.
        answer_data_type: Expected type of the final answer, read from
            ``agent.answer_data_type``. A falsy value means the raw data is
            returned unchanged.
        step_history (List[StepOutput]): Steps recorded by the latest run.
    """

    def __init__(
        self,
        agent: Agent,
        **kwargs,
    ) -> None:
        """Initialize the runner from an agent.

        Args:
            agent: The agent instance to execute. Its ``max_steps`` and
                ``answer_data_type`` attributes are copied onto the runner.
            **kwargs: Forwarded unchanged to ``Component.__init__``.
        """
        super().__init__(**kwargs)
        self.agent = agent

        # get agent requirements
        self.max_steps = agent.max_steps
        self.answer_data_type = agent.answer_data_type

        self.step_history: List[StepOutput] = []

    def _check_last_step(self, step: Function) -> bool:
        """Return True when ``step`` is the terminating "finish" function.

        Args:
            step: The function returned by the planner for the current step.

        Returns:
            bool: True if ``step.name`` equals "finish".
        """
        return step.name == "finish"

    @staticmethod
    def _parse_answer_dict(data: Any) -> Dict[str, Any]:
        """Convert the finish-step answer into a dict of field values.

        A dict is used as is. Anything else is converted with ``str`` and
        parsed as a Python literal; when that fails the text is parsed as
        JSON, because ``ast.literal_eval`` rejects the JSON-only tokens
        ``true``/``false``/``null``.

        Raises:
            ValueError: If the text cannot be parsed (``json.JSONDecodeError``
                is a ``ValueError``) or does not describe a dict.
        """
        if isinstance(data, dict):
            return data
        text = str(data)
        try:
            parsed = ast.literal_eval(text)
        except (ValueError, TypeError, SyntaxError):
            parsed = json.loads(text)
        if not isinstance(parsed, dict):
            raise ValueError(
                f"Expected a dictionary of fields, but got {type(parsed)}"
            )
        return parsed

    def _process_data(
        self,
        data: Union[BuiltInType, PydanticDataClass, AdalflowDataClass],
        id: Optional[str] = None,
    ) -> T:
        """Convert the finish-step output to ``self.answer_data_type``.

        Args:
            data: The observation of the finish step.
            id: Optional identifier for the output (currently unused).

        Returns:
            ``data`` unchanged when no ``answer_data_type`` is set; otherwise
            an instance of ``answer_data_type``. For a pydantic model class
            the instance is built with ``answer_data_type(**fields)``, for an
            Adalflow ``DataClass`` with ``answer_data_type.from_dict(fields)``;
            for any other type ``data`` must already be an instance of it.

        Raises:
            ValueError: If the data cannot be converted to the requested type.
        """
        if not self.answer_data_type:
            log.info(f"answer_data_type: {self.answer_data_type}, data: {data}")
            # by default when the answer data type is not provided return the data directly
            return data

        answer_type = self.answer_data_type
        try:
            log.info(f"answer_data_type: {type(answer_type)}")
            is_pydantic = _is_pydantic_dataclass(answer_type)
            if is_pydantic or _is_adalflow_dataclass(answer_type):
                kind = "pydantic" if is_pydantic else "adalflow dataclass"
                log.info(
                    f"initial answer returned by finish when user passed a {kind} type: {data}, type: {type(data)}"
                )
                if isinstance(data, answer_type):
                    # already an instance of the requested type, nothing to parse
                    model_output = data
                else:
                    dict_obj = self._parse_answer_dict(data)
                    log.info(
                        f"initial answer after being evaluated using ast: {dict_obj}, type: {type(dict_obj)}"
                    )
                    if is_pydantic:
                        model_output = answer_type(**dict_obj)
                    else:
                        model_output = answer_type.from_dict(dict_obj)
            else:
                # expect data to be a python built-in type; the prompt passed to
                # the LLM requires the answer to already have that type
                log.info(
                    f"type of answer is neither a pydantic dataclass or adalflow dataclass, answer before being casted again for safety: {data}, type: {type(data)}"
                )
                if not isinstance(data, answer_type):
                    raise ValueError(
                        f"Expected data of type {answer_type}, but got {type(data)}"
                    )
                model_output = data

            # compare against None explicitly: falsy answers such as 0, False,
            # "" or [] are valid results and must not be rejected
            if model_output is None:
                raise ValueError(f"Failed to parse output: {data}")
            return model_output
        except Exception as e:
            log.error(
                f"Failed to parse output: {data}, {e} for answer_data_type: {answer_type}"
            )
            raise ValueError(f"Error processing output: {str(e)}") from e

    @classmethod
    def _get_planner_function(cls, output: GeneratorOutput) -> Function:
        """Validate the planner output and return the function it carries.

        Args:
            output: The planner output.

        Returns:
            Function: The ``data`` field of ``output``.

        Raises:
            ValueError: If ``output`` is not a ``GeneratorOutput`` or its
                ``data`` field is not a ``Function``.
        """
        if not isinstance(output, GeneratorOutput):
            raise ValueError(
                f"Expected GeneratorOutput, but got {type(output)}, value: {output}"
            )

        function = output.data
        if not isinstance(function, Function):
            raise ValueError(
                f"Expected Function in the data field of the GeneratorOutput, but got {type(function)}, value: {function}"
            )
        return function

    def _prepare_run(
        self,
        prompt_kwargs: Optional[Dict[str, Any]],
        model_kwargs: Optional[Dict[str, Any]],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Reset the step history and return private copies of both kwargs dicts."""
        self.step_history = []
        prompt_kwargs = prompt_kwargs.copy() if prompt_kwargs else {}
        # a reference (not a copy): steps appended during the run are visible
        # through prompt_kwargs["step_history"]
        prompt_kwargs["step_history"] = self.step_history
        model_kwargs = model_kwargs.copy() if model_kwargs else {}
        return prompt_kwargs, model_kwargs

    def _log_prompt(self, prompt_kwargs: Dict[str, Any]) -> None:
        """Log the rendered planner prompt at DEBUG level.

        The prompt is only rendered when DEBUG logging is enabled.
        """
        if log.isEnabledFor(logging.DEBUG):
            log.debug(
                "The prompt with the prompt template is {}".format(
                    self.agent.planner.get_prompt(**prompt_kwargs)
                )
            )

    def call(
        self,
        prompt_kwargs: Dict[str, Any],
        model_kwargs: Optional[
            Dict[str, Any]
        ] = None,  # if some call use a different config
        use_cache: Optional[bool] = None,
        id: Optional[str] = None,
    ) -> Tuple[List[StepOutput], T]:
        """Execute the planner synchronously for multiple steps with function calling support.

        The run ends when the planner returns a function named "finish" or
        after ``max_steps`` steps.

        Args:
            prompt_kwargs: Dictionary of prompt arguments for the generator
            model_kwargs: Optional model parameters to override defaults
            use_cache: Whether to use cached results if available
            id: Optional unique identifier for the request

        Returns:
            Tuple containing:
                - List of step history (StepOutput objects)
                - Final processed output of type specified in
                  self.answer_data_type, or None when no "finish" step was
                  reached within ``max_steps``

        Raises:
            ValueError: If any step fails; the original exception is chained.
        """
        prompt_kwargs, model_kwargs = self._prepare_run(prompt_kwargs, model_kwargs)

        step_count = 0
        last_output = None

        while step_count < self.max_steps:
            try:
                printc(
                    f"agent planner prompt: {self.agent.planner.get_prompt(**prompt_kwargs)}"
                )
                # Execute one step
                output = self.agent.planner.call(
                    prompt_kwargs=prompt_kwargs,
                    model_kwargs=model_kwargs,
                    use_cache=use_cache,
                    id=id,
                )
                function = self._get_planner_function(output)

                # execute the tool
                function_results = self._tool_execute_sync(function)

                step_output: StepOutput = StepOutput(
                    step=step_count,
                    action=function,
                    function=function,
                    observation=function_results.output,
                )
                self.step_history.append(step_output)

                if self._check_last_step(function):
                    last_output = self._process_data(function_results.output)
                    break

                self._log_prompt(prompt_kwargs)
                step_count += 1
            except Exception as e:
                error_msg = f"Error in step {step_count}: {str(e)}"
                log.error(error_msg)
                raise ValueError(error_msg) from e

        return self.step_history, last_output

    def _tool_execute_sync(
        self,
        func: Function,
    ) -> Union[FunctionOutput, Parameter]:
        """Execute ``func`` through the agent's tool manager (used by ``call``).

        The result is returned as is when it already is a ``FunctionOutput``.
        Any other result (e.g. from mocks in tests) is wrapped in one: its
        ``output`` attribute is used when present, otherwise the whole result
        becomes the output. Awaitable results are not awaited here.
        """
        result = self.agent.tool_manager(expr_or_fun=func, step="execute")
        if isinstance(result, FunctionOutput):
            return result

        output = result.output if hasattr(result, "output") else result
        return FunctionOutput(name=func.name, input=func, output=output)

    async def acall(
        self,
        prompt_kwargs: Dict[str, Any],
        model_kwargs: Optional[Dict[str, Any]] = None,
        use_cache: Optional[bool] = None,
        id: Optional[str] = None,
    ) -> Tuple[List[StepOutput], Union[T, str]]:
        """Execute the planner asynchronously for multiple steps with function calling support.

        The run ends when the planner returns a function named "finish" or
        after ``max_steps`` steps.

        Args:
            prompt_kwargs: Dictionary of prompt arguments for the generator
            model_kwargs: Optional model parameters to override defaults
            use_cache: Whether to use cached results if available
            id: Optional unique identifier for the request

        Returns:
            Tuple containing:
                - List of step history (StepOutput objects)
                - Final processed output, or None when no "finish" step was
                  reached. Unlike ``call``, a failing step does not raise:
                  the error message string is returned in this position.
        """
        prompt_kwargs, model_kwargs = self._prepare_run(prompt_kwargs, model_kwargs)

        step_count = 0
        last_output = None

        while step_count < self.max_steps:
            try:
                printc(
                    f"agent planner prompt: {self.agent.planner.get_prompt(**prompt_kwargs)}"
                )
                # Execute one step asynchronously
                output = await self.agent.planner.acall(
                    prompt_kwargs=prompt_kwargs,
                    model_kwargs=model_kwargs,
                    use_cache=use_cache,
                    id=id,
                )
                function = self._get_planner_function(output)
                function_results = await self._tool_execute_async(function)

                step_output: StepOutput = StepOutput(
                    step=step_count,
                    action=function,
                    function=function,
                    observation=function_results.output,
                )
                self.step_history.append(step_output)

                if self._check_last_step(function):
                    last_output = self._process_data(function_results.output)
                    break

                # important to ensure the prompt at each step is correct
                self._log_prompt(prompt_kwargs)
                step_count += 1
            except Exception as e:
                error_msg = f"Error in step {step_count}: {str(e)}"
                log.error(error_msg)
                return self.step_history, error_msg

        return self.step_history, last_output

    def astream(
        self,
        prompt_kwargs: Dict[str, Any],
        model_kwargs: Optional[Dict[str, Any]] = None,
        use_cache: Optional[bool] = None,
        id: Optional[str] = None,
    ) -> asyncio.Queue:
        """Start ``impl_astream`` as a background task and return its event queue.

        ``asyncio.create_task`` needs a running event loop, so this must be
        called from within one. The task is kept on ``self._run_impl_task``
        and the queue on ``self._event_queue``.

        NOTE: no end-of-stream marker is put on the queue; consumers can
        watch ``self._run_impl_task`` to detect completion.

        Args:
            prompt_kwargs: Dictionary of prompt arguments for the generator
            model_kwargs: Optional model parameters to override defaults
            use_cache: Whether to use cached results if available
            id: Optional unique identifier for the request

        Returns:
            asyncio.Queue: The queue that receives the streamed items.
        """
        self._event_queue = asyncio.Queue()
        self._run_impl_task = asyncio.create_task(
            self.impl_astream(prompt_kwargs, model_kwargs, use_cache, id)
        )
        return self._event_queue

    async def impl_astream(
        self,
        prompt_kwargs: Dict[str, Any],
        model_kwargs: Optional[Dict[str, Any]] = None,
        use_cache: Optional[bool] = None,
        id: Optional[str] = None,
    ) -> Tuple[List[StepOutput], Union[T, str]]:
        """Run the multi-step loop asynchronously, pushing tool outputs to the event queue.

        For each step the tool output is resolved as follows:
            - coroutine: awaited; the result is the observation (nothing is
              put on the queue for it)
            - async generator: every item is put on ``self._event_queue``;
              the last item (None if there was none) is the observation
            - anything else: put on the queue and used as the observation

        ``self._event_queue`` is created by ``astream``.

        Args:
            prompt_kwargs: Dictionary of prompt arguments for the generator
            model_kwargs: Optional model parameters to override defaults
            use_cache: Whether to use cached results if available
            id: Optional unique identifier for the request

        Returns:
            Tuple containing:
                - List of step history (StepOutput objects)
                - Final processed output, or None when no "finish" step was
                  reached. A failing step does not raise: the error message
                  string is returned in this position.
        """
        prompt_kwargs, model_kwargs = self._prepare_run(prompt_kwargs, model_kwargs)

        step_count = 0
        last_output = None

        while step_count < self.max_steps:
            try:
                # important to ensure the prompt at each step is correct
                self._log_prompt(prompt_kwargs)
                printc(
                    f"agent planner prompt: {self.agent.planner.get_prompt(**prompt_kwargs)}"
                )

                # Execute one step asynchronously
                output = await self.agent.planner.acall(
                    prompt_kwargs=prompt_kwargs,
                    model_kwargs=model_kwargs,
                    use_cache=use_cache,
                    id=id,
                )
                function = self._get_planner_function(output)
                printc(f"function: {function}", color="yellow")

                function_result = await self._tool_execute_async(function)
                # everything must be wrapped in FunctionOutput
                if not isinstance(function_result, FunctionOutput):
                    raise ValueError(
                        f"Result must be wrapped in FunctionOutput, got {type(function_result)}"
                    )

                function_output = function_result.output
                # TODO: function needs a stream_events
                if inspect.iscoroutine(function_output):
                    real_function_output = await function_output
                elif inspect.isasyncgen(function_output):
                    printc("async generator detected")
                    # only the last item is needed as the observation; an empty
                    # generator yields None instead of failing on an empty list
                    real_function_output = None
                    async for item in function_output:
                        self._event_queue.put_nowait(item)
                        real_function_output = item
                else:
                    real_function_output = function_output
                    self._event_queue.put_nowait(function_output)

                step_output: StepOutput = StepOutput(
                    step=step_count,
                    action=function,
                    function=function,
                    observation=real_function_output,
                )
                self.step_history.append(step_output)

                if self._check_last_step(function):
                    last_output = self._process_data(real_function_output)
                    break

                step_count += 1
            except Exception as e:
                error_msg = f"Error in step {step_count}: {str(e)}"
                log.error(error_msg)
                return self.step_history, error_msg

        return self.step_history, last_output

    async def _tool_execute_async(
        self,
        func: Function,
    ) -> Union[FunctionOutput, Parameter]:
        """Execute ``func`` through the tool manager's ``execute_func_async``.

        Used by ``acall`` and ``impl_astream``. This method itself does no
        streaming.

        Raises:
            ValueError: If the tool manager does not return a ``FunctionOutput``.
        """
        result = await self.agent.tool_manager.execute_func_async(func=func)
        if not isinstance(result, FunctionOutput):
            raise ValueError("Result is not a FunctionOutput")
        return result