"""Functional interface.Core functions we use to build across the components.Users can leverage these functions to customize their own components."""fromtypingimport(Dict,Any,Callable,Union,List,Tuple,Optional,Type,get_type_hints,get_origin,get_args,Set,Sequence,TypeVar,)importloggingimportnumpyasnpfromenumimportEnumimportreimportjsonimportyamlimportastimportthreadingfrominspectimportsignature,Parameterfromdataclassesimportfields,is_dataclass,MISSING,Fieldlog=logging.getLogger(__name__)ExcludeType=Optional[Dict[str,List[str]]]T_co=TypeVar("T_co",covariant=True)######################################################################################### For Dataclass base class and all schema related functions########################################################################################
def custom_asdict(
    obj, *, dict_factory=dict, exclude: ExcludeType = None
) -> Dict[str, Any]:
    """Equivalent to asdict() from the dataclasses module, with field exclusion.

    Return the fields of a dataclass instance as a new dictionary mapping field
    names to field values, skipping any field listed in ``exclude`` (a mapping
    of dataclass class name -> list of field names). ``dict_factory`` replaces
    the built-in dict when provided. Applies recursively to nested dataclass
    values and to built-in containers: tuples, lists, and dicts.
    """
    if is_dataclass_instance(obj):
        return _asdict_inner(obj, dict_factory, exclude or {})
    raise TypeError("custom_asdict() should be called on dataclass instances")
def _asdict_inner(obj, dict_factory, exclude):
    """Recursive worker for :func:`custom_asdict`.

    Walks dataclass instances, namedtuples, lists/tuples, and dicts, converting
    dataclass instances to ``dict_factory`` mappings while skipping excluded
    fields. Any other value is returned unchanged (no deep copy).

    Note: the large commented-out `dataclass_obj_to_dict` draft that used to
    trail this function was dead code and has been removed.
    """
    if is_dataclass_instance(obj):
        result = []
        for f in fields(obj):
            # exclude is keyed by class name, e.g. {"TrecData": ["label"]}
            if f.name in exclude.get(obj.__class__.__name__, []):
                continue
            value = _asdict_inner(getattr(obj, f.name), dict_factory, exclude)
            result.append((f.name, value))
        return dict_factory(result)
    elif isinstance(obj, tuple) and hasattr(obj, "_fields"):
        # namedtuple: rebuild through its positional constructor
        return type(obj)(*[_asdict_inner(v, dict_factory, exclude) for v in obj])
    elif isinstance(obj, (list, tuple)):
        return type(obj)(_asdict_inner(v, dict_factory, exclude) for v in obj)
    elif isinstance(obj, dict):
        return type(obj)(
            (
                _asdict_inner(k, dict_factory, exclude),
                _asdict_inner(v, dict_factory, exclude),
            )
            for k, v in obj.items()
        )
    else:
        return obj
def is_potential_dataclass(t):
    """Return True if ``t`` is a dataclass or a Union/Optional wrapping one."""
    if get_origin(t) is Union:
        # Optional[X] is Union[X, None]; a dataclass among the non-None members counts.
        members = [a for a in get_args(t) if a is not type(None)]
        return any(is_dataclass(a) for a in members)
    return is_dataclass(t)
def extract_dataclass_type(type_hint):
    """Unwrap Optional/Union hints and return the contained dataclass type.

    Returns the dataclass type itself for a plain dataclass hint, otherwise None.
    """
    if get_origin(type_hint) in (Union, Optional):
        for candidate in get_args(type_hint):
            if candidate is type(None):
                continue
            if is_dataclass(candidate):
                return candidate
    return type_hint if is_dataclass(type_hint) else None
def check_data_class_field_args_zero(cls):
    """Return truthy if the first generic argument of ``cls`` is a dataclass."""
    if not hasattr(cls, "__args__"):
        return False
    args = cls.__args__
    return len(args) > 0 and args[0] and hasattr(args[0], "__dataclass_fields__")
def check_if_class_field_args_zero_exists(cls):
    """Return the first generic argument of ``cls`` if present, else falsy."""
    args = getattr(cls, "__args__", ())
    return len(args) > 0 and args[0]
def check_data_class_field_args_one(cls):
    """Return truthy if the second generic argument of ``cls`` is a dataclass."""
    if not hasattr(cls, "__args__"):
        return False
    args = cls.__args__
    return len(args) > 1 and args[1] and hasattr(args[1], "__dataclass_fields__")
def check_if_class_field_args_one_exists(cls):
    """Return the second generic argument of ``cls`` if present, else falsy."""
    args = getattr(cls, "__args__", ())
    return len(args) > 1 and args[1]
