sohojoe committed on
Commit
b6ba8eb
·
1 Parent(s): 3e4f32c

refactor - move ffmpeg_converter_actor to within respond_to_prompt_actor

Browse files
charles_actor.py CHANGED
@@ -21,32 +21,25 @@ class CharlesActor:
21
 
22
  async def _initalize_resources(self):
23
  # Initialize resources
24
- print("000")
25
  from streamlit_av_queue import StreamlitAVQueue
26
  self._streamlit_av_queue = StreamlitAVQueue()
27
  self._out_audio_queue = self._streamlit_av_queue.get_out_audio_queue()
28
 
29
- print("001")
30
- from ffmpeg_converter_actor import FFMpegConverterActor
31
- self._ffmpeg_converter_actor = FFMpegConverterActor.remote(self._out_audio_queue)
32
- await self._ffmpeg_converter_actor.start_process.remote()
33
- self._ffmpeg_converter_actor.run.remote()
34
 
35
- print("002")
36
  from speech_to_text_vosk_actor import SpeechToTextVoskActor
37
  self._speech_to_text_actor = SpeechToTextVoskActor.remote()
38
-
39
- print("003")
40
- from respond_to_prompt_actor import RespondToPromptActor
41
- self._respond_to_prompt_actor = RespondToPromptActor.remote(self._ffmpeg_converter_actor)
42
 
43
  self._debug_queue = [
44
  # "hello, how are you today?",
45
  # "hmm, interesting, tell me more about that.",
46
  ]
47
 
48
- print("004")
49
- print("creating prototypes")
50
  from prototypes import Prototypes
51
  self._prototypes = Prototypes()
52
  print("010")
@@ -85,7 +78,7 @@ class CharlesActor:
85
  if len(process_speech_to_text_future) > 0:
86
  ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
87
  if ready:
88
- prompt, speaker_finished = await process_speech_to_text_future[0]
89
  del process_speech_to_text_future[0]
90
 
91
  if speaker_finished and len(prompt) > 0:
 
21
 
22
  async def _initalize_resources(self):
23
  # Initialize resources
24
+ print("000 - create StreamlitAVQueue")
25
  from streamlit_av_queue import StreamlitAVQueue
26
  self._streamlit_av_queue = StreamlitAVQueue()
27
  self._out_audio_queue = self._streamlit_av_queue.get_out_audio_queue()
28
 
29
+ print("001 - create RespondToPromptActor")
30
+ from respond_to_prompt_actor import RespondToPromptActor
31
+ self._respond_to_prompt_actor = RespondToPromptActor.remote(self._out_audio_queue)
 
 
32
 
33
+ print("002 - create SpeechToTextVoskActor")
34
  from speech_to_text_vosk_actor import SpeechToTextVoskActor
35
  self._speech_to_text_actor = SpeechToTextVoskActor.remote()
 
 
 
 
36
 
37
  self._debug_queue = [
38
  # "hello, how are you today?",
39
  # "hmm, interesting, tell me more about that.",
40
  ]
41
 
42
+ print("003 - create Prototypes")
 
43
  from prototypes import Prototypes
44
  self._prototypes = Prototypes()
45
  print("010")
 
78
  if len(process_speech_to_text_future) > 0:
79
  ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
80
  if ready:
81
+ prompt, speaker_finished, raw_json = await process_speech_to_text_future[0]
82
  del process_speech_to_text_future[0]
83
 
84
  if speaker_finished and len(prompt) > 0:
respond_to_prompt_actor.py CHANGED
@@ -60,22 +60,23 @@ class LLMSentanceToSpeechActor:
60
  await self.output_queue.get_async()
61
 
62
 
63
- @ray.remote
64
- class SpeechToSpeakerActor:
65
- def __init__(self, input_queue, voice_id):
66
- load_dotenv()
67
- self.input_queue = input_queue
68
- self.speaker_service = LocalSpeakerService()
 
69
 
70
- async def run(self):
71
- while True:
72
- audio_chunk = await self.input_queue.get_async()
73
- # print (f"Got audio chunk {len(audio_chunk)}")
74
- self.speaker_service.add_audio_stream([audio_chunk])
75
 
76
- async def cancel(self):
77
- while not self.input_queue.empty():
78
- await self.input_queue.get_async()
79
 
80
  @ray.remote
81
  class SpeechToConverterActor:
@@ -85,6 +86,8 @@ class SpeechToConverterActor:
85
  self.ffmpeg_converter_actor = ffmpeg_converter_actor
86
 
87
  async def run(self):
 
 
88
  while True:
89
  audio_chunk = await self.input_queue.get_async()
