File size: 3,542 Bytes
d8d14f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import asyncio
import multiprocessing as mp
import time
from functools import partial
from typing import Any, Dict, Union


class HighSpeedExecutor:
    def __init__(self, num_processes: int = None):
        """
        Initialize the executor with configurable number of processes.
        If num_processes is None, it uses CPU count.
        """
        self.num_processes = num_processes or mp.cpu_count()

    async def _worker(
        self,
        queue: asyncio.Queue,
        func: Any,
        *args: Any,
        **kwargs: Any,
    ):
        """Async worker that processes tasks from the queue"""
        while True:
            try:
                # Non-blocking get from queue
                await queue.get()
                await asyncio.get_event_loop().run_in_executor(
                    None, partial(func, *args, **kwargs)
                )
                queue.task_done()
            except asyncio.CancelledError:
                break

    async def _distribute_tasks(
        self, num_tasks: int, queue: asyncio.Queue
    ):
        """Distribute tasks across the queue"""
        for i in range(num_tasks):
            await queue.put(i)

    async def execute_batch(
        self,
        func: Any,
        num_executions: int,
        *args: Any,
        **kwargs: Any,
    ) -> Dict[str, Union[int, float]]:
        """
        Execute the given function multiple times concurrently.

        Args:
            func: The function to execute
            num_executions: Number of times to execute the function
            *args, **kwargs: Arguments to pass to the function

        Returns:
            A dictionary containing the number of executions, duration, and executions per second.
        """
        queue = asyncio.Queue()

        # Create worker tasks
        workers = [
            asyncio.create_task(
                self._worker(queue, func, *args, **kwargs)
            )
            for _ in range(self.num_processes)
        ]

        # Start timing
        start_time = time.perf_counter()

        # Distribute tasks
        await self._distribute_tasks(num_executions, queue)

        # Wait for all tasks to complete
        await queue.join()

        # Cancel workers
        for worker in workers:
            worker.cancel()

        # Wait for all workers to finish
        await asyncio.gather(*workers, return_exceptions=True)

        end_time = time.perf_counter()
        duration = end_time - start_time

        return {
            "executions": num_executions,
            "duration": duration,
            "executions_per_second": num_executions / duration,
        }

    def run(
        self,
        func: Any,
        num_executions: int,
        *args: Any,
        **kwargs: Any,
    ):
        return asyncio.run(
            self.execute_batch(func, num_executions, *args, **kwargs)
        )


# def example_function(x: int = 0) -> int:
#     """Example function to execute"""
#     return x * x


# async def main():
#     # Create executor with number of CPU cores
#     executor = HighSpeedExecutor()

#     # Execute the function 1000 times
#     result = await executor.execute_batch(
#         example_function, num_executions=1000, x=42
#     )

#     print(
#         f"Completed {result['executions']} executions in {result['duration']:.2f} seconds"
#     )
#     print(
#         f"Rate: {result['executions_per_second']:.2f} executions/second"
#     )


# if __name__ == "__main__":
#     # Run the async main function
#     asyncio.run(main())