def dataclass_obj_from_dict(cls: Type[object], data: Dict[str, object]) -> Any:
    r"""Convert a dictionary to a dataclass object.

    Supports nested dataclasses, lists, and dictionaries.

    .. note::
        If any required field is missing, it will raise an error.
        Do not use a dict that has excluded required fields.

    .. warning::
        For plain dict inputs the conversion mutates and returns ``data``
        in place rather than building a new dict.

    Example:

    .. code-block:: python

        from dataclasses import dataclass
        from typing import List

        @dataclass
        class TrecData:
            question: str
            label: int

        @dataclass
        class TrecDataList:
            data: List[TrecData]
            name: str

        trec_data_dict = {"data": [{"question": "What is the capital of France?", "label": 0}], "name": "trec_data_list"}
        dataclass_obj_from_dict(TrecDataList, trec_data_dict)
        # TrecDataList(data=[TrecData(question='What is the capital of France?', label=0)], name='trec_data_list')
    """
    log.debug(f"Dataclass: {cls}, Data: {data}")
    if data is None:
        return None
    # Optional[Address] will be false for is_dataclass, true for is_potential_dataclass
    if is_dataclass(cls) or is_potential_dataclass(cls):
        log.debug(
            f"{is_dataclass(cls)} of {cls}, {is_potential_dataclass(cls)} of {cls}"
        )
        # Ensure the data is a dictionary
        if not isinstance(data, dict):
            raise ValueError(
                f"Expected data of type dict for {cls}, but got {type(data).__name__}"
            )
        # Unwrap Optional[...] wrappers to the concrete dataclass type
        cls_type = extract_dataclass_type(cls)
        fieldtypes = {f.name: f.type for f in cls_type.__dataclass_fields__.values()}
        # Recursively restore each field; missing required fields raise here
        restored_data = cls_type(
            **{
                key: dataclass_obj_from_dict(fieldtypes[key], value)
                for key, value in data.items()
            }
        )
        return restored_data
    elif isinstance(data, (list, tuple)):
        log.debug(f"List or Tuple: {cls}, {data}")
        restored_data = []
        for item in data:
            if check_data_class_field_args_zero(cls):
                # restore the value to its dataclass type
                restored_data.append(dataclass_obj_from_dict(cls.__args__[0], item))
            elif check_if_class_field_args_zero_exists(cls):
                # recurse with the declared element type (may be a plain type)
                restored_data.append(dataclass_obj_from_dict(cls.__args__[0], item))
            else:
                # Use the original data [Any]
                restored_data.append(item)
        return restored_data
    elif isinstance(data, set):
        log.debug(f"Set: {cls}, {data}")
        restored_data = set()
        for item in data:
            if check_data_class_field_args_zero(cls):
                # restore the value to its dataclass type
                restored_data.add(dataclass_obj_from_dict(cls.__args__[0], item))
            elif check_if_class_field_args_zero_exists(cls):
                # recurse with the declared element type
                restored_data.add(dataclass_obj_from_dict(cls.__args__[0], item))
            else:
                # Use the original data [Any]
                restored_data.add(item)
        return restored_data
    elif isinstance(data, dict):
        log.debug(f"Dict: {cls}, {data}")
        # NOTE: mutates `data` in place and returns the same dict object
        for key, value in data.items():
            if check_data_class_field_args_one(cls):
                # restore the value to its dataclass type
                data[key] = dataclass_obj_from_dict(cls.__args__[1], value)
            elif check_if_class_field_args_one_exists(cls):
                # recurse with the declared value type
                data[key] = dataclass_obj_from_dict(cls.__args__[1], value)
            else:
                # Use the original data [Any]
                data[key] = value
        return data
    # else normal data like int, str, float, etc.
    else:
        log.debug(f"Not datclass, or list, or dict: {cls}, use the original data.")
        return data
def from_dict_to_json(data: Dict[str, Any], sort_keys: bool = False) -> str:
    r"""Convert a dictionary to a JSON string (4-space indent).

    Args:
        data: The dictionary to serialize.
        sort_keys: Whether to sort the output keys.

    Raises:
        ValueError: If the data cannot be serialized to JSON.
    """
    try:
        return json.dumps(data, indent=4, sort_keys=sort_keys)
    except (TypeError, ValueError) as e:
        # BUG FIX: json.dumps raises TypeError/ValueError for unserializable
        # input; JSONDecodeError is decode-side only and was never raised here.
        raise ValueError(f"Failed to convert dict to JSON: {e}")
def from_dict_to_yaml(data: Dict[str, Any], sort_keys: bool = False) -> str:
    r"""Convert a dictionary to a YAML string (block style)."""
    try:
        dumped = yaml.dump(data, default_flow_style=False, sort_keys=sort_keys)
    except yaml.YAMLError as e:
        raise ValueError(f"Failed to convert dict to YAML: {e}")
    return dumped
