Source code for core.func_tool

"""
Tool is LLM's extended capability which is one of the core design pattern of Agent. All tools can be wrapped in a FunctionTool class.
This helps to standardize the tool interface and metadata to communicate with the Agent.
"""

from typing import Any, Optional, Callable, Awaitable, Union
from inspect import ismethod
import inspect
import logging
import asyncio
import nest_asyncio
from enum import Enum, auto


from adalflow.core.types import (
    FunctionDefinition,
    FunctionOutput,
    Function,
)
from adalflow.core import Component
from adalflow.optim.parameter import Parameter
from adalflow.optim.grad_component import FunGradComponent
from adalflow.core.functional import (
    get_fun_schema,
)
from adalflow.utils import printc
from inspect import signature

AsyncCallable = Callable[..., Awaitable[Any]]

log = logging.getLogger(__name__)


[docs] def is_running_in_event_loop() -> bool: try: loop = asyncio.get_running_loop() if loop.is_running(): return True else: return False except RuntimeError: return False
[docs] def find_instance_name_from_self(instance): """ Attempt to find the variable name of the instance in the calling context. :param instance: The instance to find the name for. :return: The variable name of the instance, if found; otherwise, None. """ # Inspect the calling stack frame frame = inspect.stack()[2].frame for var_name, var_obj in frame.f_locals.items(): if var_obj is instance: return var_name return None
# Specific function types supported by FunctionTool: # - Regular functions (sync/async) # - Generator functions (sync/async) # - Bound methods (class methods) # - FunGradComponent instances (trainable components)
[docs] class FunctionType(Enum): """Enumeration of the 4 core function types supported by FunctionTool.""" SYNC = auto() # Regular sync function: def func(): return value ASYNC = auto() # Async function: async def func(): return value SYNC_GENERATOR = auto() # Sync generator: def func(): yield value ASYNC_GENERATOR = auto() # Async generator: async def func(): yield value
# TODO: improve the support for async functions, similarly a component might be used as a tool
[docs] class FunctionTool(Component): __doc__ = r"""Describing and Parsing(to LLM) and executing a function. Supports both normal callable functions and class methods. When component is used, we support both the training and eval mode. Note: When the eval mode, it outputs FunctionOutput, and when the training mode, it outputs Parameter with data as FunctionOutput. Args: fn (Callable): The function to be executed. definition (FunctionDefinition, optional): The definition of the function. Defaults to None. Function be used by LLM as a tool to achieve a specific task. What function can you pass as a tool? 1. Any unbound function you wrote outside of a class. 2. Any class method you wrote in your component. It can call `self` and other methods inside of your component. 3. When the function is using a trainable component, and you can directly use the component's method as a tool or wrap it in a function. But you need to make sure to pass the component to the tool. Here are some examples: .. code-block:: python from adalflow.core.func_tool import FunctionTool class AgenticRAG(Component): def __init__(self, ...): super().__init__() self.retriever = Retriever() self.llm = Generator() def retriever_as_tool(input: str) -> str: r"Used as a retriever tool." return self.retriever(input) tools = [FunctionTool(retriever_as_tool, component=self.retriever), FunctionTool(self.llm.__call__, component=self.llm)] # if you have trainable component, this will ensure it can be trained together with your whole task pipeline # if you dont want to train them and simply treating them as a tool, you can call like this # tools = [FunctionTool(retriever_as_tool), FunctionTool(self.llm.__call__, component=self.llm)] Features: - Supports both synchronous and asynchronous functions via `call` and `acall`. - Creates a `FunctionDefinition` from the function using `get_fun_schema`. - Executes the function with arguments. - Parses the function call expression (`FunctionExpression`) into `Function` (name, args, kwargs). - Executes the function using one of the following methods: - Via `call` with args and kwargs. - Via `eval`, without any context or sandboxing. - Via sandboxed execution directly using `sandbox_exec`. A FunctionTool allows other GradComponent(as a tool) to pass through correctly. """ # key attributes: fn: Callable definition: FunctionDefinition function_type: FunctionType # not used in particular, but only for training control class_instance: Optional[object] = None # it inherits the training attribute from Component def __init__( self, fn: Union[Callable, FunGradComponent], definition: Optional[FunctionDefinition] = None, ): super().__init__( name="FunctionTool", desc="A component calls and executes a function." ) nest_asyncio.apply() assert fn is not None, "fn must be provided" # TODO: support FunGradComponent later. self.fn = fn self.function_type = self.detect_function_type(fn) self._is_async = self.function_type in [FunctionType.ASYNC, FunctionType.ASYNC_GENERATOR] self.class_instance = self._autodetect_class_instance(fn) if isinstance(fn, FunGradComponent): print(f"FunctionTool: {fn} is a component") self.definition = ( definition or self._create_fn_definition_for_grad_component(fn) ) else: self.definition = definition or self._create_fn_definition() if self._is_async: log.info(f"FunctionTool: {fn} is async: {self._is_async}")
[docs] @classmethod def detect_function_type(cls, fn: Callable) -> FunctionType: """ Detect the function type of a given callable. Args: fn: The callable to analyze Returns: FunctionType: The detected function type Raises: ValueError: If the function type cannot be determined or is not supported """ if fn is None: raise ValueError("Function cannot be None") # Check for async generator functions if inspect.isasyncgenfunction(fn): return FunctionType.ASYNC_GENERATOR # Check for sync generator functions if inspect.isgeneratorfunction(fn): return FunctionType.SYNC_GENERATOR # Check for async functions (coroutines) if inspect.iscoroutinefunction(fn): return FunctionType.ASYNC # Check for regular functions if inspect.isfunction(fn) or inspect.ismethod(fn): return FunctionType.SYNC # Check for callable objects (like classes with __call__) if callable(fn): # For callable objects, we need to check their __call__ method if hasattr(fn, '__call__'): call_method = fn.__call__ if inspect.ismethod(call_method): # It's a bound method, check the underlying function if inspect.isasyncgenfunction(call_method.__func__): return FunctionType.ASYNC_GENERATOR elif inspect.isgeneratorfunction(call_method.__func__): return FunctionType.SYNC_GENERATOR elif inspect.iscoroutinefunction(call_method.__func__): return FunctionType.ASYNC else: return FunctionType.SYNC else: # It's a function, check directly if inspect.isasyncgenfunction(call_method): return FunctionType.ASYNC_GENERATOR elif inspect.isgeneratorfunction(call_method): return FunctionType.SYNC_GENERATOR elif inspect.iscoroutinefunction(call_method): return FunctionType.ASYNC else: return FunctionType.SYNC raise ValueError(f"Cannot determine function type for {fn}")
def _create_fn_definition_for_grad_component( self, fn: FunGradComponent ) -> FunctionDefinition: name = fn.fun_name docstring = fn.doc_string signature_str = str(signature(fn.fun)) cls_name = None if ismethod(fn.fun): cls_name = fn.fun.__self__.__class__.__name__ name = cls_name + "_" + name if cls_name else name return FunctionDefinition( func_name=name, func_desc=( f"{name}{signature_str}\nDocstring:{docstring}" if isinstance(docstring, str) else f"{name}{signature_str}\nDocstring:{docstring.data}" ), func_parameters=get_fun_schema(name, fn.fun), ) def _autodetect_class_instance(self, fn: Callable) -> Optional[Any]: if ismethod(fn): return fn.__self__ return None def _create_fn_definition(self) -> FunctionDefinition: name = self.fn.__name__ docstring = self.fn.__doc__ signature_str = str(signature(self.fn)) # Get the class that owns the method, if applicable cls_name = None if ismethod(self.fn): # Check if it's a bound method cls_name = self.fn.__self__.__class__.__name__ # Build the description description = f"{name}{signature_str}\n" if cls_name: description += f"Belongs to class: {cls_name}\n" if docstring: description += f"Docstring: {docstring}\n" # Get function parameters schema fn_parameters = get_fun_schema(name, self.fn) name = cls_name + "_" + name if cls_name else name # create a unique identifier as the class method name return FunctionDefinition( func_name=name, func_desc=description, func_parameters=fn_parameters, )
[docs] def forward(self, *args, **kwargs) -> Parameter: r"""Forward the function tool.""" return self.bicall(*args, **kwargs)
def _call_sync(self, fn: Callable, *args: Any, **kwargs: Any) -> Any: """Call a sync function.""" if self.function_type == FunctionType.SYNC: return fn(*args, **kwargs) elif self.function_type == FunctionType.ASYNC: if is_running_in_event_loop(): loop = asyncio.get_running_loop() task = loop.create_task(fn(*args, **kwargs)) return asyncio.run_coroutine_threadsafe(task, loop).result() else: return asyncio.run(fn(*args, **kwargs)) elif self.function_type == FunctionType.SYNC_GENERATOR: return fn(*args, **kwargs) elif self.function_type == FunctionType.ASYNC_GENERATOR: if is_running_in_event_loop(): loop = asyncio.get_running_loop() async def collect_generator(): result = [] async for item in fn(*args, **kwargs): result.append(item) return result task = loop.create_task(collect_generator()) output = asyncio.run_coroutine_threadsafe(task, loop).result() return output else: # No event loop running, safe to create new one async def collect_generator(): result = [] async for item in fn(*args, **kwargs): result.append(item) return result output = asyncio.run(collect_generator()) return output else: raise ValueError(f"Unsupported function type: {self.function_type}")
[docs] def call(self, *args: Any, **kwargs: Any) -> FunctionOutput: """ Execute the function synchronously, supporting all function types. This method provides a unified sync interface for all function types: - SYNC: Calls the function directly - ASYNC: Runs the coroutine in a new event loop (blocks until complete) - SYNC_GENERATOR: Returns the generator object - ASYNC_GENERATOR: Runs the async generator and collects all values into a list Warning: For async functions, this will block the current thread until completion. For better performance with async functions, consider using acall() instead. Example: import asyncio async def async_func(): await asyncio.sleep(1) return "async result" def sync_func(): return "sync result" tool1 = FunctionTool(async_func) tool2 = FunctionTool(sync_func) # Both work synchronously result1 = tool1.call() # Blocks for 1 second result2 = tool2.call() # Returns immediately """ output, error = None, None try: output = self._call_sync(self.fn, *args, **kwargs) except Exception as e: log.error(f"Error at calling {self.fn}: {e}") error = f"Error at calling {self.fn}: {e}" # Handle Parameter output (training mode) if isinstance(output, Parameter): if not self.training: raise ValueError( f"FunctionTool {self.definition.func_name} is in eval mode, but the output is Parameter" ) output.data = FunctionOutput( name=self.definition.func_name, input=Function( name=self.definition.func_name, args=args, kwargs=kwargs ), output=output.data, error=error, ) return output # Create FunctionOutput function_output = FunctionOutput( name=self.definition.func_name, input=Function(name=self.definition.func_name, args=args, kwargs=kwargs), output=output, error=error, ) printc(f"call output: {function_output}", color="yellow") return function_output
[docs] def bicall(self, *args: Any, **kwargs: Any) -> Union[FunctionOutput, Parameter]: r"""This should only be used in training, where a fun is required to be a FunGradComponent where the output from function execution is a Parameter. """ if self._is_async: raise ValueError("FunctionTool is asynchronous, use acall instead") output, error = None, None # NOTE: special case: # self.fn can have both train and eval mode or untrainable as a function. try: log.debug(f"bicall args: {args}, kwargs: {kwargs}, fn: {self.fn}") # TODO: might to support more types of functions output = self.fn(*args, **kwargs) log.debug(f"output 1: {output}") printc(f"output 1: {output}", color="yellow") except Exception as e: log.error(f"Error at calling {self.fn}: {e}") error = f"Error at calling {self.fn}: {e}" if isinstance(output, Parameter): if not self.training: raise ValueError( f"FunctionTool {self.definition.func_name} is in eval mode, but the output is Parameter" ) output.data = FunctionOutput( name=self.definition.func_name, # raw_input={"args": args, "kwargs": kwargs}, input=Function( name=self.definition.func_name, args=args, kwargs=kwargs ), output=output.data, error=error, ) return output output = FunctionOutput( name=self.definition.func_name, input=Function(name=self.definition.func_name, args=args, kwargs=kwargs), output=output, error=error, ) printc(f"function output: {output}", color="yellow") return output
[docs] async def acall(self, *args, **kwargs) -> Union[FunctionOutput, Parameter]: """ Async call the function. Handles all function types appropriately. For different function types: - SYNC: Returns FunctionOutput with the result - ASYNC: Awaits the coroutine and returns FunctionOutput with the result - SYNC_GENERATOR: Returns FunctionOutput with the generator object - ASYNC_GENERATOR: Returns FunctionOutput with the async generator object Note: For generators, users need to iterate over the generator themselves. """ output, error = None, None try: if self.function_type == FunctionType.SYNC: # Sync function - call directly output = self.fn(*args, **kwargs) elif self.function_type == FunctionType.ASYNC: # Async function - await the coroutine output = await self.fn(*args, **kwargs) elif self.function_type == FunctionType.SYNC_GENERATOR: # Sync generator - return the generator object output = self.fn(*args, **kwargs) elif self.function_type == FunctionType.ASYNC_GENERATOR: # Async generator - return the async generator object output = self.fn(*args, **kwargs) else: raise ValueError(f"Unsupported function type: {self.function_type}") except Exception as e: log.error(f"Error at calling {self.fn}: {e}") error = f"Error at calling {self.fn}: {e}" # Handle Parameter output (training mode) if isinstance(output, Parameter): if not self.training: raise ValueError( f"FunctionTool {self.definition.func_name} is in eval mode, but the output is Parameter" ) output.data = FunctionOutput( name=self.definition.func_name, input=Function( name=self.definition.func_name, args=args, kwargs=kwargs ), output=output.data, error=error, ) return output function_output = FunctionOutput( name=self.definition.func_name, input=Function(name=self.definition.func_name, args=args, kwargs=kwargs), output=output, error=error, ) return function_output
# def execute(self, *args, **kwargs) -> FunctionOutput: # r"""Execute the function synchronously or asynchronously based on the function type. # No matter of the function type, you can run the function using both asyncio and without asyncio. # Use it with caution as it might block the event loop. # Example: # .. code-block:: python # import asyncio # import time # async def async_function_1(): # await asyncio.sleep(1) # return "Function 1 completed" # def sync_function_1(): # time.sleep(1) # return "Function 1 completed" # async def async_function_2(): # await asyncio.sleep(2) # return "Function 2 completed" # def sync_function_2(): # time.sleep(2) # return "Function 2 completed" # async_tool_1 = FunctionTool(async_function_1) # sync_tool_1 = FunctionTool(sync_function_2) # async_tool_2 = FunctionTool(async_function_2) # sync_tool_2 = FunctionTool(sync_function_2) # def run_sync_and_async_mix_without_wait(): # # both sync and async tool can use execute # # sync tool can also use call # # takes 5 seconds (1+1+2) + overhead # start_time = time.time() # results = [ # async_tool_1.execute(), # sync_tool_1.execute(), # sync_tool_2.call(), # ] # end_time = time.time() # print(f"run_sync_and_async_mix_without_wait time: {end_time - start_time}") # return results # async def run_sync_and_async_mix(): # # both sync and async tool can use execute&to_thread # # async tool can also use acall without to_thread # # takes a bit over 2 seconds max(2) # start_time = time.time() # results = await asyncio.gather( # async_tool_1.execute(), # sync_tool_1.execute(), # async_tool_2.acall(), # ) # end_time = time.time() # print(f"run_sync_and_async_mix time: {end_time - start_time}") # return results # run_sync_and_async_mix_without_wait() # asyncio.run(run_sync_and_async_mix()) # """ # if self._is_async: # log.debug(f"Running async function: {self.fn}") # if is_running_in_event_loop(): # result = asyncio.create_task(self.acall(*args, **kwargs)) # else: # result = asyncio.run(self.acall(*args, **kwargs)) # # NOTE: in juptyer notebook, it is always running in event loop # else: # log.debug(f"Running sync function: {self.fn}") # if is_running_in_event_loop(): # log.debug(f"Running sync function in event loop: {self.fn}") # result = asyncio.to_thread(self.call, *args, **kwargs) # else: # result = self.call(*args, **kwargs) # return result # def __call__(self, *args, **kwargs) -> FunctionOutput: # r"""Execute the function synchronously or asynchronously based on the function type.""" # return self.execute(*args, **kwargs) def _extra_repr(self) -> str: s = f"fn: {self.fn}, type: {self.function_type}, definition: {self.definition}" if self.class_instance is not None: s += f", class_instance: {self.class_instance}" return s
if __name__ == "__main__": # import asyncio # import time # async def async_function_1(): # await asyncio.sleep(1) # return "Function 1 completed" # def sync_function_1(): # time.sleep(1) # return "Function 1 completed" # async def async_function_2(): # await asyncio.sleep(2) # return "Function 2 completed" # def sync_function_2(): # time.sleep(2) # return "Function 2 completed" # async_tool_1 = FunctionTool(async_function_1) # sync_tool_1 = FunctionTool(sync_function_2) # async_tool_2 = FunctionTool(async_function_2) # sync_tool_2 = FunctionTool(sync_function_2) # def run_sync_and_async_mix_without_wait(): # # both sync and async tool can use execute # # sync tool can also use call # # takes 5 seconds (1+1+2) + overhead # start_time = time.time() # results = [ # async_tool_1.execute(), # sync_tool_1.execute(), # sync_tool_2.call(), # ] # print(results) # end_time = time.time() # print(f"run_sync_and_async_mix_without_wait time: {end_time - start_time}") # return results # async def run_sync_and_async_mix(): # # both sync and async tool can use execute&to_thread # # async tool can also use acall without to_thread # # takes a bit over 2 seconds max(2) # start_time = time.time() # results = await asyncio.gather( # async_tool_1.execute(), # sync_tool_1.execute(), # async_tool_2.acall(), # ) # print(results) # end_time = time.time() # print(f"run_sync_and_async_mix time: {end_time - start_time}") # return results # print(async_tool_1.execute()) # run_sync_and_async_mix_without_wait() # asyncio.run(run_sync_and_async_mix()) from adalflow.components.model_client import OpenAIClient from adalflow.core.generator import Generator from adalflow.optim.parameter import Parameter from adalflow.core.types import GeneratorOutput from adalflow.utils import setup_env, printc setup_env() llm = Generator( model_client=OpenAIClient(), model_kwargs={"model": "gpt-3.5-turbo"}, ) # llm.train() def llm_as_tool(input: str, id: Optional[str] = None) -> str: """Used as a calculator tool.""" printc(f"llm_as_tool: {input}", color="yellow") return llm(prompt_kwargs={"input_str": input}, id=id) llm_tool = FunctionTool(llm_as_tool, component=llm) llm_tool.train() output: Parameter = llm_tool("What is 2+2?") output.draw_graph() print(output) llm_tool.eval() output: FunctionTool = llm_tool("What is 2+2?") print(output) assert isinstance(output, FunctionOutput) assert isinstance(output.output, GeneratorOutput) # grad component from adalflow.optim.grad_component import fun_to_grad_component from adalflow.optim.parameter import ParameterType @fun_to_grad_component( desc="Finish", doc_string=Parameter( data="Finish the task with verbatim short factoid responses from retrieved context.", param_type=ParameterType.PROMPT, requires_opt=True, role_desc="Instruct how the agent creates the final answer from the step history.", name="doc_string", ), ) def finish(answer: str, **kwargs) -> str: # """Finish the task with verbatim short factoid responses from retrieved context.""" # printc(f"finish: {answer}", color="yellow") return answer finish_tool = FunctionTool(fn=finish, component=finish) definition = finish_tool.definition print(definition) # call function finish_tool.train() output: Parameter = finish_tool( "Finish the task with verbatim short factoid responses from retrieved context." ) print(output)