Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamPlayer use TextToAudioStream queue instead of engine's one #232

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RealtimeTTS/engines/base_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def get_stream_info(self):
"The get_stream_info method must be implemented by the derived class."
)

def synthesize(self, text: str) -> bool:
def synthesize(self, text: str, *args, **kwargs) -> bool:
"""
Synthesizes text to audio stream.

Expand Down
27 changes: 18 additions & 9 deletions RealtimeTTS/engines/coqui_engine.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import queue
from .base_engine import BaseEngine
import torch.multiprocessing as mp
from threading import Lock, Thread
Expand Down Expand Up @@ -212,8 +213,10 @@ def __init__(
# Start the worker process
try:
# Only set the start method if it hasn't been set already
# Check the current platform and set the start method
if sys.platform.startswith('linux') or sys.platform == 'darwin': # For Linux or macOS
# Check the current platform and set the start method
if (
sys.platform.startswith("linux") or sys.platform == "darwin"
): # For Linux or macOS
mp.set_start_method("spawn")
elif mp.get_start_method(allow_none=True) is None:
mp.set_start_method("spawn")
Expand Down Expand Up @@ -737,8 +740,15 @@ def get_user_data_dir(appname):
# wait only if we are faster than realtime, meaning
# that chunk_production_seconds is smaller than generated_audio_seconds
if load_balancing:
if chunk_production_seconds < (generated_audio_seconds + load_balancing_buffer_length):
waiting_time = generated_audio_seconds - chunk_production_seconds - load_balancing_cut_off
if chunk_production_seconds < (
generated_audio_seconds
+ load_balancing_buffer_length
):
waiting_time = (
generated_audio_seconds
- chunk_production_seconds
- load_balancing_cut_off
)
if waiting_time > 0:
print(f"Waiting for {waiting_time} seconds")
time.sleep(waiting_time)
Expand All @@ -759,7 +769,6 @@ def get_user_data_dir(appname):
print(f"Realtime Factor: {realtime_factor}")
print(f"Raw Inference Factor: {raw_inference_factor}")


# Send silent audio
sample_rate = config.audio.sample_rate

Expand Down Expand Up @@ -796,7 +805,7 @@ def get_user_data_dir(appname):
print(f"Error: {e}")

conn.send(("error", str(e)))

sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__

Expand Down Expand Up @@ -908,7 +917,7 @@ def _prepare_text_for_synthesis(self, text: str):
text = text[:-2]
elif len(text) > 3 and text[-2] in ["!", "?", ","]:
text = text[:-2] + " " + text[-2]

except Exception as e:
logging.warning(
f'Error fixing sentence end punctuation: {e}, Text: "{text}"'
Expand All @@ -920,7 +929,7 @@ def _prepare_text_for_synthesis(self, text: str):

return text

def synthesize(self, text: str) -> bool:
def synthesize(self, text: str, audio_queue: queue.Queue) -> bool:
"""
Synthesizes text to audio stream.

Expand All @@ -947,7 +956,7 @@ def synthesize(self, text: str) -> bool:
logging.error(f"Error: {result}")
return False

self.queue.put(result)
audio_queue.put(result)
status, result = self.parent_synthesize_pipe.recv()

return True
Expand Down
20 changes: 13 additions & 7 deletions RealtimeTTS/text_to_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def __init__(
# Handle the case where engine is a single BaseEngine instance
self.engines = [engine]

self.audio_queue = queue.Queue()

self.load_engine(self.engines[self.engine_index])

def load_engine(self, engine: BaseEngine):
Expand Down Expand Up @@ -127,7 +129,7 @@ def load_engine(self, engine: BaseEngine):
)

self.player = StreamPlayer(
self.engine.queue, config, on_playback_start=self._on_audio_stream_start
self.audio_queue, config, on_playback_start=self._on_audio_stream_start
)

