Source code for core.func_tool

"""
Tool is LLM's extended capability which is one of the core design pattern of Agent. All tools can be wrapped in a FunctionTool class.
This helps to standardize the tool interface and metadata to communicate with the Agent.
"""

from typing import Any, Optional, Callable, Awaitable, Union
from inspect import iscoroutinefunction
import logging
import asyncio
import nest_asyncio


from adalflow.core.types import (
    FunctionDefinition,
    FunctionOutput,
    Function,
)
from adalflow.core import Component
from adalflow.core.functional import (
    get_fun_schema,
)
from inspect import signature

AsyncCallable = Callable[..., Awaitable[Any]]

log = logging.getLogger(__name__)


[docs] def is_running_in_event_loop() -> bool: try: loop = asyncio.get_running_loop() if loop.is_running(): return True else: return False except RuntimeError: return False
FunctionType = Union[Callable[..., Any], Awaitable[Callable[..., Any]]] # TODO: improve the support for async functions, similarly a component might be used as a tool
[docs] class FunctionTool(Component): __doc__ = r"""Describing and executing a function via call with arguments. container for a function that orchestrates the function formatting(to LLM), parsing, and execution. Function be used by LLM as a tool to achieve a specific task. Features: - Supports both synchronous and asynchronous functions via ``call`` and ``acall``. - Creates a FunctionDefinition from the function using ``get_fun_schema``. - Executs the function with arguments. [You can use Function and FunctionExpression as output format] - Please Parses the function call expression[FunctionExpression] into Function (name, args, kwargs). - call or acall, or use execute to execute the function. - via call with args and kwargs. - via eval without any context or sandboxing. - via sandboxed execute directionly using ``sandbox_exec``. """ def __init__( self, fn: FunctionType, definition: Optional[FunctionDefinition] = None, ): super().__init__() nest_asyncio.apply() assert fn is not None, "fn must be provided" self.fn = fn self._is_async = iscoroutinefunction(fn) self.definition = definition or self._create_fn_definition() if self._is_async: log.info(f"FunctionTool: {fn} is async: {self._is_async}") @property def is_async(self) -> bool: return self._is_async def _create_fn_definition(self) -> FunctionDefinition: name = self.fn.__name__ docstring = self.fn.__doc__ description = f"{docstring}" description = f"{name}{signature(self.fn)}\n{docstring}" # description = f"{name}{signature(self.fn)}\n{docstring}" fn_parameters = get_fun_schema(name, self.fn) return FunctionDefinition( func_name=name, func_desc=description, func_parameters=fn_parameters )
[docs] def call(self, *args: Any, **kwargs: Any) -> FunctionOutput: r"""Execute the function synchronously. Example: .. code-block:: python import time def sync_function_1(): time.sleep(1) return "Function 1 completed" tool_1 = FunctionTool(sync_function_1) output = tool_1.call() """ if self._is_async: raise ValueError("FunctionTool is asynchronous, use acall instead") output, error = None, None try: output = self.fn(*args, **kwargs) except Exception as e: log.error(f"Error at calling {self.fn}: {e}") # raise ValueError(f"Error: {e}") error = str(e) return FunctionOutput( name=self.definition.func_name, # raw_input={"args": args, "kwargs": kwargs}, input=Function(name=self.definition.func_name, args=args, kwargs=kwargs), output=output, error=error, )
[docs] async def acall(self, *args: Any, **kwargs: Any) -> FunctionOutput: r"""Execute the function asynchronously. Need to be called in an async function or using asyncio.run. Example: .. code-block:: python import asyncio async def async_function_1(): await asyncio.sleep(1) # Simulate async work return "Function 1 completed" async def call_async_function(): tool_1 = FunctionTool(async_function_1) output = await tool_1.acall() asyncio.run(call_async_function()) """ if not self._is_async: raise ValueError("FunctionTool is not asynchronous, use call instead") output = None error = None try: output = await self.fn(*args, **kwargs) except Exception as e: log.error(f"Error at calling {self.fn}: {e}") error = str(e) return FunctionOutput( name=self.definition.func_name, input=Function(name=self.definition.func_name, args=args, kwargs=kwargs), output=output, error=error, )
[docs] def execute(self, *args, **kwargs) -> FunctionOutput: r"""Execute the function synchronously or asynchronously based on the function type. No matter of the function type, you can run the function using both asyncio and without asyncio. Use it with caution as it might block the event loop. Example: .. code-block:: python import asyncio import time async def async_function_1(): await asyncio.sleep(1) return "Function 1 completed" def sync_function_1(): time.sleep(1) return "Function 1 completed" async def async_function_2(): await asyncio.sleep(2) return "Function 2 completed" def sync_function_2(): time.sleep(2) return "Function 2 completed" async_tool_1 = FunctionTool(async_function_1) sync_tool_1 = FunctionTool(sync_function_2) async_tool_2 = FunctionTool(async_function_2) sync_tool_2 = FunctionTool(sync_function_2) def run_sync_and_async_mix_without_wait(): # both sync and async tool can use execute # sync tool can also use call # takes 5 seconds (1+1+2) + overhead start_time = time.time() results = [ async_tool_1.execute(), sync_tool_1.execute(), sync_tool_2.call(), ] end_time = time.time() print(f"run_sync_and_async_mix_without_wait time: {end_time - start_time}") return results async def run_sync_and_async_mix(): # both sync and async tool can use execute&to_thread # async tool can also use acall without to_thread # takes a bit over 2 seconds max(2) start_time = time.time() results = await asyncio.gather( async_tool_1.execute(), sync_tool_1.execute(), async_tool_2.acall(), ) end_time = time.time() print(f"run_sync_and_async_mix time: {end_time - start_time}") return results run_sync_and_async_mix_without_wait() asyncio.run(run_sync_and_async_mix()) """ if self._is_async: log.debug(f"Running async function: {self.fn}") if is_running_in_event_loop(): result = asyncio.create_task(self.acall(*args, **kwargs)) else: result = asyncio.run(self.acall(*args, **kwargs)) # NOTE: in juptyer notebook, it is always running in event loop else: log.debug(f"Running sync function: {self.fn}") if is_running_in_event_loop(): log.debug(f"Running sync function in event loop: {self.fn}") result = asyncio.to_thread(self.call, *args, **kwargs) else: result = self.call(*args, **kwargs) return result
def __call__(self, *args, **kwargs) -> FunctionOutput: r"""Execute the function synchronously or asynchronously based on the function type.""" return self.execute(*args, **kwargs) def _extra_repr(self) -> str: s = f"fn: {self.fn}, async: {self._is_async}, definition: {self.definition}" return s
if __name__ == "__main__": import asyncio import time async def async_function_1(): await asyncio.sleep(1) return "Function 1 completed" def sync_function_1(): time.sleep(1) return "Function 1 completed" async def async_function_2(): await asyncio.sleep(2) return "Function 2 completed" def sync_function_2(): time.sleep(2) return "Function 2 completed" async_tool_1 = FunctionTool(async_function_1) sync_tool_1 = FunctionTool(sync_function_2) async_tool_2 = FunctionTool(async_function_2) sync_tool_2 = FunctionTool(sync_function_2) def run_sync_and_async_mix_without_wait(): # both sync and async tool can use execute # sync tool can also use call # takes 5 seconds (1+1+2) + overhead start_time = time.time() results = [ async_tool_1.execute(), sync_tool_1.execute(), sync_tool_2.call(), ] print(results) end_time = time.time() print(f"run_sync_and_async_mix_without_wait time: {end_time - start_time}") return results async def run_sync_and_async_mix(): # both sync and async tool can use execute&to_thread # async tool can also use acall without to_thread # takes a bit over 2 seconds max(2) start_time = time.time() results = await asyncio.gather( async_tool_1.execute(), sync_tool_1.execute(), async_tool_2.acall(), ) print(results) end_time = time.time() print(f"run_sync_and_async_mix time: {end_time - start_time}") return results print(async_tool_1.execute()) run_sync_and_async_mix_without_wait() asyncio.run(run_sync_and_async_mix())