def from_json_to_dict(json_str: str) -> Dict[str, Any]:
    r"""Parse a JSON string into a dictionary."""
    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError as e:
        raise ValueError(f"Failed to convert JSON to dict: {e}")
    return parsed
def from_yaml_to_dict(yaml_str: str) -> Dict[str, Any]:
    r"""Parse a YAML string into a dictionary."""
    try:
        parsed = yaml.safe_load(yaml_str)
    except yaml.YAMLError as e:
        raise ValueError(f"Failed to convert YAML to dict: {e}")
    return parsed
def get_type_schema(
    type_obj,
    exclude: ExcludeType = None,
    type_var_map: Optional[Dict] = None,
) -> str:
    """Retrieve a string schema name for a type, handling complex/nested hints.

    Supports Optional/Union, List, Dict, Set, Sequence, Tuple, Enum, and
    dataclass types (the latter expand to a nested schema dict via
    ``get_dataclass_schema``). Anything else falls back to ``__name__``/``str``.

    Args:
        type_obj: The type hint to describe.
        exclude: Per-dataclass field-exclusion map, forwarded to nested schemas.
        type_var_map: Mapping of TypeVars to concrete types for Generic support.
    """
    origin = get_origin(type_obj)
    type_var_map = type_var_map or {}
    # Replace type variables with their actual types to support Generic[T/To]
    if hasattr(type_obj, "__origin__") and type_obj.__origin__ is not None:
        type_obj = type_var_map.get(type_obj.__origin__, type_obj)
    else:
        type_obj = type_var_map.get(type_obj, type_obj)
    if origin is Union:
        # Handle Optional[Type] (a one-member union after dropping NoneType)
        # and general unions.
        args = get_args(type_obj)
        types = [
            get_type_schema(arg, exclude, type_var_map)
            for arg in args
            if arg is not type(None)
        ]
        return (
            f"Optional[{types[0]}]" if len(types) == 1 else f"Union[{', '.join(types)}]"
        )
    elif origin in {List, list}:
        # get_origin returns `list` for typing.List on modern Pythons; the set
        # covers both spellings.
        args = get_args(type_obj)
        if args:
            inner_type = get_type_schema(args[0], exclude, type_var_map)
            return f"List[{inner_type}]"
        else:
            return "List"
    elif origin in {Dict, dict}:
        args = get_args(type_obj)
        if args and len(args) >= 2:
            key_type = get_type_schema(args[0], exclude, type_var_map)
            value_type = get_type_schema(args[1], exclude, type_var_map)
            return f"Dict[{key_type}, {value_type}]"
        else:
            return "Dict"
    elif origin in {Set, set}:
        args = get_args(type_obj)
        return (
            f"Set[{get_type_schema(args[0], exclude, type_var_map)}]"
            if args
            else "Set"
        )
    elif origin is Sequence:
        # NOTE(review): get_origin(Sequence[int]) yields collections.abc.Sequence,
        # which may not be `typing.Sequence` — confirm this branch is reachable.
        args = get_args(type_obj)
        return (
            f"Sequence[{get_type_schema(args[0], exclude, type_var_map)}]"
            if args
            else "Sequence"
        )
    elif origin in {Tuple, tuple}:
        args = get_args(type_obj)
        if args:
            return f"Tuple[{', '.join(get_type_schema(arg, exclude, type_var_map) for arg in args)}]"
        return "Tuple"
    elif is_dataclass(type_obj):
        if issubclass(type_obj, Enum):
            # Handle classes that are both a dataclass and an Enum
            enum_members = ", ".join([f"{e.name}={e.value}" for e in type_obj])
            return f"Enum[{type_obj.__name__}({enum_members})]"
        # Recursively handle nested dataclasses (stringified schema dict)
        output = str(get_dataclass_schema(type_obj, exclude, type_var_map))
        return output
    elif isinstance(type_obj, type) and issubclass(type_obj, Enum):
        # Handle plain Enum types
        enum_members = ", ".join([f"{e.name}={e.value}" for e in type_obj])
        return f"Enum[{type_obj.__name__}({enum_members})]"
    return type_obj.__name__ if hasattr(type_obj, "__name__") else str(type_obj)
