File size: 8,880 Bytes
d8d14f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import concurrent.futures
from typing import Callable, Any, Dict, List
from swarms.utils.loguru_logger import initialize_logger

logger = initialize_logger(log_folder="func_calling_executor")

# def openai_tool_executor(
#     tools: List[Dict[str, Any]],
#     function_map: Dict[str, Callable],
#     verbose: bool = True,
#     return_as_string: bool = False,
#     *args,
#     **kwargs,
# ) -> Callable:
#     """
#     Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
#     in a list of tool dictionaries, with extensive error handling and validation.

#     Args:
#         tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
#         function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
#         verbose (bool): If True, enables verbose logging.
#         return_as_string (bool): If True, returns the results as a concatenated string.

#     Returns:
#         Callable: A function that, when called, executes the specified functions concurrently with the parameters given.

#     Examples:
#     >>> def test_function(param1: int, param2: str) -> str:
#     ...     return f"Test function called with parameters: {param1}, {param2}"

#     >>> tool_executor = openai_tool_executor(
#     ...     tools=[
#     ...         {
#     ...             "type": "function",
#     ...             "function": {
#     ...                 "name": "test_function",
#     ...                 "parameters": {
#     ...                     "param1": 1,
#     ...                     "param2": "example"
#     ...                 }
#     ...             }
#     ...         }
#     ...     ],
#     ...     function_map={
#     ...         "test_function": test_function
#     ...     },
#     ...     return_as_string=True
#     ... )
#     >>> results = tool_executor()
#     >>> print(results)
#     """

#     def tool_executor():
#         # Prepare tasks for concurrent execution
#         results = []
#         logger.info(f"Executing {len(tools)} tools concurrently.")
#         with concurrent.futures.ThreadPoolExecutor() as executor:
#             futures = []
#             for tool in tools:
#                 if tool.get("type") != "function":
#                     continue  # Skip non-function tool entries

#                 function_info = tool.get("function", {})
#                 func_name = function_info.get("name")
#                 logger.info(f"Executing function: {func_name}")

#                 # Check if the function name is mapped to an actual function
#                 if func_name not in function_map:
#                     error_message = f"Function '{func_name}' not found in function map."
#                     logger.error(error_message)
#                     results.append(error_message)
#                     continue

#                 # Validate parameters
#                 params = function_info.get("parameters", {})
#                 if not params:
#                     error_message = f"No parameters specified for function '{func_name}'."
#                     logger.error(error_message)
#                     results.append(error_message)
#                     continue

#                 # Submit the function for execution
#                 try:
#                     future = executor.submit(
#                         function_map[func_name], **params
#                     )
#                     futures.append((func_name, future))
#                 except Exception as e:
#                     error_message = f"Failed to submit the function '{func_name}' for execution: {e}"
#                     logger.error(error_message)
#                     results.append(error_message)

#             # Gather results from all futures
#             for func_name, future in futures:
#                 try:
#                     result = future.result()  # Collect result from future
#                     results.append(f"{func_name}: {result}")
#                 except Exception as e:
#                     error_message = f"Error during execution of function '{func_name}': {e}"
#                     logger.error(error_message)
#                     results.append(error_message)

#         if return_as_string:
#             return "\n".join(results)

#         logger.info(f"Results: {results}")

#         return results

#     return tool_executor


def openai_tool_executor(
    tools: List[Dict[str, Any]],
    function_map: Dict[str, Callable],
    verbose: bool = True,
    return_as_string: bool = False,
    *args,
    **kwargs,
) -> Callable:
    def tool_executor():
        results = []
        logger.info(f"Executing {len(tools)} tools concurrently.")
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for tool in tools:
                if tool.get("type") != "function":
                    continue

                function_info = tool.get("function", {})
                func_name = function_info.get("name")
                logger.info(f"Executing function: {func_name}")

                if func_name not in function_map:
                    error_message = f"Function '{func_name}' not found in function map."
                    logger.error(error_message)
                    results.append(error_message)
                    continue

                params = function_info.get("parameters", {})
                if not params:
                    error_message = f"No parameters specified for function '{func_name}'."
                    logger.error(error_message)
                    results.append(error_message)
                    continue

                if (
                    "name" in params
                    and params["name"] in function_map
                ):
                    try:
                        result = function_map[params["name"]](
                            **params
                        )
                        results.append(f"{params['name']}: {result}")
                    except Exception as e:
                        error_message = f"Failed to execute the function '{params['name']}': {e}"
                        logger.error(error_message)
                        results.append(error_message)
                    continue

                try:
                    future = executor.submit(
                        function_map[func_name], **params
                    )
                    futures.append((func_name, future))
                except Exception as e:
                    error_message = f"Failed to submit the function '{func_name}' for execution: {e}"
                    logger.error(error_message)
                    results.append(error_message)

            for func_name, future in futures:
                try:
                    result = future.result()
                    results.append(f"{func_name}: {result}")
                except Exception as e:
                    error_message = f"Error during execution of function '{func_name}': {e}"
                    logger.error(error_message)
                    results.append(error_message)

        if return_as_string:
            return "\n".join(results)

        logger.info(f"Results: {results}")

        return results

    return tool_executor


# function_schema = {
#     "name": "execute",
#     "description": "Executes code on the user's machine **in the users local environment** and returns the output",
#     "parameters": {
#         "type": "object",
#         "properties": {
#             "language": {
#                 "type": "string",
#                 "description": "The programming language (required parameter to the `execute` function)",
#                 "enum": [
#                     # This will be filled dynamically with the languages OI has access to.
#                 ],
#             },
#             "code": {
#                 "type": "string",
#                 "description": "The code to execute (required)",
#             },
#         },
#         "required": ["language", "code"],
#     },
# }


# def execute(language: str, code: str):
#     """
#     Executes code on the user's machine **in the users local environment** and returns the output

#     Args:
#         language (str): The programming language (required parameter to the `execute` function)
#         code (str): The code to execute (required)

#     Returns:
#         str: The output of the code execution
#     """
#     # This function will be implemented by the user
#     return "Code execution not implemented yet"


# # Example execution
# out = openai_tool_executor(
#     tools=[function_schema],
#     function_map={
#         "execute": execute,
#     },
#     return_as_string=True,
# )
# print(out)