sohojoe commited on
Commit
207e993
·
1 Parent(s): c490c32

remove old respond_to_prompt_actor

Browse files
Files changed (1) hide show
  1. respond_to_prompt_actor.py +0 -188
respond_to_prompt_actor.py DELETED
@@ -1,188 +0,0 @@
1
- import ray
2
- from ray.util.queue import Queue
3
- from dotenv import load_dotenv
4
- from local_speaker_service import LocalSpeakerService
5
- from text_to_speech_service import TextToSpeechService
6
- from chat_service import ChatService
7
- import asyncio
8
- # from ray.actor import ActorHandle
9
- from ffmpeg_converter_actor import FFMpegConverterActor
10
- from agent_response import AgentResponse
11
- from environment_state_actor import EnvironmentStateActor
12
- import json
13
-
14
@ray.remote
class PromptToLLMActor:
    """Pipeline stage one: consume prompts from ``input_queue``, stream the
    LLM's reply sentence-by-sentence, and push a copy of the accumulated
    AgentResponse onto ``output_queue`` for each completed sentence.

    Partial (incomplete) sentences are published to the environment state
    actor as previews only; completed sentences are appended to the response
    and forwarded downstream.
    """

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            input_queue: Queue,
            output_queue: Queue):
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.chat_service = ChatService()
        # Created fresh per prompt in run(); set by cancel() to abort streaming.
        self.cancel_event = None
        self.environment_state_actor = environment_state_actor

    async def run(self):
        """Loop forever, turning each queued prompt into sentence responses."""
        while True:
            prompt = await self.input_queue.get_async()
            self.cancel_event = asyncio.Event()
            agent_response = AgentResponse(prompt)
            stream = self.chat_service.get_responses_as_sentances_async(
                prompt, self.cancel_event)
            async for text, is_complete_sentance in stream:
                # Sentences the chat service flags as ignorable are demoted
                # to previews so they never reach the TTS stage.
                if self.chat_service.ignore_sentence(text):
                    is_complete_sentance = False
                if not is_complete_sentance:
                    # Partial sentence: surface it as a live preview only.
                    agent_response['llm_preview'] = text
                    await self.environment_state_actor.set_llm_preview.remote(text)
                else:
                    agent_response['llm_preview'] = ''
                    agent_response['llm_sentence'] = text
                    agent_response['llm_sentences'].append(text)
                    await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
                    print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
                    # Forward a snapshot; the working response keeps mutating.
                    await self.output_queue.put_async(agent_response.make_copy())
                    agent_response['llm_sentence_id'] += 1

    async def cancel(self):
        """Abort the in-flight stream and drop any queued work."""
        if self.cancel_event:
            self.cancel_event.set()
        for queue in (self.input_queue, self.output_queue):
            while not queue.empty():
                await queue.get_async()
56
-
57
@ray.remote
class LLMSentanceToSpeechActor:
    """Pipeline stage two: convert each sentence response from
    ``input_queue`` into streamed TTS audio chunks on ``output_queue``,
    registering every emitted chunk's identity with the environment state
    actor as a JSON record.
    """

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            input_queue,
            output_queue,
            voice_id):
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.tts_service = TextToSpeechService(voice_id=voice_id)
        # Created fresh per sentence in run(); set by cancel() to abort TTS.
        self.cancel_event = None
        self.environment_state_actor = environment_state_actor

    async def run(self):
        """Loop forever, synthesizing speech for each queued sentence."""
        while True:
            sentence_response = await self.input_queue.get_async()
            self.cancel_event = asyncio.Event()
            chunk_index = 0
            async for chunk in self.tts_service.get_speech_chunks_async(
                    sentence_response, self.cancel_event):
                # Push a snapshot downstream; the service may reuse the object.
                await self.output_queue.put_async(chunk.make_copy())
                # Record which (prompt, sentence, chunk) this audio belongs to.
                chunk_id_json = json.dumps({
                    'prompt': sentence_response['prompt'],
                    'llm_sentence_id': sentence_response['llm_sentence_id'],
                    'chunk_count': chunk_index,
                })
                await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json)
                chunk_index += 1

    async def cancel(self):
        """Abort in-flight synthesis and drop any queued work."""
        if self.cancel_event:
            self.cancel_event.set()
        for queue in (self.input_queue, self.output_queue):
            while not queue.empty():
                await queue.get_async()
