From 27cef7cd702d50dc11054bf2a0eff4d4fcfde293 Mon Sep 17 00:00:00 2001 From: Chad Bailey Date: Thu, 4 Apr 2024 20:45:23 +0000 Subject: [PATCH] add endframe to transport receive queue --- src/dailyai/transports/threaded_transport.py | 45 ++++++++++++-------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/src/dailyai/transports/threaded_transport.py b/src/dailyai/transports/threaded_transport.py index 037ba8f10..5e63dbfda 100644 --- a/src/dailyai/transports/threaded_transport.py +++ b/src/dailyai/transports/threaded_transport.py @@ -32,12 +32,15 @@ def int2float(sound): - abs_max = np.abs(sound).max() - sound = sound.astype("float32") - if abs_max > 0: - sound *= 1 / 32768 - sound = sound.squeeze() # depends on the use case - return sound + try: + abs_max = np.abs(sound).max() + sound = sound.astype("float32") + if abs_max > 0: + sound *= 1 / 32768 + sound = sound.squeeze() # depends on the use case + return sound + except ValueError: + return sound class VADState(Enum): @@ -266,17 +269,21 @@ def _prerun(self): pass def _silero_vad_analyze(self): - audio_chunk = self.read_audio_frames(self._vad_samples) - audio_int16 = np.frombuffer(audio_chunk, np.int16) - audio_float32 = int2float(audio_int16) - new_confidence = self.model( - torch.from_numpy(audio_float32), 16000).item() - # yeses = int(new_confidence * 20.0) - # nos = 20 - yeses - # out = "!" * yeses + "." * nos - # print(f"!!! confidence: {out}") - speaking = new_confidence > 0.5 - return speaking + try: + audio_chunk = self.read_audio_frames(self._vad_samples) + audio_int16 = np.frombuffer(audio_chunk, np.int16) + audio_float32 = int2float(audio_int16) + new_confidence = self.model( + torch.from_numpy(audio_float32), 16000).item() + # yeses = int(new_confidence * 20.0) + # nos = 20 - yeses + # out = "!" * yeses + "." * nos + # print(f"!!! confidence: {out}") + speaking = new_confidence > 0.5 + return speaking + except BaseException: + # This comes from an empty audio array + return False def _vad(self): @@ -426,6 +433,10 @@ def _frame_consumer(self): asyncio.run_coroutine_threadsafe( self.completed_queue.put(frame), self._loop ) + # Also send the EndFrame to the pipeline so it can stop + asyncio.run_coroutine_threadsafe( + self.receive_queue.put(frame), self._loop + ) return # if interrupted, we just pull frames off the queue and