Skip to content

Commit

Permalink
Move timer to the main thread
Browse files Browse the repository at this point in the history
That way it can't get blocked by the PCM thread being slow, which would
defeat the purpose of having the timer.
  • Loading branch information
singpolyma committed Oct 17, 2024
1 parent 805e664 commit 5e0f193
Showing 1 changed file with 35 additions and 15 deletions.
50 changes: 35 additions & 15 deletions snikket/jingle/PeerConnection.cpp.hx
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ class MediaStreamTrack {
private var alive = true;
private var waitForQ = false;
private var bufferSizeInSeconds = 0.0;
private var mutex = new sys.thread.Mutex();

@:allow(snikket)
private var media(get, default): StdOptional<DescriptionMedia>;
Expand All @@ -366,23 +367,31 @@ class MediaStreamTrack {
@:allow(snikket)
private function new() {
eventLoop = sys.thread.Thread.createWithEventLoop(() -> {
timer = new haxe.Timer(10); // This timer will stop when the audioloop for this track stops
timer.run = () -> {
if (untyped __cpp__("!_gthis->track")) return;
if (!alive || !track.ref.isOpen()) return;

if (audioQ.length > 0 && audioQ[audioQ.length - 1].stamp <= haxe.Timer.stamp()) {
final packet = audioQ.pop();
write(packet.payload, packet.payloadType, packet.clockRate);
advanceTimestamp(Std.int(packet.payload.length / packet.channels));
}
if (waitForQ && audioQ.length < (50+50*bufferSizeInSeconds)) {
waitForQ = false;
notifyReadyForData(false);
}
};
while(alive) { sys.thread.Thread.processEvents(); sys.thread.Thread.current().events.wait(); }
}).events;

timer = new haxe.Timer(10);
timer.run = () -> {
mutex.acquire();
if (untyped __cpp__("!_gthis->track")) {
mutex.release();
return;
}
if (!alive || !track.ref.isOpen()) {
mutex.release();
return;
}
if (audioQ.length > 0 && audioQ[audioQ.length - 1].stamp <= haxe.Timer.stamp()) {
final packet = audioQ.pop();
write(packet.payload, packet.payloadType, packet.clockRate);
advanceTimestamp(Std.int(packet.payload.length / packet.channels));
}
if (waitForQ && audioQ.length < (50+50*bufferSizeInSeconds)) {
waitForQ = false;
notifyReadyForData(false);
}
mutex.release();
};
}

private function get_media() {
Expand Down Expand Up @@ -520,7 +529,9 @@ class MediaStreamTrack {
if (readyForPCMCallback != null) {
eventLoop.run(() -> {
if (audioQ.length > (50+50*bufferSizeInSeconds)) {
mutex.acquire();
waitForQ = true;
mutex.release();
} else {
readyForPCMCallback();
}
Expand All @@ -540,15 +551,19 @@ class MediaStreamTrack {
final format = Lambda.find(supportedAudioFormats, format -> format.clockRate == clockRate && format.channels == channels);
if (format == null) throw "Unsupported audo format: " + clockRate + "/" + channels;
eventLoop.run(() -> {
mutex.acquire();
final stamp = if (audioQ.length < 1) {
bufferSizeInSeconds = Math.max(bufferSizeInSeconds, bufferSizeInSeconds + 0.1);
haxe.Timer.stamp() + bufferSizeInSeconds;
} else {
audioQ[0].stamp + (pcm.length / (clockRate / 1000)) / 1000.0;
}
mutex.release();
if (format.format == "PCMU") {
final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: pcm.map(pcmToUlaw), stamp: stamp };
mutex.acquire();
audioQ.unshift(packet);
mutex.release();
} else if (format.format == "opus") {
if (untyped __cpp__("!{0}", opusEncoder)) {
opusEncoder = OpusEncoder.create(clockRate, channels, untyped __cpp__("OPUS_APPLICATION_VOIP"), null); // assume only one opus clockRate+channels for this track
Expand All @@ -560,7 +575,9 @@ class MediaStreamTrack {
final encoded = OpusEncoder.encode(opusEncoder, cpp.Pointer.ofArray(pcm), Std.int(pcm.length / channels), cpp.Pointer.ofArray(rawOpus), rawOpus.length);
rawOpus.resize(encoded);
final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: rawOpus, stamp: stamp };
mutex.acquire();
audioQ.unshift(packet);
mutex.release();
} else {
trace("Ignoring audio meant to go out as", format.format, format.clockRate, format.channels);
}
Expand Down Expand Up @@ -590,12 +607,15 @@ class MediaStreamTrack {
}

public function stop() {
timer.stop();
mutex.acquire();
alive = false;
if (track.ref.isOpen()) track.ref.close();
if (untyped __cpp__("opus")) {
OpusDecoder.destroy(opus);
opus = null;
}
mutex.release();
}
}

Expand Down

0 comments on commit 5e0f193

Please sign in to comment.