From 5e0f1930fb630e4dce6908a23ddfa102b89a329b Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Wed, 16 Oct 2024 22:56:49 -0500 Subject: [PATCH] Move timer to the main thread That way it can't get blocked by the PCM thread being slow, which would defeat the purpose of having the timer. --- snikket/jingle/PeerConnection.cpp.hx | 50 +++++++++++++++++++--------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/snikket/jingle/PeerConnection.cpp.hx b/snikket/jingle/PeerConnection.cpp.hx index 01b13f4..9536b41 100644 --- a/snikket/jingle/PeerConnection.cpp.hx +++ b/snikket/jingle/PeerConnection.cpp.hx @@ -349,6 +349,7 @@ class MediaStreamTrack { private var alive = true; private var waitForQ = false; private var bufferSizeInSeconds = 0.0; + private var mutex = new sys.thread.Mutex(); @:allow(snikket) private var media(get, default): StdOptional; @@ -366,23 +367,31 @@ class MediaStreamTrack { @:allow(snikket) private function new() { eventLoop = sys.thread.Thread.createWithEventLoop(() -> { - timer = new haxe.Timer(10); // This timer will stop when the audioloop for this track stops - timer.run = () -> { - if (untyped __cpp__("!_gthis->track")) return; - if (!alive || !track.ref.isOpen()) return; - - if (audioQ.length > 0 && audioQ[audioQ.length - 1].stamp <= haxe.Timer.stamp()) { - final packet = audioQ.pop(); - write(packet.payload, packet.payloadType, packet.clockRate); - advanceTimestamp(Std.int(packet.payload.length / packet.channels)); - } - if (waitForQ && audioQ.length < (50+50*bufferSizeInSeconds)) { - waitForQ = false; - notifyReadyForData(false); - } - }; while(alive) { sys.thread.Thread.processEvents(); sys.thread.Thread.current().events.wait(); } }).events; + + timer = new haxe.Timer(10); + timer.run = () -> { + mutex.acquire(); + if (untyped __cpp__("!_gthis->track")) { + mutex.release(); + return; + } + if (!alive || !track.ref.isOpen()) { + mutex.release(); + return; + } + if (audioQ.length > 0 && audioQ[audioQ.length - 1].stamp <= haxe.Timer.stamp()) { + final packet = audioQ.pop(); + write(packet.payload, packet.payloadType, packet.clockRate); + advanceTimestamp(Std.int(packet.payload.length / packet.channels)); + } + if (waitForQ && audioQ.length < (50+50*bufferSizeInSeconds)) { + waitForQ = false; + notifyReadyForData(false); + } + mutex.release(); + }; } private function get_media() { @@ -520,7 +529,9 @@ class MediaStreamTrack { if (readyForPCMCallback != null) { eventLoop.run(() -> { if (audioQ.length > (50+50*bufferSizeInSeconds)) { + mutex.acquire(); waitForQ = true; + mutex.release(); } else { readyForPCMCallback(); } @@ -540,15 +551,19 @@ class MediaStreamTrack { final format = Lambda.find(supportedAudioFormats, format -> format.clockRate == clockRate && format.channels == channels); if (format == null) throw "Unsupported audo format: " + clockRate + "/" + channels; eventLoop.run(() -> { + mutex.acquire(); final stamp = if (audioQ.length < 1) { bufferSizeInSeconds = Math.max(bufferSizeInSeconds, bufferSizeInSeconds + 0.1); haxe.Timer.stamp() + bufferSizeInSeconds; } else { audioQ[0].stamp + (pcm.length / (clockRate / 1000)) / 1000.0; } + mutex.release(); if (format.format == "PCMU") { final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: pcm.map(pcmToUlaw), stamp: stamp }; + mutex.acquire(); audioQ.unshift(packet); + mutex.release(); } else if (format.format == "opus") { if (untyped __cpp__("!{0}", opusEncoder)) { opusEncoder = OpusEncoder.create(clockRate, channels, untyped __cpp__("OPUS_APPLICATION_VOIP"), null); // assume only one opus clockRate+channels for this track @@ -560,7 +575,9 @@ class MediaStreamTrack { final encoded = OpusEncoder.encode(opusEncoder, cpp.Pointer.ofArray(pcm), Std.int(pcm.length / channels), cpp.Pointer.ofArray(rawOpus), rawOpus.length); rawOpus.resize(encoded); final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: rawOpus, stamp: stamp }; + mutex.acquire(); audioQ.unshift(packet); + mutex.release(); } else { trace("Ignoring audio meant to go out as", format.format, format.clockRate, format.channels); } @@ -590,12 +607,15 @@ class MediaStreamTrack { } public function stop() { + timer.stop(); + mutex.acquire(); alive = false; if (track.ref.isOpen()) track.ref.close(); if (untyped __cpp__("opus")) { OpusDecoder.destroy(opus); opus = null; } + mutex.release(); } }