Source code for core.generator

"""Generator is a user-facing orchestration component with a simple and unified interface for LLM prediction.

It is a pipeline that consists of three subcomponents."""

import json
import re
from pathlib import Path

from typing import Any, Dict, Optional, Union, Callable, Tuple, List
import logging


from adalflow.core.types import (
    ModelType,
    GeneratorOutput,
    GeneratorOutputType,
)
from adalflow.core.component import Component
from adalflow.optim.grad_component import GradComponent
from adalflow.core.base_data_class import DataClass


from adalflow.optim.parameter import Parameter, GradientContext
from adalflow.optim.types import ParameterType

from adalflow.core.prompt_builder import Prompt
from adalflow.core.functional import compose_model_kwargs
from adalflow.core.model_client import ModelClient
from adalflow.core.default_prompt_template import DEFAULT_LIGHTRAG_SYSTEM_PROMPT
from adalflow.optim.function import BackwardContext
from adalflow.utils.cache import CachedEngine
from adalflow.tracing.callback_manager import CallbackManager
from adalflow.utils.global_config import get_adalflow_default_root_path

from adalflow.optim.text_grad.backend_engine_prompt import (
    FEEDBACK_ENGINE_TEMPLATE,
    LLM_CONVERSATION_TEMPLATE,
    VARIABLE_AND_PEERS_INFO,
    CONVERSATION_START_INSTRUCTION_BASE,
    CONVERSATION_START_INSTRUCTION_CHAIN,
    OBJECTIVE_INSTRUCTION_BASE,
    OBJECTIVE_INSTRUCTION_CHAIN,
)

__all__ = ["Generator", "BackwardEngine", "create_teacher_generator"]


log = logging.getLogger(__name__)

PromptArgType = Dict[str, Union[str, Parameter]]


class Generator(GradComponent, CachedEngine, CallbackManager):
    __doc__ = """A user-facing orchestration component for LLM prediction.

    It is also a GradComponent that can be used for backpropagation through the LLM model.

    By orchestrating the following three components along with their required arguments,
    it enables any LLM prediction with the required task output format.

    - Prompt
    - Model client
    - Output processors

    Args:
        model_client (ModelClient): The model client to use for the generator.
        model_kwargs (Dict[str, Any], optional): The model kwargs to pass to the model client. Defaults to {}.
            Please refer to :ref:`ModelClient<components-model_client>` for the details on how to set the model_kwargs for your specific model if it is from our library.
        template (Optional[str], optional): The template for the prompt. Defaults to :ref:`DEFAULT_LIGHTRAG_SYSTEM_PROMPT<core-default_prompt_template>`.
        prompt_kwargs (Optional[Dict], optional): The preset prompt kwargs to fill in the variables in the prompt. Defaults to None.
        output_processors (Optional[Component], optional): The output processors after the model call. It can be a single component or a chained component via ``Sequential``. Defaults to None.
        trainable_params (Optional[List[str]], optional): The list of trainable parameters. Defaults to [].

    Note:
        The output_processors are applied to the string output of the model completion, and the result is stored in the ``data`` field of the output.
        We encourage you to use them only to parse the response into the data format you will use later.
    """

    model_type: ModelType = ModelType.LLM
    model_client: ModelClient  # for better type checking
    _use_cache: bool = False
    _kwargs: Dict[str, Any] = {}

    def __init__(
        self,
        *,
        # args for the model
        model_client: ModelClient,  # will be initialized in the main script
        model_kwargs: PromptArgType = {},
        # args for the prompt
        template: Optional[str] = None,
        prompt_kwargs: Optional[Dict] = {},
        # args for the output processing
        output_processors: Optional[Component] = None,
        name: Optional[str] = None,
        # args for the cache
        cache_path: Optional[str] = None,
        use_cache: bool = False,
    ) -> None:
        r"""The default prompt is set to the DEFAULT_LIGHTRAG_SYSTEM_PROMPT. It has the following variables:
        - task_desc_str
        - tools_str
        - example_str
        - chat_history_str
        - context_str
        - steps_str

        You can preset the prompt kwargs to fill in the variables in the prompt using prompt_kwargs.
        You can also replace the template, define any variables you want, and use prompt_kwargs to fill them in.
        """
        if not isinstance(model_client, ModelClient):
            raise TypeError(
                f"{type(self).__name__} requires a ModelClient instance for model_client, "
                f"please pass it as OpenAIClient() or GroqAPIClient() for example. "
                f"Got {model_client} instead."
            )

        template = template or DEFAULT_LIGHTRAG_SYSTEM_PROMPT

        # create the cache path and initialize the cache engine
        self.set_cache_path(
            cache_path, model_client, model_kwargs.get("model", "default")
        )
        CachedEngine.__init__(self, cache_path=self.cache_path)

        Component.__init__(self)
        GradComponent.__init__(self)
        CallbackManager.__init__(self)

        self.name = name or self.__class__.__name__

        self._init_prompt(template, prompt_kwargs)

        self.model_kwargs = model_kwargs.copy()
        # init the model client
        self.model_client = model_client

        self.output_processors = output_processors

        self.set_parameters(prompt_kwargs)
        # end of trainable parameters
        self.backward_engine: "BackwardEngine" = None
        log.info(f"Generator {self.name} initialized.")

        # to support better testing on the parts besides the model call
        self.mock_output: bool = False
        self.mock_output_data: str = "mock data"
        self.data_map_func: Callable = None
        self.set_data_map_func()

        self._use_cache = use_cache

        self._kwargs = {
            "model_client": model_client,
            "model_kwargs": model_kwargs,
            "template": template,
            "prompt_kwargs": prompt_kwargs,
            "output_processors": output_processors,
            "name": name,
            "cache_path": cache_path,
            "use_cache": use_cache,
        }
        self._teacher: Optional["Generator"] = None
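    # A minimal construction sketch (illustrative only): it assumes OpenAIClient is
    # available and that OPENAI_API_KEY is set in the environment.
    #
    #     from adalflow.components.model_client import OpenAIClient
    #
    #     generator = Generator(
    #         model_client=OpenAIClient(),
    #         model_kwargs={"model": "gpt-3.5-turbo"},
    #         prompt_kwargs={"task_desc_str": "You are a helpful assistant."},
    #     )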
    def set_cache_path(self, cache_path: str, model_client: object, model: str):
        """Set the cache path for the generator."""

        # Construct a valid model string using the client class name and model
        self.model_str = f"{model_client.__class__.__name__}_{model}"

        # Remove any characters that are not allowed in file names (cross-platform)
        # On Windows, characters like `:<>?/\|*` are prohibited.
        self.model_str = re.sub(r"[^a-zA-Z0-9_\-]", "_", self.model_str)

        _cache_path = (
            get_adalflow_default_root_path() if cache_path is None else cache_path
        )

        # Use pathlib to handle paths more safely across OS
        self.cache_path = Path(_cache_path) / f"cache_{self.model_str}.db"

        log.debug(f"Cache path set to: {self.cache_path}")
    def get_cache_path(self) -> str:
        r"""Get the cache path for the generator."""
        return self.cache_path
    @staticmethod
    def _get_default_mapping(
        output: "GeneratorOutput" = None,
    ) -> Tuple[Dict[str, Callable], List[str]]:

        if (
            output.data
            and isinstance(output.data, DataClass)
            and len(output.data.get_output_fields()) > 0
        ):
            output_fields = output.data.get_output_fields()

            output_mapping = {
                f: lambda x, f=f: getattr(x.data, f) for f in output_fields
            }
        elif output.raw_response:
            output_fields = ["raw_response"]
            output_mapping = {f: lambda x, f=f: getattr(x, f) for f in output_fields}
            output_fields = ["Answer"]
            output_mapping["Example"] = output_mapping["raw_response"]
            del output_mapping["raw_response"]
        # elif output.error:
        #     output_fields = ["raw_response", "error"]
        #     output_mapping = {
        #         "error": lambda x: x.error,
        #         "raw_response": lambda x: x.raw_response,
        #     }
        #     output_fields = ["Answer"]

        return output_mapping, output_fields
    def set_mock_output(
        self, mock_output: bool = True, mock_output_data: str = "mock data"
    ):
        self.mock_output = mock_output
        self.mock_output_data = mock_output_data
    def reset_mock_output(self):
        self.mock_output = False
        self.mock_output_data = "mock data"
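    # Usage sketch for testing without hitting the model API (illustrative only,
    # assumes a `generator` constructed as in the example above):
    #
    #     generator.set_mock_output(mock_output_data="canned answer")
    #     output = generator(prompt_kwargs={"input_str": "Hi"})  # returns the mock data
    #     generator.reset_mock_output()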
    def set_parameters(self, prompt_kwargs: PromptArgType):
        r"""Set the name for each parameter and set the peers for each other.

        Make all parameters attributes of the generator so that optimizers and other
        components can find them easily.
        """
        for key, p in prompt_kwargs.items():
            if isinstance(p, Parameter):
                if not p.name or p.name == "":
                    p.name = key
                peers = [
                    p
                    for k, p in prompt_kwargs.items()
                    if isinstance(p, Parameter) and k != key
                ]
                p.set_peers(peers)
                setattr(self, key, p)
    def _init_prompt(self, template: str, prompt_kwargs: Dict):
        r"""Initialize the prompt with the template and prompt_kwargs."""
        self.template = template
        self.prompt_kwargs = prompt_kwargs
        # NOTE: Prompt can handle parameters
        self.prompt = Prompt(template=template, prompt_kwargs=self.prompt_kwargs)
    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "Generator":
        r"""Create a Generator instance from the config dictionary.

        Example:

        .. code-block:: python

            config = {
                "model_client": {
                    "component_name": "OpenAIClient",
                    "component_config": {}
                },
                "model_kwargs": {"model": "gpt-3.5-turbo", "temperature": 0}
            }
            generator = Generator.from_config(config)
        """
        # create init_kwargs from the config
        assert "model_client" in config, "model_client is required in the config"
        return super().from_config(config)
    def _compose_model_kwargs(self, **model_kwargs) -> Dict:
        r"""The model configuration excludes the input itself.

        Combine the default model_kwargs with the model_kwargs passed at call time.

        Example:
            model_kwargs = {"temperature": 0.5, "model": "gpt-3.5-turbo"}
            self.model_kwargs = {"model": "gpt-3.5-turbo"}
            combine_kwargs(model_kwargs) => {"temperature": 0.5, "model": "gpt-3.5-turbo"}
        """
        combined_model_kwargs = self.model_kwargs.copy()

        if model_kwargs:
            combined_model_kwargs.update(model_kwargs)
        return combined_model_kwargs

    # TODO: use prompt_kwargs as users are already familiar with it
    def print_prompt(self, **kwargs) -> str:
        return self.prompt.print_prompt(**kwargs)
    def get_prompt(self, **kwargs) -> str:
        return self.prompt.call(**kwargs)
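    # Prompt preview sketch (illustrative only, assumes the default template):
    #
    #     generator.print_prompt(task_desc_str="You are a helpful assistant.")
    #     prompt_str = generator.get_prompt(input_str="What is 2 + 2?")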
    def _extra_repr(self) -> str:
        s = f"model_kwargs={self.model_kwargs}, model_type={self.model_type}"
        return s

    def _post_call(self, completion: Any) -> GeneratorOutput:
        r"""Get the string completion and process it with the output_processors."""
        # parse_chat_completion will only fill the raw_response
        output: GeneratorOutput = self.model_client.parse_chat_completion(completion)

        # Now add the data field to the output
        data = output.raw_response
        if self.output_processors:
            if data:
                try:
                    data = self.output_processors(data)
                    output.data = data
                except Exception as e:
                    log.error(f"Error processing the output processors: {e}")
                    output.error = str(e)
        else:
            output.data = data

        return output

    def _pre_call(self, prompt_kwargs: Dict, model_kwargs: Dict) -> Dict[str, Any]:
        r"""Prepare the input, prompt_kwargs, model_kwargs for the model call."""
        # 1. render the prompt from the template
        prompt_str = self.prompt.call(**prompt_kwargs).strip()

        # 2. combine the model_kwargs with the default model_kwargs
        composed_model_kwargs = self._compose_model_kwargs(**model_kwargs)

        # 3. convert app's inputs to api inputs
        api_kwargs = self.model_client.convert_inputs_to_api_kwargs(
            input=prompt_str,
            model_kwargs=composed_model_kwargs,
            model_type=self.model_type,
        )
        return api_kwargs

    def _model_client_call(self, api_kwargs: Dict, use_cache: bool = False) -> Any:
        # call the model client
        try:
            # check the cache
            index_content = json.dumps(api_kwargs)  # + f"training: {self.training}"
            if use_cache:
                # print(f"check cache first: {no_cache}")

                cached_completion = self._check_cache(index_content)
                if cached_completion is not None:
                    return cached_completion

            completion = self.model_client.call(
                api_kwargs=api_kwargs, model_type=self.model_type
            )

            # prepare cache
            if use_cache:
                self._save_cache(index_content, completion)
            return completion
        except Exception as e:
            log.error(f"Error calling the model: {e}")
            raise e

    ##############################################################################################################
    ### Forward, backward, teacher generator, and create_demo_data_instance
    # are for training and backpropagation
    ##############################################################################################################
    def create_demo_data_instance(
        self,
        input_prompt_kwargs: Dict[str, Any],
        output: GeneratorOutput,
        id: Optional[str] = None,
    ):
        r"""Automatically create a demo data instance from the input and output of the generator.

        Used to trace the demos for the demo parameter in the prompt_kwargs.
        Part of the few-shot learning.
        """
        from adalflow.core.base_data_class import DynamicDataClassFactory

        # map the input fields
        demo_data = {"id": id}
        demo_data_class_output_mapping, output_fields = self._get_default_mapping(
            output
        )

        for k, v in input_prompt_kwargs.items():
            if isinstance(v, Parameter):
                demo_data[k] = v.map_to_successor(self)
            else:
                demo_data[k] = v
        # map the output fields
        for key, value in demo_data_class_output_mapping.items():
            demo_data[key] = value(output)

        obj = DynamicDataClassFactory.from_dict(demo_data)
        obj.set_input_fields([k for k in input_prompt_kwargs.keys()])
        obj.set_output_fields(output_fields)
        if obj is None:
            raise ValueError(f"Error creating the demo data instance: {demo_data}")
        return obj
    def set_backward_engine(self, backward_engine: "BackwardEngine" = None):
        if backward_engine is None:
            backward_engine = BackwardEngine(
                model_client=self.model_client,
                model_kwargs=self.model_kwargs,
            )
            if self.mock_output:
                backward_engine.set_mock_output()
        self.backward_engine = backward_engine
    def set_teacher_generator(self, teacher: "Generator" = None):
        self._teacher = teacher
        print(f"Teacher generator set: {self._teacher}, teacher {teacher}")
        log.debug(f"Teacher generator set: {self._teacher}")
    def set_data_map_func(self, map_func: Callable = None):
        def default_map_func(data: "GeneratorOutputType") -> str:
            return (
                data.data
                if data.data
                else self.failure_message_to_backward_engine(data)
            )

        self.data_map_func = map_func or default_map_func

        log.debug(f"Data map function set: {self.data_map_func}")
    # TODO: limit to only one demo parameter.
    @staticmethod
    def find_demo_parameter(prompt_kwargs: Dict) -> Optional[Parameter]:
        from adalflow.optim.parameter import Parameter, ParameterType

        for p in prompt_kwargs.values():
            if isinstance(p, Parameter) and p.param_type == ParameterType.DEMOS:
                return p
        return None
    # NOTE: when training is True, forward will be called in __call__ instead of call
    def forward(
        self,
        prompt_kwargs: Optional[Dict] = {},  # the input that needs to be passed to the prompt
        model_kwargs: Optional[Dict] = {},
        id: Optional[str] = None,
    ) -> "Parameter":
        # 1. call the model
        output: GeneratorOutputType = None
        input_args = {}
        if self.mock_output:
            output = GeneratorOutput(data=self.mock_output_data)
        else:
            if self.teacher_mode and not isinstance(self, BackwardEngine):
                if not self._teacher:
                    print(
                        f"prompt_kwargs: {prompt_kwargs}, model_kwargs: {model_kwargs}"
                    )
                    print(f"names: {self.name}")
                    raise ValueError("Teacher generator is not set.")
                log.info(f"Using teacher: {self._teacher}")
                input_args = {
                    "prompt_kwargs": compose_model_kwargs(
                        self._teacher.prompt_kwargs, prompt_kwargs
                    ),
                    "model_kwargs": compose_model_kwargs(
                        self._teacher.model_kwargs, model_kwargs
                    ),
                }
                output = self._teacher.call(prompt_kwargs, model_kwargs)
            else:
                input_args = {
                    "prompt_kwargs": compose_model_kwargs(
                        self.prompt_kwargs, prompt_kwargs
                    ),
                    "model_kwargs": compose_model_kwargs(
                        self.model_kwargs, model_kwargs
                    ),
                }
                output = self.call(prompt_kwargs, model_kwargs)
        # 2. Generate a Parameter object from the output
        combined_prompt_kwargs = compose_model_kwargs(self.prompt_kwargs, prompt_kwargs)
        if self.data_map_func is None:
            self.set_data_map_func()

        predecessors = [
            p for p in combined_prompt_kwargs.values() if isinstance(p, Parameter)
        ]

        log.debug(f"Predecessors: {predecessors} for generator {self.name}")

        param_data = (
            output.raw_response
            if output and not output.error
            else f"Error: {output.error}, raw_response: {output.raw_response}"
        )
        response: Parameter = Parameter(
            data=param_data,
            name=self.name + "_output",
            role_desc=f"Output from (llm) {self.name}",
            param_type=ParameterType.GENERATOR_OUTPUT,
        )
        response.set_predecessors(predecessors)
        response.trace_forward_pass(input_args=input_args, full_response=output)
        # attach the demo to the demo parameter
        # if self.tracing:
        demo_param = self.find_demo_parameter(combined_prompt_kwargs)

        if demo_param:
            if id is None:
                raise ValueError(
                    "ID is required for tracing. Please pass it to your Generator call."
                )

            demo = self.create_demo_data_instance(
                prompt_kwargs,
                output,
                id=id,
            )
            demo_param.add_to_trace(demo, is_teacher=self.teacher_mode)
        else:
            log.debug(
                "No demo parameter found in the prompt_kwargs. You can not trace the demo data."
            )

        if not self.backward_engine:
            # self.set_backward_engine()
            log.debug(f"Backward engine: {self.backward_engine}")

        # attach a function to compute the gradient for predecessors
        response.set_grad_fn(
            BackwardContext(
                backward_fn=self.backward,
                backward_engine=self.backward_engine,
                response=response,
                prompt_kwargs={
                    k: v.data if isinstance(v, Parameter) else v
                    for k, v in prompt_kwargs.items()
                },
                template=self.template,
                prompt_str=self.get_prompt(**combined_prompt_kwargs),
                id=id,
            )
        )
        return response
    # == pytorch custom autograd function ==
    def backward(
        self,
        response: Parameter,  # the output of the forward pass
        prompt_kwargs: Dict,
        template: str,
        prompt_str: str,
        backward_engine: Optional["Generator"] = None,
        id: Optional[str] = None,  # the id of the input
    ) -> Parameter:

        log.info(f"Generator: Backward: {response}")

        children_params = response.predecessors
        is_chain = True
        if response.get_gradient_and_context_text().strip() == "":
            log.info(f"Generator: Backward: No gradient found for {response}.")

        # backward the score to the demo parameter
        for pred in children_params:
            if pred.requires_opt:
                pred.set_score(response._score)
                log.debug(
                    f"backpropagate the score {response._score} to {pred.name}, is_teacher: {self.teacher_mode}"
                )
                if pred.param_type == ParameterType.DEMOS:
                    # Accumulate the score to the demo
                    pred.add_score_to_trace(
                        trace_id=id, score=response._score, is_teacher=self.teacher_mode
                    )
                    log.debug(f"Pred: {pred.name}, traces: {pred._traces}")

        # 1. backward for text-gradients
        if backward_engine:
            log.debug(
                f"Generator: Backward engine is set for the generator. {backward_engine}"
            )
            for pred in children_params:
                if not pred.requires_opt or pred.param_type == ParameterType.DEMOS:
                    log.debug(
                        f"EvalFnToTextLoss: Skipping {pred} as it does not require optimization."
                    )
                    continue

                self._backward_through_one_predecessor(
                    pred=pred,
                    response=response,
                    prompt_kwargs=prompt_kwargs,
                    template=template,
                    backward_engine=backward_engine,
                    prompt_str=prompt_str,
                    is_chain=is_chain,
                )
        else:
            log.debug("Backward engine is not set for the generator. No text gradient.")
    @staticmethod
    def _backward_through_one_predecessor(
        pred: Parameter,
        response: Parameter,
        prompt_kwargs: Dict[str, str],
        template: str,
        backward_engine: "BackwardEngine",
        prompt_str: str,
        is_chain: bool = False,
    ):
        if not pred.requires_opt:
            log.debug(
                f"Generator: Skipping {pred} as it does not require optimization."
            )
            return
        log.debug(f"Generator: Backward through {pred}, is_chain: {is_chain}")

        if pred.check_if_already_computed_gradient_respect_to(response.id):
            log.debug(
                f"Generator: Skipping {pred} as the gradient is already computed."
            )
            return

        instruction_str, objective_str = None, None

        # 1. Generate the conversation string
        input_prompt_kwargs = {
            k: v.data if isinstance(v, Parameter) else v
            for k, v in prompt_kwargs.items()
        }

        conversation_prompt_kwargs = {
            "variable_name": pred.name,
            "input_value": input_prompt_kwargs,
            "llm_output": response.data,
        }

        conversation_str = Prompt(
            prompt_kwargs=conversation_prompt_kwargs,
            template=LLM_CONVERSATION_TEMPLATE,
        )()

        variable_dict = pred.get_param_info()

        variable_and_peers_info = Prompt(
            prompt_kwargs={"variable": variable_dict, "peers": pred.peers},
            template=VARIABLE_AND_PEERS_INFO,
        )()

        conv_ins_template = CONVERSATION_START_INSTRUCTION_BASE
        obj_ins_template = OBJECTIVE_INSTRUCTION_BASE

        if is_chain:
            conv_ins_template = CONVERSATION_START_INSTRUCTION_CHAIN
            obj_ins_template = OBJECTIVE_INSTRUCTION_CHAIN

        instruction_str = Prompt(
            template=conv_ins_template,
            prompt_kwargs={
                "variable_and_peers_info": variable_and_peers_info,
                "conversation_str": conversation_str,
            },
        )()

        log.info(f"Conversation start instruction base str: {instruction_str}")

        objective_str = Prompt(
            template=obj_ins_template,
            prompt_kwargs={
                "response_desc": response.role_desc,
                "response_gradient": response.get_gradient_and_context_text(),
                "instruction_to_backward_engine": pred.instruction_to_backward_engine,
            },
        )()

        backward_engine_prompt_kwargs = {
            "conversation_sec": instruction_str,
            "objective_instruction_sec": objective_str,
        }

        gradient_output: GeneratorOutput = None
        if response._score is not None and float(response._score) > 0.9:
            log.debug(f"EvalFnToTextLoss: Skipping {pred} as the score is high enough.")
            manual_response = f"You get a high score: {response._score}."
            gradient_output = GeneratorOutput(
                data=manual_response, raw_response=manual_response
            )
        else:
            gradient_output: GeneratorOutput = backward_engine(
                prompt_kwargs=backward_engine_prompt_kwargs
            )

            # Use this to trace each node's input and output; all nodes can be visualized
            log.info(
                f"Generator Backward Engine Prompt: {backward_engine.get_prompt(**backward_engine_prompt_kwargs)}"
            )

        gradient_value = (
            gradient_output.data
            or backward_engine.failure_message_to_optimizer(gradient_output)
        )
        log.info(
            f"Generator Gradient value: {gradient_value}, raw response: {gradient_output.raw_response}"
        )

        # TODO: make it a debug feature
        # prompt_str = backward_engine.get_prompt(**backward_engine_prompt_kwargs)
        var_gradient = Parameter(
            name=f"{response.name}_to_{pred.name}_grad",
            # gradient_prompt=prompt_str,  # trace the prompt
            data=gradient_value,
            requires_opt=True,
            role_desc=f"feedback for {pred.name}",
            score=response._score,  # add score to gradient
            param_type=ParameterType.GRADIENT,
            from_response_id=response.id,
        )
        pred.add_gradient(var_gradient)
        pred.set_score(response._score)

        pred.gradients_context[var_gradient] = GradientContext(
            context=conversation_str,
            response_desc=response.role_desc,
            variable_desc=pred.role_desc,
        )

    def _run_callbacks(
        self,
        output: GeneratorOutput,
        input: Dict,
        prompt_kwargs: Dict,
        model_kwargs: Dict,
    ):
        self.trigger_callbacks(
            "on_complete",
            output=output,
            input=input,
            prompt_kwargs=prompt_kwargs,
            model_kwargs=model_kwargs,
        )
        if output.error:
            self.trigger_callbacks(
                "on_failure",
                output=output,
                input=input,
                prompt_kwargs=prompt_kwargs,
                model_kwargs=model_kwargs,
            )
        else:
            self.trigger_callbacks(
                "on_success",
                output=output,
                input=input,
                prompt_kwargs=prompt_kwargs,
                model_kwargs=model_kwargs,
            )
    def call(
        self,
        prompt_kwargs: Optional[Dict] = {},  # the input that needs to be passed to the prompt
        model_kwargs: Optional[Dict] = {},
        use_cache: Optional[bool] = None,
        id: Optional[str] = None,
    ) -> GeneratorOutputType:
        r"""
        Call the model_client by formatting the prompt from the prompt_kwargs,
        and passing the combined model_kwargs to the model client.
        """
        if self.mock_output:
            return GeneratorOutput(data=self.mock_output_data, id=id)

        log.debug(f"prompt_kwargs: {prompt_kwargs}")
        log.debug(f"model_kwargs: {model_kwargs}")

        api_kwargs = self._pre_call(prompt_kwargs, model_kwargs)
        log.debug(f"api_kwargs: {api_kwargs}")
        output: GeneratorOutputType = None
        # call the model client
        completion = None
        use_cache = use_cache if use_cache is not None else self._use_cache
        try:
            completion = self._model_client_call(
                api_kwargs=api_kwargs, use_cache=use_cache
            )
        except Exception as e:
            log.error(f"Error calling the model: {e}")
            output = GeneratorOutput(error=str(e), id=id)
        # process the completion
        if completion is not None:
            try:
                output = self._post_call(completion)
            except Exception as e:
                log.error(f"Error processing the output: {e}")
                output = GeneratorOutput(
                    raw_response=str(completion), error=str(e), id=id
                )

        # Users only need one of the callbacks; there is no need to use them all.
        output.id = id
        self._run_callbacks(
            output,
            input=api_kwargs,
            prompt_kwargs=prompt_kwargs,
            model_kwargs=model_kwargs,
        )

        log.info(f"output: {output}")
        return output
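    # Basic call sketch (illustrative only, assumes the construction example above
    # and a reachable model API):
    #
    #     output = generator.call(prompt_kwargs={"input_str": "What is 2 + 2?"})
    #     print(output.data)          # parsed data (the raw string if no output_processors)
    #     print(output.raw_response)  # raw string completion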
    # TODO: training is not supported in async call yet
    async def acall(
        self,
        prompt_kwargs: Optional[Dict] = {},
        model_kwargs: Optional[Dict] = {},
        use_cache: Optional[bool] = None,
        id: Optional[str] = None,
    ) -> GeneratorOutputType:
        r"""Async call the model with the input and model_kwargs.

        .. warning::
            Training is not supported in async call yet.
        """
        log.info(f"prompt_kwargs: {prompt_kwargs}")
        log.info(f"model_kwargs: {model_kwargs}")

        api_kwargs = self._pre_call(prompt_kwargs, model_kwargs)
        output: GeneratorOutputType = None
        # call the model client
        completion = None

        try:
            completion = await self.model_client.acall(
                api_kwargs=api_kwargs, model_type=self.model_type
            )
        except Exception as e:
            log.error(f"Error calling the model: {e}")
            output = GeneratorOutput(error=str(e))

        if completion:
            try:
                output = self._post_call(completion)
            except Exception as e:
                log.error(f"Error processing the output: {e}")
                output = GeneratorOutput(raw_response=str(completion), error=str(e))

        log.info(f"output: {output}")
        self._run_callbacks(
            output,
            input=api_kwargs,
            prompt_kwargs=prompt_kwargs,
            model_kwargs=model_kwargs,
        )
        return output
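    # Async usage sketch (illustrative only, assumes the construction example above):
    #
    #     import asyncio
    #
    #     async def main():
    #         output = await generator.acall(prompt_kwargs={"input_str": "Hello"})
    #         print(output.data)
    #
    #     asyncio.run(main())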
    def __call__(self, *args, **kwargs) -> Union[GeneratorOutputType, Any]:
        if self.training:
            log.debug("Training mode")
            return self.forward(*args, **kwargs)
        else:
            log.debug("Inference mode")
            return self.call(*args, **kwargs)

    def _extra_repr(self) -> str:
        # Create the string for model_kwargs
        s = f"model_kwargs={self.model_kwargs}, "

        # Create the string for trainable prompt_kwargs
        prompt_kwargs_repr = [
            k
            for k, v in self.prompt_kwargs.items()
            if isinstance(v, Parameter) and v.requires_opt
        ]

        s += f"trainable_prompt_kwargs={prompt_kwargs_repr}"
        return s
    def to_dict(self) -> Dict[str, Any]:
        r"""Convert the generator to a dictionary."""
        # TODO: exclude default functions
        return super().to_dict()
    @staticmethod
    def failure_message_to_backward_engine(
        gradient_response: GeneratorOutput,
    ) -> Optional[str]:
        response_value = None
        if gradient_response.error or not gradient_response.data:
            response_value = f"Error: {gradient_response.error}, Raw response: {gradient_response.raw_response}"
        return response_value
class BackwardEngine(Generator):  # it is a generator with a default template
    __doc__ = """The backward engine is a Generator with a default template for the backward pass.

    If you want to customize the template, you can create your own backward engine."""

    def __init__(self, **kwargs):
        if kwargs is None:
            kwargs = {}
        kwargs["template"] = FEEDBACK_ENGINE_TEMPLATE
        super().__init__(**kwargs)
        self.name = "BackwardEngine"
        self.teacher_mode = False
    @staticmethod
    def failure_message_to_optimizer(
        gradient_response: GeneratorOutput,
    ) -> Optional[str]:
        gradient_value_data = None
        if gradient_response.error or not gradient_response.data:
            gradient_value_data = f"The backward engine failed to compute the gradient. Raw response: {gradient_response.raw_response}, Error: {gradient_response.error}"

        return gradient_value_data
def create_teacher_generator(
    student: Generator,
    model_client: ModelClient,
    model_kwargs: Dict[str, Any],
    template: Optional[str] = None,
) -> Generator:
    r"""Create a teacher generator from the student generator.

    Note:
        The teacher generator will have no parameters. If you want it to stay in sync with the student,
        create a new one each time the student is updated. Otherwise, ``task.parameters`` will also list
        the teacher's parameters.

    Args:
        student (Generator): The student generator.
        model_client (ModelClient): The model client to use for the teacher generator.
        model_kwargs (Dict[str, Any]): The model kwargs to pass to the model client.
        template (str, optional): The template to use for the teacher generator. Defaults to None, which reuses the student's template.

    Returns:
        Generator: The teacher generator.
    """
    kwargs = student._kwargs.copy()
    kwargs["model_client"] = model_client
    kwargs["model_kwargs"] = model_kwargs
    if template:
        kwargs["template"] = template
    kwargs["name"] = f"{student.name}_teacher"

    prompt_kwargs_str: Dict[str, str] = {}
    for key, p in kwargs["prompt_kwargs"].items():
        if isinstance(p, Parameter):
            prompt_kwargs_str[key] = str(p.data)
        else:
            prompt_kwargs_str[key] = p
    kwargs["prompt_kwargs"] = prompt_kwargs_str
    teacher = Generator(
        **kwargs,
    )
    return teacher
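# Teacher-generator sketch (illustrative only): the teacher typically uses a stronger
# model than the student; the client and model names below are placeholders.
#
#     teacher = create_teacher_generator(
#         student=generator,
#         model_client=OpenAIClient(),
#         model_kwargs={"model": "gpt-4o"},
#     )
#     generator.set_teacher_generator(teacher)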
if __name__ == "__main__": # test the generator with backward engine # TODO: move this to external local tests before packaging from adalflow.components.model_client import ( GroqAPIClient, OpenAIClient, GoogleGenAIClient, AnthropicAPIClient, ) from adalflow.utils import setup_env from adalflow.core.model_client import ModelClient setup_env() # log = get_logger(level="DEBUG") llama3_model = { "model_client": GroqAPIClient(), "model_kwargs": { "model": "llama-3.1-8b-instant", }, } gpt_3_model = { "model_client": OpenAIClient(), "model_kwargs": { "model": "gpt-3.5-turbo", }, } gemini_model = { "model_client": GoogleGenAIClient(), "model_kwargs": { "model": "gemini-1.0-pro", }, } claude_model = { "model_client": AnthropicAPIClient(), "model_kwargs": { "model": "claude-3-opus-20240229", "max_tokens": 100, }, } from adalflow.tracing.generator_call_logger import GeneratorCallLogger from functools import partial # setup the logger call_logger = GeneratorCallLogger(save_dir="traces") def on_complete(output, input, prompt_kwargs, model_kwargs, logger_call: Callable): logger_call( output=output, input=input, prompt_kwargs=prompt_kwargs, model_kwargs=model_kwargs, ) for model in [llama3_model, gpt_3_model, gemini_model, claude_model]: generator = Generator(**model) teacher = create_teacher_generator(generator, **claude_model) call_logger.register_generator("generator", "generator_call") # setup the callback logger_call = partial(call_logger.log_call, name="generator") generator.register_callback( "on_complete", partial(on_complete, logger_call=logger_call) ) output = generator( prompt_kwargs={ "input_str": "Hello, world!", } ) break # test the backward engine # TODO: test ollama and transformer client to update the change