Skip to content

Commit

Permalink
twiliohandle interruption (pipecat-ai#422)
Browse files Browse the repository at this point in the history
* add interuption handler in twilio serializer

* fix autopep8

* revert ruff autoformatting

* address pr comments

* change interruption frame to user started frame in serializer

* remove overrrident handle interrupt

* remove unused import

* change userstarted to interuption frame
  • Loading branch information
aashsach authored Sep 2, 2024
1 parent aba5f89 commit 51cd7fd
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
31 changes: 17 additions & 14 deletions src/pipecat/serializers/twilio.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from pydantic import BaseModel

from pipecat.frames.frames import AudioRawFrame, Frame
from pipecat.frames.frames import AudioRawFrame, Frame, StartInterruptionFrame
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.utils.audio import ulaw_to_pcm, pcm_to_ulaw

Expand All @@ -28,22 +28,25 @@ def __init__(self, stream_sid: str, params: InputParams = InputParams()):
self._params = params

def serialize(self, frame: Frame) -> str | bytes | None:
if not isinstance(frame, AudioRawFrame):
return None

data = frame.audio
if isinstance(frame, AudioRawFrame):
data = frame.audio

serialized_data = pcm_to_ulaw(data, frame.sample_rate, self._params.twilio_sample_rate)
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
"streamSid": self._stream_sid,
"media": {
"payload": payload
serialized_data = pcm_to_ulaw(
data, frame.sample_rate, self._params.twilio_sample_rate)
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
"streamSid": self._stream_sid,
"media": {
"payload": payload
}
}
}

return json.dumps(answer)
return json.dumps(answer)

if isinstance(frame, StartInterruptionFrame):
answer = {"event": "clear", "streamSid": self._stream_sid}
return json.dumps(answer)

def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)
Expand Down
21 changes: 17 additions & 4 deletions src/pipecat/transports/network/fastapi_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from typing import Awaitable, Callable
from pydantic.main import BaseModel

from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, StartFrame
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
Expand Down Expand Up @@ -93,11 +93,18 @@ def __init__(self, websocket: WebSocket, params: FastAPIWebsocketParams, **kwarg
self._params = params
self._websocket_audio_buffer = bytes()

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, StartInterruptionFrame):
await self._write_frame(frame)

async def write_raw_audio_frames(self, frames: bytes):
self._websocket_audio_buffer += frames
while len(self._websocket_audio_buffer) >= self._params.audio_frame_size:
frame = AudioRawFrame(
audio=self._websocket_audio_buffer[:self._params.audio_frame_size],
audio=self._websocket_audio_buffer[:
self._params.audio_frame_size],
sample_rate=self._params.audio_out_sample_rate,
num_channels=self._params.audio_out_channels
)
Expand All @@ -121,7 +128,13 @@ async def write_raw_audio_frames(self, frames: bytes):
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
await self._websocket.send_text(payload)

self._websocket_audio_buffer = self._websocket_audio_buffer[self._params.audio_frame_size:]
self._websocket_audio_buffer = self._websocket_audio_buffer[
self._params.audio_frame_size:]

async def _write_frame(self, frame: Frame):
payload = self._params.serializer.serialize(frame)
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
await self._websocket.send_text(payload)


class FastAPIWebsocketTransport(BaseTransport):
Expand Down

0 comments on commit 51cd7fd

Please sign in to comment.