def get_dataclass_schema(
    cls,
    exclude: ExcludeType = None,
    type_var_map: Optional[Dict] = None,
) -> Dict[str, Dict[str, object]]:
    """Generate a schema dictionary for a dataclass including nested structures.

    1. Support customized dataclass with required_field function.
    2. Support nested dataclasses, even with generics like List, Dict, etc.
    3. Support metadata in the dataclass fields.

    Returns a dict with "type" (class name), "properties" (per-field schema),
    and "required" (names of fields without defaults).

    Raises:
        ValueError: If ``cls`` is not a dataclass.
    """
    if not is_dataclass(cls):
        raise ValueError(
            "Provided class is not a dataclass, please decorate your class with @dataclass"
        )
    type_var_map = type_var_map or {}
    # TODO: Add support for having a description in the dataclass
    schema = {
        "type": cls.__name__,
        "properties": {},
        "required": [],
        # "description": cls.__doc__ if cls.__doc__ else "",
    }
    # get the exclude list for the current class
    current_exclude = exclude.get(cls.__name__, []) if exclude else []
    # handle combination of Enum and dataclass: emit the enum schema directly
    if issubclass(cls, Enum):
        schema["type"] = get_type_schema(cls, exclude, type_var_map)
        return schema
    for f in fields(cls):
        if f.name in current_exclude:
            continue
        # prepare field schema; done recursively for nested dataclasses
        field_type = type_var_map.get(f.type, f.type)
        field_schema = {"type": get_type_schema(field_type, exclude, type_var_map)}
        # check required field (no default and no default_factory, or the
        # sentinel required_field factory)
        is_required = _is_required_field(f)
        if is_required:
            schema["required"].append(f.name)
        # add metadata (e.g. "desc") to the field schema
        if f.metadata:
            field_schema.update(f.metadata)
        schema["properties"][f.name] = field_schema
    return schema
def_is_required_field(f:Field)->bool:r"""Determine if the field of dataclass is required or optional. Customized for required_field function."""# Determine if the field is required or optional# Using __name__ to check for function identityiff.defaultisMISSINGand(f.default_factoryisMISSINGor(hasattr(f.default_factory,"__name__")andf.default_factory.__name__=="required_field")):returnTruereturnFalse
[docs]defconvert_schema_to_signature(schema:Dict[str,Dict[str,Any]])->Dict[str,str]:r"""Convert the value from get_data_class_schema to a string description."""signature={}schema_to_use=schema.get("properties",{})required_fields=schema.get("required",[])forfield_name,field_infoinschema_to_use.items():field_signature=field_info.get("desc","")# add type to the signatureiffield_info["type"]:field_signature+=f" ({field_info['type']})"# add required/optional to the signatureiffield_nameinrequired_fields:field_signature+=" (required)"else:field_signature+=" (optional)"signature[field_name]=field_signaturereturnsignature
######################################################################################### For FunctionTool component# It uses get_type_schema and get_dataclass_schema to generate the schema of arguments.########################################################################################
def get_fun_schema(name: str, func: Callable[..., object]) -> Dict[str, object]:
    r"""Get the schema of a function's parameters.

    Supports dataclass, Union, and normal data types such as int, str, float,
    list, dict, set (via get_type_schema).

    Example:

    .. code-block:: python

        def example_function(x: int, y: str = "default") -> int:
            return x

        get_fun_schema("example_function", example_function)
        # {
        #     "type": "object",
        #     "properties": {
        #         "x": {"type": "int"},
        #         "y": {"type": "str", "default": "default"},
        #     },
        #     "required": ["x"],
        # }

    Args:
        name: Unused; kept for interface compatibility.
        func: The callable to introspect.
    """
    sig = signature(func)
    schema = {"type": "object", "properties": {}, "required": []}
    type_hints = get_type_hints(func)
    for param_name, parameter in sig.parameters.items():
        param_type = type_hints.get(param_name, "Any")
        # BUG FIX: compare with `is`, not `==` — the inspect docs define
        # "no default" as identity with Parameter.empty; a default value with
        # a permissive __eq__ could otherwise be misclassified as required.
        if parameter.default is Parameter.empty:
            schema["required"].append(param_name)
            schema["properties"][param_name] = {"type": get_type_schema(param_type)}
        else:
            schema["properties"][param_name] = {
                "type": get_type_schema(param_type),
                "default": parameter.default,
            }
    return schema
