Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve websocket based TTS service reconnection logic #962

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added a new `WebsocketService` based class for TTS services, containing
base functions and retry logic.

- Added a new foundational example `07e-interruptible-playht-http.py` for easy
testing of `PlayHTHttpTTSService`.

Expand All @@ -20,34 +23,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `enable_prejoin_ui`, `max_participants` and `start_video_off` params
to `DailyRoomProperties`.

- Added `session_timeout` to `FastAPIWebsocketTransport` and `WebsocketServerTransport`
for configuring session timeouts (in seconds). Triggers `on_session_timeout` for custom timeout handling.
See [examples/websocket-server/bot.py](https://github.com/pipecat-ai/pipecat/blob/main/examples/websocket-server/bot.py).
- Added `session_timeout` to `FastAPIWebsocketTransport` and
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Miscellaneous CHANGELOG cleanup.

`WebsocketServerTransport` for configuring session timeouts (in seconds).
Triggers `on_session_timeout` for custom timeout handling. See
[examples/websocket-server/bot.py](https://github.com/pipecat-ai/pipecat/blob/main/examples/websocket-server/bot.py).

- Added the new modalities option and helper function to set Gemini output modalities.
- Added the new modalities option and helper function to set Gemini output
modalities.

- Added `examples/foundational/26d-gemini-multimodal-live-text.py` which is using Gemini as TEXT modality and using another TTS provider for TTS process.
- Added `examples/foundational/26d-gemini-multimodal-live-text.py` which is
using Gemini as TEXT modality and using another TTS provider for TTS process.

### Changed

- Changed the default model for `PlayHTHttpTTSService` to `Play3.0-mini-http`.

- api_key, aws_access_key_id and region are no longer required parameters for the PollyTTSService (AWSTTSService)
- `api_key`, `aws_access_key_id` and `region` are no longer required parameters
for the `PollyTTSService` (AWSTTSService).

- Added `session_timeout` example in `examples/websocket-server/bot.py` to handle session timeout event.
- Added `session_timeout` example in `examples/websocket-server/bot.py` to
handle session timeout event.

- Changed `InputParams` in `src/pipecat/services/gemini_multimodal_live/gemini.py` to support different modalities.
- Changed `InputParams` in `src/pipecat/services/gemini_multimodal_live/gemini.py`
to support different modalities.

### Fixed

- Fixed an issue where websocket based TTS services could incorrectly terminate
their connection due to a retry counter not resetting.

- Fixed an import issue for `PlayHTHttpTTSService`.

- Fixed an issue where languages couldn't be used with the `PlayHTHttpTTSService`.

- Fixed an issue where `OpenAIRealtimeBetaLLMService` audio chunks were hitting
an error when truncating audio content.

- Fixed an issue where setting the voice and model for `RimeHttpTTSService` wasn't working.
- Fixed an issue where setting the voice and model for `RimeHttpTTSService`
wasn't working.

## [0.0.52] - 2024-12-24

Expand Down
34 changes: 5 additions & 29 deletions src/pipecat/services/cartesia.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import base64
import json
import uuid
from typing import AsyncGenerator, List, Optional, Union

from loguru import logger
from pydantic import BaseModel
from tenacity import AsyncRetrying, RetryCallState, stop_after_attempt, wait_exponential

from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
Expand All @@ -30,6 +28,7 @@
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import TTSService, WordTTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language

# See .env.example for Cartesia configuration needed
Expand Down Expand Up @@ -76,7 +75,7 @@ def language_to_cartesia_language(language: Language) -> str | None:
return result


class CartesiaTTSService(WordTTSService):
class CartesiaTTSService(WordTTSService, WebsocketService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN
speed: Optional[Union[str, float]] = ""
Expand Down Expand Up @@ -106,12 +105,14 @@ def __init__(
# if we're interrupted. Cartesia gives us word-by-word timestamps. We
# can use those to generate text frames ourselves aligned with the
# playout timing of the audio!
super().__init__(
WordTTSService.__init__(
self,
aggregate_sentences=True,
push_text_frames=False,
sample_rate=sample_rate,
**kwargs,
)
WebsocketService.__init__(self)

self._api_key = api_key
self._cartesia_version = cartesia_version
Expand All @@ -131,7 +132,6 @@ def __init__(
self.set_model_name(model)
self.set_voice(voice_id)

self._websocket = None
self._context_id = None
self._receive_task = None

Expand Down Expand Up @@ -275,30 +275,6 @@ async def _receive_messages(self):
else:
logger.error(f"{self} error, unknown message type: {msg}")

async def _reconnect_websocket(self, retry_state: RetryCallState):
logger.warning(f"{self} reconnecting (attempt: {retry_state.attempt_number})")
await self._disconnect_websocket()
await self._connect_websocket()

async def _receive_task_handler(self):
while True:
try:
async for attempt in AsyncRetrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
before_sleep=self._reconnect_websocket,
reraise=True,
):
with attempt:
await self._receive_messages()
except asyncio.CancelledError:
break
except Exception as e:
message = f"{self} error receiving messages: {e}"
logger.error(message)
await self.push_error(ErrorFrame(message, fatal=True))
break

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

Expand Down
35 changes: 5 additions & 30 deletions src/pipecat/services/elevenlabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@

from loguru import logger
from pydantic import BaseModel, model_validator
from tenacity import AsyncRetrying, RetryCallState, stop_after_attempt, wait_exponential

from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
Expand All @@ -29,6 +27,7 @@
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import WordTTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language

# See .env.example for ElevenLabs configuration needed
Expand Down Expand Up @@ -133,7 +132,7 @@ def calculate_word_times(
return word_times


class ElevenLabsTTSService(WordTTSService):
class ElevenLabsTTSService(WordTTSService, WebsocketService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN
optimize_streaming_latency: Optional[str] = None
Expand Down Expand Up @@ -178,14 +177,16 @@ def __init__(
# Finally, ElevenLabs doesn't provide information on when the bot stops
# speaking for a while, so we want the parent class to send TTSStopFrame
# after a short period not receiving any audio.
super().__init__(
WordTTSService.__init__(
self,
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
sample_rate=sample_rate_from_output_format(output_format),
**kwargs,
)
WebsocketService.__init__(self)

self._api_key = api_key
self._url = url
Expand All @@ -206,8 +207,6 @@ def __init__(
self.set_voice(voice_id)
self._voice_settings = self._set_voice_settings()

# Websocket connection to ElevenLabs.
self._websocket = None
# Indicates if we have sent TTSStartedFrame. It will reset to False when
# there's an interruption or TTSStoppedFrame.
self._started = False
Expand Down Expand Up @@ -377,30 +376,6 @@ async def _receive_messages(self):
await self.add_word_timestamps(word_times)
self._cumulative_time = word_times[-1][1]

async def _reconnect_websocket(self, retry_state: RetryCallState):
logger.warning(f"{self} reconnecting (attempt: {retry_state.attempt_number})")
await self._disconnect_websocket()
await self._connect_websocket()

async def _receive_task_handler(self):
while True:
try:
async for attempt in AsyncRetrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
before_sleep=self._reconnect_websocket,
reraise=True,
):
with attempt:
await self._receive_messages()
except asyncio.CancelledError:
break
except Exception as e:
message = f"{self} error receiving messages: {e}"
logger.error(message)
await self.push_error(ErrorFrame(message, fatal=True))
break

async def _keepalive_task_handler(self):
while True:
try:
Expand Down
Loading
Loading