Skip to content

Commit

Permalink
match behavior of Daily's on_first_participant_joined
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimchauvet committed Oct 1, 2024
1 parent 447baad commit 973c992
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions src/pipecat/transports/services/livekit.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class LiveKitCallbacks(BaseModel):
on_audio_track_subscribed: Callable[[str], Awaitable[None]]
on_audio_track_unsubscribed: Callable[[str], Awaitable[None]]
on_data_received: Callable[[bytes, str], Awaitable[None]]
on_first_participant_joined: Callable[[str], Awaitable[None]]


class LiveKitTransportClient:
Expand All @@ -83,6 +84,7 @@ def __init__(
self._audio_track: rtc.LocalAudioTrack | None = None
self._audio_tracks = {}
self._audio_queue = asyncio.Queue()
self._other_participant_has_joined = False

# Set up room event handlers
self._room.on("participant_connected")(self._on_participant_connected_wrapper)
Expand Down Expand Up @@ -126,6 +128,12 @@ async def connect(self):
await self._room.local_participant.publish_track(self._audio_track, options)

await self._callbacks.on_connected()

# Check if there are already participants in the room
participants = self.get_participants()
if participants and not self._other_participant_has_joined:
self._other_participant_has_joined = True
await self._callbacks.on_first_participant_joined(participants[0])
except Exception as e:
logger.error(f"Error connecting to {self._room_name}: {e}")
raise
Expand Down Expand Up @@ -230,10 +238,15 @@ def _on_disconnected_wrapper(self):
async def _async_on_participant_connected(self, participant: rtc.RemoteParticipant):
logger.info(f"Participant connected: {participant.identity}")
await self._callbacks.on_participant_connected(participant.sid)
if not self._other_participant_has_joined:
self._other_participant_has_joined = True
await self._callbacks.on_first_participant_joined(participant.sid)

async def _async_on_participant_disconnected(self, participant: rtc.RemoteParticipant):
logger.info(f"Participant disconnected: {participant.identity}")
await self._callbacks.on_participant_disconnected(participant.sid)
if len(self.get_participants()) == 0:
self._other_participant_has_joined = False

async def _async_on_track_subscribed(
self,
Expand Down Expand Up @@ -503,6 +516,7 @@ def _create_callbacks(self) -> LiveKitCallbacks:
on_audio_track_subscribed=self._on_audio_track_subscribed,
on_audio_track_unsubscribed=self._on_audio_track_unsubscribed,
on_data_received=self._on_data_received,
on_first_participant_joined=self._on_first_participant_joined,
)

def input(self) -> FrameProcessor:
Expand Down Expand Up @@ -554,8 +568,6 @@ async def _on_disconnected(self):

async def _on_participant_connected(self, participant_id: str):
await self._call_event_handler("on_participant_connected", participant_id)
if len(self.get_participants()) == 1:
await self._call_event_handler("on_first_participant_joined", participant_id)

async def _on_participant_disconnected(self, participant_id: str):
await self._call_event_handler("on_participant_disconnected", participant_id)
Expand Down Expand Up @@ -608,3 +620,6 @@ async def on_track_event(self, event):

async def _on_call_state_updated(self, state: str):
await self._call_event_handler("on_call_state_updated", self, state)

async def _on_first_participant_joined(self, participant_id: str):
await self._call_event_handler("on_first_participant_joined", participant_id)

0 comments on commit 973c992

Please sign in to comment.