# For parse function call for FunctionTool component
def evaluate_ast_node(node: ast.AST, context_map: Dict[str, Any] = None):
    """
    Recursively evaluates an AST node and returns the corresponding Python object.

    Args:
        node (ast.AST): The AST node to evaluate. This node can represent various parts of Python
                        expressions, such as literals, identifiers, lists, dictionaries, and
                        function calls.
        context_map (Dict[str, Any]): A dictionary that maps variable names to their respective values
                                      and functions. This context is used to resolve names and execute functions.

    Returns:
        Any: The result of evaluating the node. Constants return their literal value; names are
             looked up in context_map; lists/tuples/dicts evaluate their members; calls invoke
             the resolved function with evaluated arguments.

    Raises:
        ValueError: If the node type is unsupported or a name is missing from context_map.
    """
    if isinstance(node, ast.Constant):
        return node.value
    elif isinstance(node, ast.Dict):
        # BUG FIX: propagate context_map into nested keys/values so names
        # inside containers resolve (previously the default None was used,
        # crashing on any Name nested in a dict/list/tuple).
        return {
            evaluate_ast_node(k, context_map): evaluate_ast_node(v, context_map)
            for k, v in zip(node.keys, node.values)
        }
    elif isinstance(node, ast.List):
        return [evaluate_ast_node(elem, context_map) for elem in node.elts]
    elif isinstance(node, ast.Tuple):
        return tuple(evaluate_ast_node(elem, context_map) for elem in node.elts)
    elif isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
        return -evaluate_ast_node(node.operand, context_map)  # unary minus
    elif isinstance(node, ast.BinOp):
        # support "multiply(2024-2017, 12)": "2024-2017" is a BinOp node
        left = evaluate_ast_node(node.left, context_map)
        right = evaluate_ast_node(node.right, context_map)
        if isinstance(node.op, ast.Add):
            return left + right
        elif isinstance(node.op, ast.Sub):
            return left - right
        elif isinstance(node.op, ast.Mult):
            return left * right
        elif isinstance(node.op, ast.Div):
            return left / right
        elif isinstance(node.op, ast.Mod):
            return left % right
        elif isinstance(node.op, ast.Pow):
            return left**right
        else:
            log.error(f"Unsupported binary operator: {type(node.op)}")
            raise ValueError(f"Unsupported binary operator: {type(node.op)}")
    elif isinstance(node, ast.Name):  # variable name
        try:
            return context_map[node.id]
        # TODO: raise the error back to the caller so that the llm can get the error message
        except KeyError as e:
            log.error(f"Error: {e}, {node.id} does not exist in the context_map.")
            raise ValueError(
                f"Error: {e}, {node.id} does not exist in the context_map."
            )
    elif isinstance(node, ast.Attribute):  # e.g. math.pi
        value = evaluate_ast_node(node.value, context_map)
        return getattr(value, node.attr)
    elif isinstance(node, ast.Call):  # nested call, e.g. add(multiply(4, 5), 3)
        func = evaluate_ast_node(node.func, context_map)
        args = [evaluate_ast_node(arg, context_map) for arg in node.args]
        kwargs = {
            kw.arg: evaluate_ast_node(kw.value, context_map) for kw in node.keywords
        }
        output = func(*args, **kwargs)
        # unwrap component outputs that carry a raw_output attribute
        if hasattr(output, "raw_output"):
            return output.raw_output
        return output
    else:
        log.error(f"Unsupported AST node type: {type(node)}")
        raise ValueError(f"Unsupported AST node type: {type(node)}")
def parse_function_call_expr(
    function_expr: str, context_map: Dict[str, Any] = None
) -> Tuple[str, List[Any], Dict[str, Any]]:
    """
    Parse a string representing a function call into (name, args, kwargs),
    ensuring safe execution by only resolving names through ``context_map``.

    Args:
        function_expr (str): The string representing the function call.
        context_map (Dict[str, Any]): Maps variable names to values/functions
            used to resolve names and execute nested calls.
    """
    function_expr = function_expr.strip()
    try:
        cleaned = extract_function_expression(function_expr)
        tree = ast.parse(cleaned, mode="eval")
        call = tree.body
        if not isinstance(call, ast.Call):
            log.error("Provided string is not a function call.")
            raise ValueError("Provided string is not a function call.")
        # function name is only extractable from a simple Name node
        func_name = call.func.id if isinstance(call.func, ast.Name) else None
        args = [evaluate_ast_node(a, context_map) for a in call.args]
        keywords = {
            kw.arg: evaluate_ast_node(kw.value, context_map) for kw in call.keywords
        }
        return func_name, args, keywords
    except Exception as e:
        log.error(f"Error at parse_function_call_expr: {e}")
        raise e
def generate_function_call_expression_from_callable(
    func: Callable[..., Any], *args: Any, **kwargs: Any
) -> str:
    """
    Render ``func(*args, **kwargs)`` as a source-style call expression string.

    Args:
        func (Callable[..., Any]): The callable function.
        *args (Any): Positional arguments, rendered with repr().
        **kwargs (Any): Keyword arguments, rendered as key=repr(value).

    Returns:
        str: The function call expression string.
    """
    rendered = [repr(a) for a in args]
    rendered.extend(f"{key}={value!r}" for key, value in kwargs.items())
    return f"{func.__name__}({', '.join(rendered)})"