logging.info(f"loaded engine {self.engine.engine_name}")
Expand Down Expand Up @@ -319,7 +321,6 @@ def play(
self.engine.synthesize(self.char_iter)

finally:

try:
if self.player:
self.player.stop()
Expand Down Expand Up @@ -395,7 +396,9 @@ def synthesize_worker():

synthesis_successful = False
if log_synthesized_text:
print(f"\033[96m\033[1m⚡ synthesizing\033[0m \033[37m→ \033[2m'\033[22m{sentence}\033[2m'\033[0m")
print(
f"\033[96m\033[1m⚡ synthesizing\033[0m \033[37m→ \033[2m'\033[22m{sentence}\033[2m'\033[0m"
)

while not synthesis_successful:
try:
Expand All @@ -404,7 +407,9 @@ def synthesize_worker():

if before_sentence_synthesized:
before_sentence_synthesized(sentence)
success = self.engine.synthesize(sentence)
success = self.engine.synthesize(
sentence, audio_queue=self.audio_queue
)
if success:
if on_sentence_synthesized:
on_sentence_synthesized(sentence)
Expand Down Expand Up @@ -486,10 +491,11 @@ def synthesize_worker():
self.wf.close()
self.wf = None

if (len(self.char_iter.items) > 0
if (
len(self.char_iter.items) > 0
and self.char_iter.iterated_text == ""
and not self.char_iter.immediate_stop.is_set()):

and not self.char_iter.immediate_stop.is_set()
):
# new text was fed while playing audio but after the last character was processed
# we need to start another play() call (!recursively!)
self.play(
Expand Down
4 changes: 2 additions & 2 deletions example_fast_api/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
"openai",
"elevenlabs",
"system",
# "coqui", #multiple queries are not supported on coqui engine right now, comment coqui out for tests where you need server start often
"coqui", # multiple queries are not supported on coqui engine right now, comment coqui out for tests where you need server start often
]

# change start engine by moving engine name
# to the first position in SUPPORTED_ENGINES
START_ENGINE = SUPPORTED_ENGINES[0]
START_ENGINE = SUPPORTED_ENGINES[-1]

BROWSER_IDENTIFIERS = [
"mozilla",
Expand Down
149 changes: 149 additions & 0 deletions example_fast_api/client_multi_queries_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import uuid
import wave
import requests
import pyaudio
import time
import threading
import argparse
from queue import Queue

# Argument parser setup
parser = argparse.ArgumentParser(description="Run the TTS client.")
parser.add_argument(
    "-p",
    "--port",
    type=int,
    default=8000,
    help="Port of the TTS server (default: 8000)",
)
parser.add_argument(
    "-t",
    "--text",
    type=str,
    default="Hello! This is a default text to speech demo text!",
    # Fixed: the help text previously documented a different default string
    # than the one actually used above.
    help="Text to convert to speech (default: 'Hello! This is a default text to speech demo text!')",
)
parser.add_argument(
    "-w", "--write", action="store_true", help="Save output to a WAV file"
)
args = parser.parse_args()

# Unpack parsed CLI options into module-level settings used below.
port = args.port
text_to_tts = args.text
write_to_file = args.write

# Configuration

SERVER_URL = f"http://127.0.0.1:{port}/tts"
AUDIO_FORMAT = pyaudio.paInt16  # 16-bit signed integer samples
CHANNELS = 1  # mono playback
RATE = 16000  # coqui (24000), azure (16000), openai (22050), system (22050)

# Initialize PyAudio
pyaudio_instance = pyaudio.PyAudio()

# Queue for chunk storage; shared between the request threads (producers)
# and the playback thread (consumer). A None item signals end-of-stream.
chunk_queue = Queue()


# Thread function for playing audio
def play_audio():
global start_time
buffer = b""
played_out = False
got_first_chunk = False

frame_size = pyaudio_instance.get_sample_size(AUDIO_FORMAT) * CHANNELS
min_buffer_size = 1024 * 6 # Adjust as needed

# Initial buffering
while len(buffer) < min_buffer_size:
chunk = chunk_queue.get()
if chunk is None:
break
if not got_first_chunk:
got_first_chunk = True
time_to_first_token = time.time() - start_time
print(f"Time to first token: {time_to_first_token}")
buffer += chunk

# Now start playback
while True:
# Write data if buffer has enough frames
if len(buffer) >= frame_size:
num_frames = len(buffer) // frame_size
bytes_to_write = num_frames * frame_size
if not played_out:
played_out = True
time_to_first_token = time.time() - start_time
# print(f"Time to first playout: {time_to_first_token}")
# stream.write(buffer[:bytes_to_write])
buffer = buffer[bytes_to_write:]
else:
# Get more data
chunk = chunk_queue.get()
if chunk is None:
# Write any remaining data
if len(buffer) > 0:
# Truncate buffer to multiple of frame size if necessary
if len(buffer) % frame_size != 0:
buffer = buffer[: -(len(buffer) % frame_size)]

if not played_out:
played_out = True
time_to_first_token = time.time() - start_time
# print(f"Time to first playout: {time_to_first_token}")
# stream.write(buffer)
break
buffer += chunk


# Function to request text-to-speech conversion and retrieve chunks
def request_tts(text):
# Optionally set up WAV file
if write_to_file:
output_wav_file = f"output_audio_{uuid.uuid4()}.wav"
wav_file = wave.open(output_wav_file, "wb")
wav_file.setnchannels(CHANNELS)
wav_file.setsampwidth(pyaudio_instance.get_sample_size(AUDIO_FORMAT))
wav_file.setframerate(RATE)

global start_time
start_time = time.time()
try:
response = requests.get(
SERVER_URL, params={"text": text}, stream=True, timeout=10
)
response.raise_for_status() # Raises an HTTPError if the response status is 4xx/5xx

# Read data as it becomes available
for chunk in response.iter_content(chunk_size=None):
if chunk and write_to_file:
wav_file.writeframes(chunk)
chunk_queue.put(chunk)

# Signal the end of the stream
chunk_queue.put(None)

except requests.exceptions.RequestException as e:
print(f"Error occurred: {e}")
chunk_queue.put(None) # Ensure that playback thread exits gracefully


# Start audio playback thread (single consumer of chunk_queue)
playback_thread = threading.Thread(target=play_audio)
playback_thread.start()

# Two concurrent TTS requests; the "One,"/"Two," prefixes make their audio
# streams distinguishable when played back.
request1_thread = threading.Thread(target=request_tts, args=("One, " + text_to_tts,))
request2_thread = threading.Thread(target=request_tts, args=("Two, " + text_to_tts,))

# Run both requests concurrently; the request threads queue the chunks,
# and we always join all threads and release PyAudio on the way out.
try:
    request1_thread.start()
    request2_thread.start()

finally:
    request1_thread.join()
    request2_thread.join()
    playback_thread.join()
    pyaudio_instance.terminate()