90
  # print (f"Got audio chunk {len(audio_chunk)}")
@@ -97,17 +100,18 @@ class SpeechToConverterActor:
97
 
98
  @ray.remote
99
  class RespondToPromptActor:
100
- def __init__(self, ffmpeg_converter_actor):
101
  voice_id="2OviOUQc1JsQRQgNkVBj"
102
  self.prompt_queue = Queue(maxsize=100)
103
  self.llm_sentence_queue = Queue(maxsize=100)
104
  self.speech_chunk_queue = Queue(maxsize=100)
105
- self.ffmepg_converter_actor = ffmpeg_converter_actor
 
106
 
107
  self.prompt_to_llm = PromptToLLMActor.remote(self.prompt_queue, self.llm_sentence_queue)
108
  self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(self.llm_sentence_queue, self.speech_chunk_queue, voice_id)
109
  # self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
110
- self.speech_output = SpeechToConverterActor.remote(self.speech_chunk_queue, ffmpeg_converter_actor)
111
 
112
  # Start the pipeline components.
113
  self.prompt_to_llm.run.remote()
@@ -119,7 +123,7 @@ class RespondToPromptActor:
119
  prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
120
  llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
121
  speech_output_future = self.speech_output.cancel.remote()
122
- ffmpeg_converter_future = self.ffmepg_converter_actor.flush_output_queue.remote()
123
  await asyncio.gather(
124
  prompt_to_llm_future,
125
  llm_sentence_to_speech_future,
 
60
  await self.output_queue.get_async()
61
 
62
 
63
+ # legacy code for playing from local speaker
64
+ # @ray.remote
65
+ # class SpeechToSpeakerActor:
66
+ # def __init__(self, input_queue, voice_id):
67
+ # load_dotenv()
68
+ # self.input_queue = input_queue
69
+ # self.speaker_service = LocalSpeakerService()
70
 
71
+ # async def run(self):
72
+ # while True:
73
+ # audio_chunk = await self.input_queue.get_async()
74
+ # # print (f"Got audio chunk {len(audio_chunk)}")
75
+ # self.speaker_service.add_audio_stream([audio_chunk])
76
 
77
+ # async def cancel(self):
78
+ # while not self.input_queue.empty():
79
+ # await self.input_queue.get_async()
80
 
81
  @ray.remote
82
  class SpeechToConverterActor:
 
86
  self.ffmpeg_converter_actor = ffmpeg_converter_actor
87
 
88
  async def run(self):
89
+ await self.ffmpeg_converter_actor.start_process.remote()
90
+ self.ffmpeg_converter_actor.run.remote()
91
  while True:
92
  audio_chunk = await self.input_queue.get_async()
93
  # print (f"Got audio chunk {len(audio_chunk)}")
 
100
 
101
  @ray.remote
102
  class RespondToPromptActor:
103
+ def __init__(self, out_audio_queue):
104
  voice_id="2OviOUQc1JsQRQgNkVBj"
105
  self.prompt_queue = Queue(maxsize=100)
106
  self.llm_sentence_queue = Queue(maxsize=100)
107
  self.speech_chunk_queue = Queue(maxsize=100)
108
+
109
+ self.ffmpeg_converter_actor = FFMpegConverterActor.remote(out_audio_queue)
110
 
111
  self.prompt_to_llm = PromptToLLMActor.remote(self.prompt_queue, self.llm_sentence_queue)
112
  self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(self.llm_sentence_queue, self.speech_chunk_queue, voice_id)
113
  # self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
114
+ self.speech_output = SpeechToConverterActor.remote(self.speech_chunk_queue, self.ffmpeg_converter_actor)
115
 
116
  # Start the pipeline components.
117
  self.prompt_to_llm.run.remote()
 
123
  prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
124
  llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
125
  speech_output_future = self.speech_output.cancel.remote()
126
+ ffmpeg_converter_future = self.ffmpeg_converter_actor.flush_output_queue.remote()
127
  await asyncio.gather(
128
  prompt_to_llm_future,
129
  llm_sentence_to_speech_future,
speech_to_text_vosk_actor.py CHANGED
@@ -33,7 +33,7 @@ class SpeechToTextVoskActor:
33
  result = self.vosk.PartialResult()
34
  result_json = json.loads(result)
35
  text = result_json['partial']
36
- return text, speaker_finished
37
 
38
 
39
  def add_speech_bytes(self, data: bytearray):
 
33
  result = self.vosk.PartialResult()
34
  result_json = json.loads(result)
35
  text = result_json['partial']
36
+ return text, speaker_finished, result_json
37
 
38
 
39
  def add_speech_bytes(self, data: bytearray):