# Builtins whitelisted for sandboxed execution. Anything not listed here
# (open, __import__, eval, exec, ...) is unavailable inside sandbox_exec.
SAFE_BUILTINS = {
    "abs": abs,
    "all": all,
    "any": any,
    "bin": bin,
    "bool": bool,
    "bytearray": bytearray,
    "bytes": bytes,
    "callable": callable,
    "chr": chr,
    "complex": complex,
    "dict": dict,
    "divmod": divmod,
    "enumerate": enumerate,
    "filter": filter,
    "float": float,
    "format": format,
    "frozenset": frozenset,
    "getattr": getattr,
    "hasattr": hasattr,
    "hash": hash,
    "hex": hex,
    "int": int,
    "isinstance": isinstance,
    "issubclass": issubclass,
    "iter": iter,
    "len": len,
    "list": list,
    "map": map,
    "max": max,
    "min": min,
    "next": next,
    "object": object,
    "oct": oct,
    "ord": ord,
    "pow": pow,
    "range": range,
    "repr": repr,
    "reversed": reversed,
    "round": round,
    "set": set,
    "slice": slice,
    "sorted": sorted,
    "str": str,
    "sum": sum,
    "tuple": tuple,
    "type": type,
    "zip": zip,
}


def sandbox_exec(
    code: str, context: Optional[Dict[str, object]] = None, timeout: int = 5
) -> Dict:
    r"""Execute code in a sandboxed environment with a timeout.

    1. Works similar to eval(), but with timeout and context similar to parse_function_call_expr.
    2. More flexible: the code can define additional functions rather than a single call.

    Args:
        code (str): The code to execute. Has to be ``output = ...`` or similar so the result
            can be captured in the returned dict.
        context (Dict[str, Any]): Extra names available to the code (merged over SAFE_BUILTINS).
        timeout (int): The execution timeout in seconds.

    Returns:
        Dict: {"output": ..., "error": ...} — exec's locals land in this dict, so any
        top-level assignment in ``code`` is captured; "error" holds the raised
        exception (or TimeoutError) if execution failed.

    Raises:
        TimeoutError: If execution exceeds ``timeout`` seconds.
    """
    result = {"output": None, "error": None}
    context = {**context, **SAFE_BUILTINS} if context else SAFE_BUILTINS
    try:
        compiled_code = compile(code, "<string>", "exec")

        def target():
            # exec writes the snippet's locals straight into `result`,
            # which is how `output = ...` assignments are captured.
            try:
                exec(compiled_code, context, result)
            except Exception as e:
                result["error"] = e

        # BUG FIX: daemon=True so a runaway snippet that outlives the timeout
        # cannot keep the interpreter alive on shutdown.
        thread = threading.Thread(target=target, daemon=True)
        thread.start()
        thread.join(timeout)
        # Still alive means the snippet timed out
        if thread.is_alive():
            result["error"] = TimeoutError("Execution timed out")
            raise TimeoutError("Execution timed out")
    except Exception as e:
        # BUG FIX: message previously read "Errpr at sandbox_exec"
        print(f"Error at sandbox_exec: {e}")
        raise e
    return result
######################################################################################### For ** component########################################################################################
def compose_model_kwargs(default_model_kwargs: Dict, model_kwargs: Dict) -> Dict:
    r"""Merge new arguments over the defaults without mutating either input.

    Example:
        default = {"temperature": 0.5, "model": "gpt-3.5-turbo"}
        override = {"model": "gpt-3.5"}
        compose_model_kwargs(default, override)
        # => {"temperature": 0.5, "model": "gpt-3.5"}
    """
    merged = dict(default_model_kwargs)
    if model_kwargs:
        merged.update(model_kwargs)
    return merged
######################################################################################### For Tokenizer component########################################################################################VECTOR_TYPE=Union[List[float],np.ndarray]
def is_normalized(v: VECTOR_TYPE, tol=1e-4) -> bool:
    r"""Return whether the 1D vector ``v`` has (approximately) unit L2 norm."""
    arr = np.array(v) if isinstance(v, list) else v
    # norm of a unit vector is 1 within the tolerance
    return np.abs(np.linalg.norm(arr) - 1) < tol
def normalize_np_array(v: np.ndarray) -> np.ndarray:
    r"""Scale the 1D array ``v`` to unit L2 norm and return the result."""
    return v / np.linalg.norm(v)
def normalize_vector(v: VECTOR_TYPE) -> List[float]:
    r"""Scale ``v`` (list or ndarray) to unit L2 norm; return a plain list."""
    arr = np.array(v) if isinstance(v, list) else v
    unit = arr / np.linalg.norm(arr)
    return unit.tolist()
######################################################################################### For Parser components########################################################################################
def extract_first_int(text: str) -> int:
    """Extract the first integer from the provided text.

    Args:
        text (str): The text containing potential integer data.

    Returns:
        int: The extracted integer.

    Raises:
        ValueError: If no integer is found in the text.
    """
    found = re.search(r"\b\d+\b", text)
    if found is None:
        raise ValueError("No integer found in the text.")
    return int(found.group())
