Skip to content
This repository has been archived by the owner on Jul 20, 2024. It is now read-only.

Commit

Permalink
fix: stream view limiter not working (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
luongngocminh authored Jan 2, 2024
1 parent fa3b886 commit 2f3c315
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 115 deletions.
75 changes: 37 additions & 38 deletions samples/simulcast/app.js
Original file line number Diff line number Diff line change
@@ -1,47 +1,46 @@
async function onMyStreamAdded(stream) {
console.log('added mystream:', stream);
if(stream.kind == 'video') {
let receiver = await window.atm0sSession.takeReceiver('video');
let element = document.getElementById('my_video');
element.srcObject = receiver.stream;
element.receiver = receiver;
receiver.switch(stream);
}
console.log('added mystream:', stream);
if (stream.kind == 'video') {
let receiver = await window.atm0sSession.takeReceiver('video');
let element = document.getElementById('my_video');
element.srcObject = receiver.stream;
element.receiver = receiver;
receiver.switch(stream);
}

if(stream.kind == 'audio') {
let receiver = window.atm0sSession.takeReceiver('audio');
let element = document.getElementById('my_audio');
element.srcObject = receiver.stream;
receiver.switch(stream);
}
if (stream.kind == 'audio') {
let receiver = window.atm0sSession.takeReceiver('audio');
let element = document.getElementById('my_audio');
element.srcObject = receiver.stream;
receiver.switch(stream);
}
}

async function boot() {
const urlSearchParams = new URLSearchParams(window.location.search);
const params = Object.fromEntries(urlSearchParams.entries());
const urlSearchParams = new URLSearchParams(window.location.search);
const params = Object.fromEntries(urlSearchParams.entries());

let video_stream = await navigator.mediaDevices.getUserMedia({audio: false, video: true});
let session = Atm0s.createSession(params['server'] || [''], {
roomId: params['room'] || 'demo',
peerId: params['peer'] || 'echo-client-' + new Date().getTime(),
token: params['token'],
senders: [
{ stream: video_stream, name: 'video_main', kind: 'video', simulcast: true }
],
receivers: {
audio: 1,
video: 1
}
});
window.atm0sSession = session;
session.connect();
session.on('stream_added', onMyStreamAdded);
let video_stream = await navigator.mediaDevices.getUserMedia({ audio: false, video: true });
let session = Atm0s.createSession(params['server'] || [''], {
roomId: params['room'] || 'demo',
peerId: params['peer'] || 'echo-client-' + new Date().getTime(),
token: params['token'],
senders: [{ stream: video_stream, name: 'video_main', kind: 'video', simulcast: true }],
receivers: {
audio: 1,
video: 1,
},
logLevel: 5,
});
window.atm0sSession = session;
session.connect();
session.on('mystream_added', onMyStreamAdded);

document.getElementById('quality').onchange = (event) => {
let layers = event.target.value.split('-');
let element = document.getElementById('my_video');
element.receiver.limit(50, parseInt(layers[0]), parseInt(layers[1]));
}
document.getElementById('quality').onchange = (event) => {
let layers = event.target.value.split('-');
let element = document.getElementById('my_video');
element.receiver.limit(50, 0, 0, parseInt(layers[0]), parseInt(layers[1]));
};
}

boot();
boot();
26 changes: 6 additions & 20 deletions src/lib/consumer-pair.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { StreamConsumer } from './consumer';
import type { StreamReceiverState } from './interfaces';
import type { IConsumerCallbacks } from './interfaces/consumer';
import { TypedEventEmitter } from './utils/typed-event-emitter';
import type { RemoteStreamQuality } from './utils/types';
import type { RemoteStreamQuality, StreamLimit } from './utils/types';

export class StreamConsumerPair extends TypedEventEmitter<IConsumerCallbacks> {
private _combinedStream: MediaStream;
Expand Down Expand Up @@ -50,27 +50,13 @@ export class StreamConsumerPair extends TypedEventEmitter<IConsumerCallbacks> {
this.emit('quality', quality);
};

limit(
key: string,
priority: number = 50,
minSpatial: number = 0,
maxSpatial: number = 2,
minTemporal: number = 0,
maxTemporal: number = 2,
) {
this._videoConsumer.limit(key, priority, minSpatial, maxSpatial, minTemporal, maxTemporal);
limit(key: string, limit: StreamLimit) {
this._videoConsumer.limit(key, limit);
}

view(
key: string,
priority: number = 50,
minSpatial: number = 0,
maxSpatial: number = 2,
minTemporal: number = 0,
maxTemporal: number = 2,
): MediaStream {
const audioStream = this._audioConsumer.view(key);
const videoStream = this._videoConsumer.view(key, priority, minSpatial, maxSpatial, minTemporal, maxTemporal);
view(key: string, limit?: StreamLimit): MediaStream {
const audioStream = this._audioConsumer.view(key, limit);
const videoStream = this._videoConsumer.view(key, limit);
this._combinedStream.getTracks().forEach((track) => {
this._combinedStream.removeTrack(track);
});
Expand Down
50 changes: 21 additions & 29 deletions src/lib/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { IConsumerCallbacks, ViewInfo } from './interfaces/consumer';
import type { StreamRemote } from './remote';
import type { Session } from './session';
import { TypedEventEmitter } from './utils/typed-event-emitter';
import { StreamKinds, type RemoteStreamQuality } from './utils/types';
import { StreamKinds, type RemoteStreamQuality, type StreamLimit } from './utils/types';

/**
* Represents a stream consumer that sets up views for specific viewers and configures layer settings.
Expand All @@ -30,22 +30,22 @@ export class StreamConsumer extends TypedEventEmitter<IConsumerCallbacks> {
/**
* Sets up a view for a specific viewer key.
* @param key - The key of the viewer.
* @param priority - The priority of the view (default: 50).
* @param minSpatial - The minimum spatial layer for the view (default: 0).
* @param maxSpatial - The maximum spatial limit (default: 2).
* @param minTemporal - The minimum temporal layer for the view (default: 0).
* @param maxTemporal - The maximum temporal limit (default: 2).
* @param limit - The limit to set for the view.
*
* @returns The MediaStream of the view.
*/
public view(
key: string,
priority: number = 50,
minSpatial: number = 0,
maxSpatial: number = 2,
minTemporal: number = 0,
maxTemporal: number = 2,
limit: StreamLimit = { priority: 50, maxSpatial: 2, minSpatial: 0, maxTemporal: 2, minTemporal: 0 },
): MediaStream {
this.keys.set(key, { priority, maxSpatial, maxTemporal, minSpatial, minTemporal });
const { priority, maxSpatial, maxTemporal, minSpatial, minTemporal } = limit;
this.keys.set(key, {
priority,
maxSpatial,
maxTemporal,
minSpatial: minSpatial || 0,
minTemporal: minTemporal || 0,
});
if (!this.receiver) {
this.receiver = this._session.takeReceiver(this._remote.kind);
this.receiver.on('state', this.onReceiverStateChanged);
Expand All @@ -62,19 +62,11 @@ export class StreamConsumer extends TypedEventEmitter<IConsumerCallbacks> {
/**
* Sets the limit for a specific view by key.
* @param key - The key of the view to set the limit for.
* @param priority - The priority of the view (default: 50).
* @param minSpatial - The minimum spatial layer for the view (default: 0).
* @param maxSpatial - The maximum spatial limit (default: 2).
* @param minTemporal - The minimum temporal layer for the view (default: 0).
* @param maxTemporal - The maximum temporal limit (default: 2).
* @param limit - The limit to set for the view.
*/
public limit(
key: string,
priority: number = 50,
minSpatial: number = 0,
maxSpatial: number = 2,
minTemporal: number = 0,
maxTemporal: number = 2,
{ priority = 50, maxSpatial = 2, minSpatial = 0, maxTemporal = 2, minTemporal = 0 }: StreamLimit,
) {
this.keys.set(key, { priority, maxSpatial, maxTemporal, minSpatial, minTemporal });
this.configLayer();
Expand Down Expand Up @@ -120,13 +112,13 @@ export class StreamConsumer extends TypedEventEmitter<IConsumerCallbacks> {
selectedMinSpartial = Math.max(selectedMinSpartial, viewer.minSpatial);
selectedMinTemporal = Math.max(selectedMinTemporal, viewer.minTemporal);
});
this.receiver?.limit(
selectedPriority,
selectedMinSpartial,
selectedMaxSpartial,
selectedMinTemporal,
selectedMaxTemporal,
);
this.receiver?.limit({
priority: selectedPriority,
maxSpatial: selectedMaxSpartial,
maxTemporal: selectedMaxTemporal,
minTemporal: selectedMinTemporal,
minSpatial: selectedMinSpartial,
});
}

private onReceiverAudioLevelChanged = (level: number) => {
Expand Down
14 changes: 3 additions & 11 deletions src/lib/interfaces/receiver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { StreamRemote } from '../remote';
import type { TypedEventEmitter } from '../utils/typed-event-emitter';
import type { Codecs, StreamKinds, LatencyMode, RemoteStreamQuality } from '../utils/types';
import type { Codecs, StreamKinds, LatencyMode, RemoteStreamQuality, StreamLimit } from '../utils/types';

/**
* Represents a stream receiver.
Expand All @@ -24,18 +24,10 @@ export interface IStreamReceiver extends TypedEventEmitter<IStreamReceiverCallba
* `spatial` is a value indicating the definition clarity of the stream.
* `temporal` is a value indicating the smoothness, or frame rate of the stream.
*
* @param priority The priority of the stream to limit.
* @param maxSpatial The maximum spatial value.
* @param maxTemporal The maximum temporal value.
* @param limit - The limit to set for the stream.
* @returns A promise that resolves to a boolean indicating whether the limit was successful.
*/
limit(
priority: number,
minSpatial: number,
maxSpatial: number,
minTemporal: number,
maxTemporal: number,
): Promise<boolean>;
limit(limit: StreamLimit): Promise<boolean>;

/**
* Stops the stream.
Expand Down
10 changes: 7 additions & 3 deletions src/lib/interfaces/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ export type RpcResponse<T> = {
export type RpcRequests = {
'receiver.limit': {
id: string;
priority: number;
max_spatial: number;
max_temporal: number;
limit: {
priority: number;
min_spatial?: number;
min_temporal?: number;
max_spatial: number;
max_temporal: number;
};
};
'receiver.switch': {
id: string;
Expand Down
28 changes: 20 additions & 8 deletions src/lib/receiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,20 @@ describe('StreamReceiver', () => {

rpc.request.mockResolvedValueOnce({ status: true });

const result = await receiver.limit(50, 2, 3);
const result = await receiver.limit({
priority: 50,
maxSpatial: 2,
maxTemporal: 3,
});

expect(result).toBe(true);
expect(rpc.request).toHaveBeenCalledWith('receiver.limit', {
id: '123',
priority: 50,
max_spatial: 2,
max_temporal: 3,
limit: {
priority: 50,
max_spatial: 2,
max_temporal: 3,
},
});
});

Expand All @@ -135,14 +141,20 @@ describe('StreamReceiver', () => {

rpc.request.mockResolvedValueOnce({ status: false });

const result = await receiver.limit(50, 2, 3);
const result = await receiver.limit({
priority: 50,
maxSpatial: 2,
maxTemporal: 3,
});

expect(result).toBe(false);
expect(rpc.request).toHaveBeenCalledWith('receiver.limit', {
id: '123',
priority: 50,
max_spatial: 2,
max_temporal: 3,
limit: {
priority: 50,
max_spatial: 2,
max_temporal: 3,
},
});
});

Expand Down
16 changes: 10 additions & 6 deletions src/lib/receiver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { TypedEventEmitter } from './utils/typed-event-emitter';
import { getLogger } from './utils/logger';
import type { AnyFunction, RemoteStreamQuality } from './utils/types';
import type { AnyFunction, RemoteStreamQuality, StreamLimit } from './utils/types';
import type { StreamRemote } from './remote';
import { type IStreamReceiverCallbacks, type IStreamReceiver, StreamReceiverState } from './interfaces/receiver';
import type { IRPC } from './interfaces/rpc';
Expand Down Expand Up @@ -137,15 +137,19 @@ export class StreamReceiver extends TypedEventEmitter<IStreamReceiverCallbacks>
return false;
}

async limit(priority: number, maxSpatial: number, maxTemporal: number): Promise<boolean> {
this.logger.log('limit stream', priority, maxSpatial, maxTemporal);
async limit(limit: StreamLimit): Promise<boolean> {
this.logger.log('limit stream', limit.priority, limit.maxSpatial, limit.maxTemporal);
await this.internalReady();
if (this._track.stream) {
const res = await this._rpc.request('receiver.limit', {
id: this.remoteId,
priority,
max_spatial: maxSpatial,
max_temporal: maxTemporal,
limit: {
priority: limit.priority,
min_spatial: limit.minSpatial,
min_temporal: limit.minTemporal,
max_spatial: limit.maxSpatial,
max_temporal: limit.maxTemporal,
},
});
if (res.status === true) {
return true;
Expand Down
15 changes: 15 additions & 0 deletions src/lib/utils/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,18 @@ export type RemoteStreamQuality = {
};

export type SessionEvent = keyof ISessionCallbacks;

/**
* @param priority - The priority of the stream (default: 50).
* @param minSpatial - The minimum spatial limit for the stream (default: 0).
* @param maxSpatial - The maximum spatial limit (default: 2).
* @param minTemporal - The minimum temporal layer for the stream (default: 0).
* @param maxTemporal - The maximum temporal limit (default: 2).
*/
export type StreamLimit = {
priority: number;
minSpatial?: number;
maxSpatial: number;
minTemporal?: number;
maxTemporal: number;
};

0 comments on commit 2f3c315

Please sign in to comment.