import concurrent.futures
from typing import Callable, Any, Dict, List
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="func_calling_executor")
# def openai_tool_executor(
# tools: List[Dict[str, Any]],
# function_map: Dict[str, Callable],
# verbose: bool = True,
# return_as_string: bool = False,
# *args,
# **kwargs,
# ) -> Callable:
# """
# Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
# in a list of tool dictionaries, with extensive error handling and validation.
# Args:
# tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
# function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
# verbose (bool): If True, enables verbose logging.
# return_as_string (bool): If True, returns the results as a concatenated string.
# Returns:
# Callable: A function that, when called, executes the specified functions concurrently with the parameters given.
# Examples:
# >>> def test_function(param1: int, param2: str) -> str:
# ... return f"Test function called with parameters: {param1}, {param2}"
# >>> tool_executor = openai_tool_executor(
# ... tools=[
# ... {
# ... "type": "function",
# ... "function": {
# ... "name": "test_function",
# ... "parameters": {
# ... "param1": 1,
# ... "param2": "example"
# ... }
# ... }
# ... }
# ... ],
# ... function_map={
# ... "test_function": test_function
# ... },
# ... return_as_string=True
# ... )
# >>> results = tool_executor()
# >>> print(results)
# """
# def tool_executor():
# # Prepare tasks for concurrent execution
# results = []
# logger.info(f"Executing {len(tools)} tools concurrently.")
# with concurrent.futures.ThreadPoolExecutor() as executor:
# futures = []
# for tool in tools:
# if tool.get("type") != "function":
# continue # Skip non-function tool entries
# function_info = tool.get("function", {})
# func_name = function_info.get("name")
# logger.info(f"Executing function: {func_name}")
# # Check if the function name is mapped to an actual function
# if func_name not in function_map:
# error_message = f"Function '{func_name}' not found in function map."
# logger.error(error_message)
# results.append(error_message)
# continue
# # Validate parameters
# params = function_info.get("parameters", {})
# if not params:
# error_message = f"No parameters specified for function '{func_name}'."
# logger.error(error_message)
# results.append(error_message)
# continue
# # Submit the function for execution
# try:
# future = executor.submit(
# function_map[func_name], **params
# )
# futures.append((func_name, future))
# except Exception as e:
# error_message = f"Failed to submit the function '{func_name}' for execution: {e}"
# logger.error(error_message)
# results.append(error_message)
# # Gather results from all futures
# for func_name, future in futures:
# try:
# result = future.result() # Collect result from future
# results.append(f"{func_name}: {result}")
# except Exception as e:
# error_message = f"Error during execution of function '{func_name}': {e}"
# logger.error(error_message)
# results.append(error_message)
# if return_as_string:
# return "\n".join(results)
# logger.info(f"Results: {results}")
# return results
# return tool_executor
def openai_tool_executor(
    tools: List[Dict[str, Any]],
    function_map: Dict[str, Callable],
    verbose: bool = True,
    return_as_string: bool = False,
    *args,
    **kwargs,
) -> Callable:
    """
    Create a callable that executes OpenAI-style tool specs concurrently.

    Args:
        tools (List[Dict[str, Any]]): Tool dictionaries; only entries with
            ``"type": "function"`` are executed. Each must provide
            ``function.name`` and a non-empty ``function.parameters`` mapping.
        function_map (Dict[str, Callable]): Maps function names to the
            callables to invoke with the tool's parameters as kwargs.
        verbose (bool): If True (default), emit info-level progress logs.
            Errors are always logged regardless of this flag.
        return_as_string (bool): If True, the executor returns all results
            joined with newlines instead of a list.

    Returns:
        Callable: A zero-argument ``tool_executor`` that runs the tools and
        returns a list of ``"name: result"`` strings (or one joined string
        when ``return_as_string`` is True). Failures are reported as error
        strings inside the results rather than raised to the caller.
    """

    def tool_executor():
        results = []
        if verbose:
            logger.info(f"Executing {len(tools)} tools concurrently.")
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for tool in tools:
                # Only OpenAI "function"-type tool entries are executable.
                if tool.get("type") != "function":
                    continue

                function_info = tool.get("function", {})
                func_name = function_info.get("name")
                if verbose:
                    logger.info(f"Executing function: {func_name}")

                # Unknown function: record an error string and keep going.
                if func_name not in function_map:
                    error_message = f"Function '{func_name}' not found in function map."
                    logger.error(error_message)
                    results.append(error_message)
                    continue

                # Empty/missing parameters are treated as a spec error.
                params = function_info.get("parameters", {})
                if not params:
                    error_message = f"No parameters specified for function '{func_name}'."
                    logger.error(error_message)
                    results.append(error_message)
                    continue

                # Legacy path: when the parameters themselves carry a "name"
                # key that maps to a known function, call that function
                # directly (synchronously) with the full params dict —
                # including the "name" key, which the target must accept.
                if (
                    "name" in params
                    and params["name"] in function_map
                ):
                    try:
                        result = function_map[params["name"]](
                            **params
                        )
                        results.append(f"{params['name']}: {result}")
                    except Exception as e:
                        error_message = f"Failed to execute the function '{params['name']}': {e}"
                        logger.error(error_message)
                        results.append(error_message)
                    continue

                # Normal path: submit for concurrent execution in the pool.
                try:
                    future = executor.submit(
                        function_map[func_name], **params
                    )
                    futures.append((func_name, future))
                except Exception as e:
                    error_message = f"Failed to submit the function '{func_name}' for execution: {e}"
                    logger.error(error_message)
                    results.append(error_message)

            # Gather results; a failing tool yields an error string, not a raise.
            for func_name, future in futures:
                try:
                    result = future.result()
                    results.append(f"{func_name}: {result}")
                except Exception as e:
                    error_message = f"Error during execution of function '{func_name}': {e}"
                    logger.error(error_message)
                    results.append(error_message)

        if return_as_string:
            return "\n".join(results)
        if verbose:
            logger.info(f"Results: {results}")
        return results

    return tool_executor
# function_schema = {
# "name": "execute",
# "description": "Executes code on the user's machine **in the users local environment** and returns the output",
# "parameters": {
# "type": "object",
# "properties": {
# "language": {
# "type": "string",
# "description": "The programming language (required parameter to the `execute` function)",
# "enum": [
# # This will be filled dynamically with the languages OI has access to.
# ],
# },
# "code": {
# "type": "string",
# "description": "The code to execute (required)",
# },
# },
# "required": ["language", "code"],
# },
# }
# def execute(language: str, code: str):
# """
# Executes code on the user's machine **in the users local environment** and returns the output
# Args:
# language (str): The programming language (required parameter to the `execute` function)
# code (str): The code to execute (required)
# Returns:
# str: The output of the code execution
# """
# # This function will be implemented by the user
# return "Code execution not implemented yet"
# # Example execution
# out = openai_tool_executor(
# tools=[function_schema],
# function_map={
# "execute": execute,
# },
# return_as_string=True,
# )
# print(out)