def extract_first_float(text: str) -> float:
    """Extract the first float (or bare integer) from the provided text.

    Args:
        text (str): The text containing potential float data.

    Returns:
        float: The extracted float.

    Raises:
        ValueError: If no float is found in the text.
    """
    found = re.search(r"\b\d+(\.\d+)?\b", text)
    if found is None:
        raise ValueError("No float found in the text.")
    return float(found.group())
def extract_first_boolean(text: str) -> bool:
    """Extract the first boolean token (true/false, any common casing) from text.

    Args:
        text (str): The text containing potential boolean data.

    Returns:
        bool: The extracted boolean.

    Raises:
        ValueError: If no boolean is found in the text.
    """
    found = re.search(r"\b(?:true|false|True|False)\b", text)
    if found is None:
        raise ValueError("No boolean found in the text.")
    return found.group().lower() == "true"
def extract_function_expression(
    text: str, add_missing_right_parenthesis: bool = True
) -> str:
    """Extract the first function expression from text by matching parentheses.

    Scans from the first '(' and returns everything up to its balancing ')'.
    If the closing parenthesis is missing and ``add_missing_right_parenthesis``
    is True, one is appended.

    Args:
        text (str): The text containing the potential function expression.
        add_missing_right_parenthesis (bool): Whether to append a ')' when missing.

    Returns:
        str: The extracted function expression (prefix through the matching ')').

    Raises:
        ValueError: If no '(' is found, or the expression is incomplete and
            appending a parenthesis is disabled.
    """
    text = text.strip()
    start = text.find("(")
    if start == -1:
        raise ValueError(f"No function expression found in the text: {text}")
    depth = 0
    end = -1
    for idx in range(start, len(text)):
        ch = text[idx]
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            if depth == 0:
                end = idx
                break
    if end == -1:
        if not add_missing_right_parenthesis:
            raise ValueError(
                "Incomplete function expression found and add_missing_right_parenthesis is False."
            )
        text += ")"
        end = len(text) - 1
    return text[: end + 1]
def extract_json_str(text: str, add_missing_right_brace: bool = True) -> str:
    """Extract the first JSON object or array from text by brace matching.

    Finds the earliest '{' or '[' and returns the substring through its
    balancing close. If the close is missing and ``add_missing_right_brace``
    is True, one is appended.

    Args:
        text (str): The text containing potential JSON data.
        add_missing_right_brace (bool): Whether to append a missing close brace.

    Returns:
        str: The extracted JSON string.

    Raises:
        ValueError: If no JSON object/array is found, or it is incomplete and
            appending a brace is disabled.
    """
    text = text.strip()
    obj_at = text.find("{")
    arr_at = text.find("[")
    if obj_at == -1 and arr_at == -1:
        raise ValueError(f"No JSON object or array found in the text: {text}")
    # whichever opener appears first wins
    start = min(pos for pos in (obj_at, arr_at) if pos != -1)
    open_brace = text[start]
    close_brace = "}" if open_brace == "{" else "]"
    depth = 0
    end = -1
    for idx in range(start, len(text)):
        if text[idx] == open_brace:
            depth += 1
        elif text[idx] == close_brace:
            depth -= 1
            if depth == 0:
                end = idx
                break
    if end == -1:
        if not add_missing_right_brace:
            raise ValueError(
                "Incomplete JSON object found and add_missing_right_brace is False."
            )
        log.debug("Adding missing right brace to the JSON string.")
        text += close_brace
        end = len(text) - 1
    return text[start : end + 1]
def extract_list_str(text: str, add_missing_right_bracket: bool = True) -> str:
    """Extract the first complete list literal from text by bracket matching.

    If the closing bracket is missing and ``add_missing_right_bracket`` is
    True, one is appended to complete the list.

    Args:
        text (str): The text containing potential list data.
        add_missing_right_bracket (bool): Whether to append a ']' when missing.

    Returns:
        str: The extracted list string.

    Raises:
        ValueError: If no list is found, or it is incomplete and appending a
            bracket is disabled.
    """
    text = text.strip()
    begin = text.find("[")
    if begin == -1:
        log.error("No list found in the text.")
        raise ValueError("No list found in the text.")
    depth = 0
    close_at = -1
    for pos in range(begin, len(text)):
        if text[pos] == "[":
            depth += 1
        elif text[pos] == "]":
            depth -= 1
            if depth == 0:
                close_at = pos
                break
    if close_at == -1:
        if add_missing_right_bracket:
            text += "]"
            close_at = len(text) - 1
        else:
            log.error("Incomplete list found and add_missing_right_bracket is False.")
            raise ValueError(
                "Incomplete list found and add_missing_right_bracket is False."
            )
    return text[begin : close_at + 1]
