Skip to content

Commit add38b6

Browse files
authored
fix: correctly unsubscribe to the ffi queue (#72)
1 parent fd48955 commit add38b6

File tree

4 files changed

+19
-12
lines changed

4 files changed

+19
-12
lines changed

examples/publish_wave.py

+1-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import time
23
import logging
34
from signal import SIGINT, SIGTERM
45

@@ -12,30 +13,22 @@
1213
SAMPLE_RATE = 48000
1314
NUM_CHANNELS = 1
1415

15-
1616
async def publish_frames(source: livekit.AudioSource, frequency: int):
1717
amplitude = 32767 # for 16-bit audio
1818
samples_per_channel = 480 # 10ms at 48kHz
1919
time = np.arange(samples_per_channel) / SAMPLE_RATE
2020
total_samples = 0
21-
2221
audio_frame = livekit.AudioFrame.create(
2322
SAMPLE_RATE, NUM_CHANNELS, samples_per_channel)
24-
2523
audio_data = np.ctypeslib.as_array(audio_frame.data)
26-
2724
while True:
2825
time = (total_samples + np.arange(samples_per_channel)) / SAMPLE_RATE
29-
3026
sine_wave = (amplitude * np.sin(2 * np.pi *
3127
frequency * time)).astype(np.int16)
3228
np.copyto(audio_data, sine_wave)
33-
3429
await source.capture_frame(audio_frame)
35-
3630
total_samples += samples_per_channel
3731

38-
3932
async def main(room: livekit.Room) -> None:
4033

4134
@room.on("participant_disconnected")
@@ -45,11 +38,9 @@ def on_participant_disconnect(participant: livekit.Participant, *_):
4538
logging.info("connecting to %s", URL)
4639
try:
4740
e2ee_options = livekit.E2EEOptions()
48-
e2ee_options.key_provider_options.shared_key = b"password"
4941

5042
await room.connect(URL, TOKEN, options=livekit.RoomOptions(
5143
auto_subscribe=True,
52-
e2ee=e2ee_options
5344
))
5445
logging.info("connected to room %s", room.name)
5546
except livekit.ConnectError as e:

livekit/audio_stream.py

+4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ def __init__(self, track: Track,
4444

4545
self._task = self._loop.create_task(self._run())
4646

47+
def __del__(self) -> None:
48+
ffi_client.queue.unsubscribe(self._ffi_queue)
49+
4750
async def _run(self):
4851
while True:
4952
event = await self._ffi_queue.wait_for(self._is_event)
@@ -57,6 +60,7 @@ async def _run(self):
5760
break
5861

5962
async def close(self):
63+
ffi_client.queue.unsubscribe(self._ffi_queue)
6064
del self._ffi_handle
6165
await self._task
6266

livekit/room.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,16 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
6161

6262
self._ffi_handle: Optional[FfiHandle] = None
6363
self._loop = loop or asyncio.get_event_loop()
64-
self._ffi_queue = ffi_client.queue.subscribe(self._loop)
6564
self._room_queue = BroadcastQueue[proto_ffi.FfiEvent]()
6665
self._info = proto_room.RoomInfo()
6766

6867
self.participants: dict[str, RemoteParticipant] = {}
6968
self.connection_state = ConnectionState.CONN_DISCONNECTED
7069

70+
def __del__(self) -> None:
71+
if self._ffi_handle is not None:
72+
ffi_client.queue.unsubscribe(self._ffi_queue)
73+
7174
@property
7275
def sid(self) -> str:
7376
return self._info.sid
@@ -120,15 +123,19 @@ async def connect(self,
120123
req.connect.options.rtc_config.ice_servers.extend(
121124
options.rtc_config.ice_servers)
122125

126+
# subscribe before connecting so we don't miss any events
127+
self._ffi_queue = ffi_client.queue.subscribe(self._loop)
128+
123129
try:
124-
queue = ffi_client.queue.subscribe(self._loop)
130+
queue = ffi_client.queue.subscribe()
125131
resp = ffi_client.request(req)
126132
cb = await queue.wait_for(lambda e: e.connect.async_id ==
127133
resp.connect.async_id)
128134
finally:
129135
ffi_client.queue.unsubscribe(queue)
130136

131137
if cb.connect.error:
138+
ffi_client.queue.unsubscribe(self._ffi_queue)
132139
raise ConnectError(cb.connect.error)
133140

134141
self._ffi_handle = FfiHandle(cb.connect.room.handle.id)
@@ -169,6 +176,7 @@ async def disconnect(self) -> None:
169176
ffi_client.queue.unsubscribe(queue)
170177

171178
await self._task
179+
ffi_client.queue.unsubscribe(self._ffi_queue)
172180

173181
async def _listen_task(self) -> None:
174182
# listen to incoming room events

livekit/video_stream.py

+4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ def __init__(self, track: Track,
4444

4545
self._task = self._loop.create_task(self._run())
4646

47+
def __del__(self) -> None:
48+
ffi_client.queue.unsubscribe(self._ffi_queue)
49+
4750
async def _run(self):
4851
while True:
4952
event = await self._ffi_queue.wait_for(self._is_event)
@@ -60,6 +63,7 @@ async def _run(self):
6063
break
6164

6265
async def close(self):
66+
ffi_client.queue.unsubscribe(self._ffi_queue)
6367
del self._ffi_handle
6468
await self._task
6569

0 commit comments

Comments
 (0)