96
-
97
-
98
- # legacy code for playing from local speaker
99
- # @ray.remote
100
- # class SpeechToSpeakerActor:
101
- # def __init__(self, input_queue, voice_id):
102
- # load_dotenv()
103
- # self.input_queue = input_queue
104
- # self.speaker_service = LocalSpeakerService()
105
-
106
- # async def run(self):
107
- # while True:
108
- # audio_chunk = await self.input_queue.get_async()
109
- # # print (f"Got audio chunk {len(audio_chunk)}")
110
- # self.speaker_service.add_audio_stream([audio_chunk])
111
-
112
- # async def cancel(self):
113
- # while not self.input_queue.empty():
114
- # await self.input_queue.get_async()
115
-
116
@ray.remote
class SpeechToConverterActor:
    """Pipeline stage three: pull raw TTS audio chunks from ``input_queue``
    and feed them to the ffmpeg converter actor for transcoding.

    The chunks arrive as Ray ObjectRefs stored under the
    ``tts_raw_chunk_ref`` key of each queued response dict.
    """

    def __init__(
            self,
            input_queue: Queue,
            ffmpeg_converter_actor: FFMpegConverterActor):
        load_dotenv()
        self.input_queue = input_queue
        self.ffmpeg_converter_actor = ffmpeg_converter_actor

    async def run(self):
        """Start the converter, then forward audio chunks forever."""
        await self.ffmpeg_converter_actor.start_process.remote()
        # Fire-and-forget: the converter's own run loop drains its input.
        self.ffmpeg_converter_actor.run.remote()
        while True:
            chunk_response = await self.input_queue.get_async()
            audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
            # Fix: ray.get() would block this async actor's event loop,
            # stalling every other coroutine; awaiting the ObjectRef
            # resolves it cooperatively instead.
            audio_chunk = await audio_chunk_ref
            await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)

    async def cancel(self):
        """Drop any queued audio chunks."""
        while not self.input_queue.empty():
            await self.input_queue.get_async()
138
-
139
-
140
@ray.remote
class RespondToPromptActor:
    """Top-level orchestrator wiring the full response pipeline:
    prompt -> LLM sentences -> TTS audio chunks -> ffmpeg converter.

    Builds the three stage actors with bounded queues between them and
    starts their run loops immediately.
    """

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            audio_output_queue,
            voice_id: str = "2OviOUQc1JsQRQgNkVBj"):
        # voice_id was previously a hard-coded local; it is now a keyword
        # parameter whose default preserves the original voice, so existing
        # callers are unaffected.
        self.prompt_queue = Queue(maxsize=100)
        self.llm_sentence_queue = Queue(maxsize=100)
        self.speech_chunk_queue = Queue(maxsize=100)
        self.environment_state_actor = environment_state_actor

        self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)

        self.prompt_to_llm = PromptToLLMActor.remote(
            self.environment_state_actor,
            self.prompt_queue,
            self.llm_sentence_queue)
        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(
            self.environment_state_actor,
            self.llm_sentence_queue,
            self.speech_chunk_queue,
            voice_id)
        self.speech_output = SpeechToConverterActor.remote(
            self.speech_chunk_queue,
            self.ffmpeg_converter_actor)

        # Start the pipeline components (fire-and-forget actor tasks).
        self.prompt_to_llm.run.remote()
        self.llm_sentence_to_speech.run.remote()
        self.speech_output.run.remote()

    async def enqueue_prompt(self, prompt):
        """Cancel any in-flight response, flush every stage, then queue
        *prompt* for processing.

        An empty prompt acts as a pure flush: the pipeline is cleared but
        nothing new is enqueued.
        """
        print("flush anything queued")
        # Cancel/flush all stages concurrently before accepting new work.
        await asyncio.gather(
            self.prompt_to_llm.cancel.remote(),
            self.llm_sentence_to_speech.cancel.remote(),
            self.speech_output.cancel.remote(),
            self.ffmpeg_converter_actor.flush_output_queue.remote(),
        )
        if prompt:  # empty string == flush-only request
            await self.prompt_queue.put_async(prompt)
        print("Enqueued prompt")
188
-