def extract_yaml_str(text: str) -> str:
    r"""Extract a YAML string from text.

    .. note::
        Unlike JSON, YAML has no delimiting braces, so the extraction relies
        on a ```` ```yaml ```` / ```` ```yml ```` fenced block to mark the
        start; without a fence the stripped text is returned as-is.

    Args:
        text (str): The text containing potential YAML data.

    Returns:
        str: The extracted YAML string.

    Raises:
        ValueError: If extraction fails unexpectedly.
    """
    try:
        fence = re.compile(
            r"^```(?:ya?ml)?(?P<yaml>[^`]*)", re.MULTILINE | re.DOTALL
        )
        found = fence.search(text.strip())
        if found:
            return found.group("yaml").strip()
        return text.strip()
    except Exception as e:
        raise ValueError(f"Failed to extract YAML from text: {e}")
def fix_json_missing_commas(json_str: str) -> str:
    r"""Insert commas that are missing between adjacent JSON values.

    A comma is added wherever a value-ending character (closing brace/bracket,
    quote, or digit) is separated only by whitespace from the start of the
    next value ('{', '"', or '[').
    """
    # lookbehind: end of a value; lookahead: start of the next one
    pattern = r'(?<=[}\]"\'\d])(\s+)(?=[\{"\[])'
    return re.sub(pattern, r",\1", json_str)
def fix_json_escaped_single_quotes(json_str: str) -> str:
    r"""Replace invalid ``\'`` escape sequences with plain single quotes.

    JSON does not define a single-quote escape, so ``\'`` inside a string
    makes the document unparsable; a bare ``'`` is what was intended.
    """
    return json_str.replace(r"\'", "'")
def parse_yaml_str_to_obj(yaml_str: str) -> Dict[str, Any]:
    r"""Parse a YAML string to a Python object.

    yaml_str: has to be a valid YAML string.

    Raises:
        ValueError: If the string is not valid YAML.
        ImportError: If PyYAML is not installed.
    """
    text = yaml_str.strip()
    try:
        import yaml

        parsed = yaml.safe_load(text)
    except yaml.YAMLError as e:
        raise ValueError(
            f"Got invalid YAML object. Error: {e}. Got YAML string: {text}"
        )
    except NameError as exc:
        raise ImportError("Please pip install PyYAML.") from exc
    return parsed
def parse_json_str_to_obj(json_str: str) -> Union[Dict[str, Any], List[Any]]:
    r"""Parse a variety of JSON-ish strings to a Python object.

    json_str: has to be a valid JSON string, either {} or []. Three attempts
    are made: strict json.loads, json.loads after comma/quote repairs, and
    finally the more lenient yaml.safe_load (tolerates trailing commas).

    Raises:
        ValueError: If all three parsing attempts fail.
    """
    json_str = json_str.strip()
    # 1st attempt: strict stdlib parser
    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        log.info(
            f"Got invalid JSON object with json.loads. Error: {e}. Got JSON string: {json_str}"
        )
    # 2nd attempt: repair common issues, then retry
    try:
        log.info("Trying to fix potential missing commas...")
        json_str = fix_json_missing_commas(json_str)
        log.info("Trying to fix scaped single quotes...")
        json_str = fix_json_escaped_single_quotes(json_str)
        log.info(f"Fixed JSON string: {json_str}")
        return json.loads(json_str)
    except json.JSONDecodeError:
        # 3rd attempt: PyYAML is less strict (e.g. allows trailing commas)
        try:
            log.info("Parsing JSON string with PyYAML...")
            return yaml.safe_load(json_str)
        except yaml.YAMLError as e:
            raise ValueError(
                f"Got invalid JSON object with yaml.safe_load. Error: {e}. Got JSON string: {json_str}"
            )
######################################################################################### For sampling########################################################################################
def random_sample(
    dataset: Sequence[T_co],
    num_shots: int,
    replace: Optional[bool] = False,
    weights: Optional[List[float]] = None,
    delta: float = 1e-5,  # to avoid zero division
) -> List[T_co]:
    r"""Randomly sample ``num_shots`` items from ``dataset``.

    With ``replace=True`` sampling is done with replacement; otherwise
    ``num_shots`` is clamped to the dataset size. Optional ``weights`` are
    shifted by ``delta`` (to avoid zero probabilities) and normalized.
    Uses NumPy's global random state.
    """
    total = len(dataset)
    if total == 0:
        return []
    if not replace and num_shots > total:
        log.debug(f"num_shots {num_shots} is larger than the dataset size {total}")
        num_shots = total
    probs = None
    if weights is not None:
        probs = np.array(weights) + delta
        prob_sum = probs.sum()
        if prob_sum == 0:
            raise ValueError("Sum of weights cannot be zero.")
        probs = probs / prob_sum
    chosen = np.random.choice(total, size=num_shots, replace=replace, p=probs)
    return [dataset[idx] for idx in chosen]