Skip to content

Commit

Permalink
transports(daily): fix completion callbacks handling
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed Jul 23, 2024
1 parent 060a22f commit b583f51
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 16 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.0.39] - 2024-07-23

### Fixed

- Fixed a regression introduced in 0.0.38 that would cause Daily transcription
to stop the Pipeline.

## [0.0.38] - 2024-07-23

### Added
Expand Down
26 changes: 10 additions & 16 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ class DailyCallbacks(BaseModel):
def completion_callback(future):
def _callback(*args):
if not future.cancelled():
future.get_loop().call_soon_threadsafe(future.set_result, *args)
if len(args) > 1:
future.get_loop().call_soon_threadsafe(future.set_result, args)
else:
future.get_loop().call_soon_threadsafe(future.set_result, *args)
return _callback


Expand Down Expand Up @@ -282,11 +285,12 @@ async def join(self):
await self._callbacks.on_error(error_msg)

async def _start_transcription(self):
future = self._loop.create_future()
logger.info(f"Enabling transcription with settings {self._params.transcription_settings}")

future = self._loop.create_future()
self._client.start_transcription(
settings=self._params.transcription_settings.model_dump(exclude_none=True),
completion=lambda error: future.set_result(error)
completion=completion_callback(future)
)
error = await future
if error:
Expand All @@ -295,14 +299,10 @@ async def _start_transcription(self):
async def _join(self):
future = self._loop.create_future()

def handle_join_response(data, error):
if not future.cancelled():
future.get_loop().call_soon_threadsafe(future.set_result, (data, error))

self._client.join(
self._room_url,
self._token,
completion=handle_join_response,
completion=completion_callback(future),
client_settings={
"inputs": {
"camera": {
Expand Down Expand Up @@ -370,20 +370,14 @@ async def leave(self):

async def _stop_transcription(self):
future = self._loop.create_future()
self._client.stop_transcription(completion=lambda error: future.set_result(error))
self._client.stop_transcription(completion=completion_callback(future))
error = await future
if error:
logger.error(f"Unable to stop transcription: {error}")

async def _leave(self):
future = self._loop.create_future()

def handle_leave_response(error):
if not future.cancelled():
future.get_loop().call_soon_threadsafe(future.set_result, error)

self._client.leave(completion=handle_leave_response)

self._client.leave(completion=completion_callback(future))
return await asyncio.wait_for(future, timeout=10)

async def cleanup(self):
Expand Down

0 comments on commit b583f51

Please sign in to comment.