Skip to content

Commit

Permalink
Merge pull request #33 from slkzgm/feature/speakers-idle
Browse files Browse the repository at this point in the history
feat: enhance speaker management, idle detection, and dynamic GPT config
  • Loading branch information
lalalune authored Dec 29, 2024
2 parents b9d6711 + 0587600 commit d8600df
Show file tree
Hide file tree
Showing 10 changed files with 635 additions and 257 deletions.
288 changes: 154 additions & 134 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/_module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@ export { Scraper } from './scraper';
export { SearchMode } from './search';
export type { QueryProfilesResponse, QueryTweetsResponse } from './timeline-v1';
export type { Tweet } from './tweets';

export { Space } from './spaces/core/Space'
export { SttTtsPlugin } from './spaces/plugins/SttTtsPlugin'
export { RecordToDiskPlugin } from './spaces/plugins/RecordToDiskPlugin'
export { MonitorAudioPlugin } from './spaces/plugins/MonitorAudioPlugin'
export { IdleMonitorPlugin } from './spaces/plugins/IdleMonitorPlugin'

export * from './types/spaces'
3 changes: 2 additions & 1 deletion src/spaces/core/JanusAudioSource.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// src/core/audio.ts

import { EventEmitter } from 'events';
import { nonstandard } from '@roamhq/wrtc';
import wrtc from '@roamhq/wrtc';
const { nonstandard } = wrtc;
const { RTCAudioSource, RTCAudioSink } = nonstandard;

export class JanusAudioSource extends EventEmitter {
Expand Down
27 changes: 16 additions & 11 deletions src/spaces/core/JanusClient.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// src/core/JanusClient.ts

import { EventEmitter } from 'events';
import { RTCPeerConnection, MediaStream } from '@roamhq/wrtc';
import wrtc from '@roamhq/wrtc';
const { RTCPeerConnection, MediaStream } = wrtc;
import { JanusAudioSink, JanusAudioSource } from './JanusAudioSource';
import type { AudioDataWithUser, TurnServersInfo } from '../types';

Expand Down Expand Up @@ -112,6 +113,7 @@ export class JanusClient extends EventEmitter {
}
const feedId = pub.id;
console.log('[JanusClient] found feedId =>', feedId);
this.emit('subscribedSpeaker', { userId, feedId });

// 3) "join" as subscriber with "streams: [{ feed, mid: '0', send: true }]"
const joinBody = {
Expand Down Expand Up @@ -161,14 +163,14 @@ export class JanusClient extends EventEmitter {
console.log('[JanusClient] subscriber track =>', evt.track.kind);

// TODO: REMOVE DEBUG
console.log(
'[JanusClient] subscriber track => kind=',
evt.track.kind,
'readyState=',
evt.track.readyState,
'muted=',
evt.track.muted,
);
// console.log(
// '[JanusClient] subscriber track => kind=',
// evt.track.kind,
// 'readyState=',
// evt.track.readyState,
// 'muted=',
// evt.track.muted,
// );

const sink = new JanusAudioSink(evt.track);
sink.on('audioData', (frame) => {
Expand Down Expand Up @@ -506,7 +508,8 @@ export class JanusClient extends EventEmitter {
}

private handleJanusEvent(evt: any) {
console.log('[JanusClient] handleJanusEvent =>', JSON.stringify(evt));
// TODO: REMOVE DEBUG
// console.log('[JanusClient] handleJanusEvent =>', JSON.stringify(evt));

if (!evt.janus) return;
if (evt.janus === 'keepalive') {
Expand Down Expand Up @@ -550,7 +553,9 @@ export class JanusClient extends EventEmitter {
if (!this.pc) return;

this.pc.addEventListener('iceconnectionstatechange', () => {
console.log('[JanusClient] ICE state =>', this.pc?.iceConnectionState);
// TODO: REMOVE DEBUG
// console.log('[JanusClient] ICE state =>', this.pc?.iceConnectionState);

if (this.pc?.iceConnectionState === 'failed') {
this.emit('error', new Error('ICE connection failed'));
}
Expand Down
160 changes: 146 additions & 14 deletions src/spaces/core/Space.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
Plugin,
AudioDataWithUser,
PluginRegistration,
SpeakerInfo,
} from '../types';
import { Scraper } from '../../scraper';

Expand All @@ -34,6 +35,7 @@ export class Space extends EventEmitter {
private broadcastInfo?: BroadcastCreated;
private isInitialized = false;
private plugins = new Set<PluginRegistration>();
private speakers = new Map<string, SpeakerInfo>();

constructor(private readonly scraper: Scraper) {
super();
Expand Down Expand Up @@ -105,6 +107,22 @@ export class Space extends EventEmitter {
// You can store or forward to a plugin, run STT, etc.
});

this.janusClient.on('subscribedSpeaker', ({ userId, feedId }) => {
const speaker = this.speakers.get(userId);
if (!speaker) {
console.log(
'[Space] subscribedSpeaker => speaker not found for userId=',
userId,
);
return;
}

speaker.janusParticipantId = feedId;
console.log(
`[Space] updated speaker info => userId=${userId}, feedId=${feedId}`,
);
});

// 7) Publish the broadcast
console.log('[Space] Publishing broadcast...');
await publishBroadcast({
Expand Down Expand Up @@ -171,6 +189,11 @@ export class Space extends EventEmitter {
throw new Error('[Space] No auth token available');
}

this.speakers.set(userId, {
userId,
sessionUUID,
});

// 1) Call the "request/approve" endpoint
await this.callApproveEndpoint(
this.broadcastInfo,
Expand Down Expand Up @@ -221,6 +244,111 @@ export class Space extends EventEmitter {
console.log('[Space] Speaker approved =>', userId);
}

/**
* Removes a speaker (userId) on the Twitter side (audiospace/stream/eject)
* then unsubscribes in Janus if needed.
*/
public async removeSpeaker(userId: string) {
if (!this.isInitialized || !this.broadcastInfo) {
throw new Error('[Space] Not initialized or no broadcastInfo');
}
if (!this.authToken) {
throw new Error('[Space] No auth token available');
}
if (!this.janusClient) {
throw new Error('[Space] No Janus client initialized');
}

const speaker = this.speakers.get(userId);
if (!speaker) {
throw new Error(
`[Space] removeSpeaker => no speaker found for userId=${userId}`,
);
}

const sessionUUID = speaker.sessionUUID;
const janusParticipantId = speaker.janusParticipantId;
console.log(sessionUUID, janusParticipantId, speaker);
if (!sessionUUID || janusParticipantId === undefined) {
throw new Error(
`[Space] removeSpeaker => missing sessionUUID or feedId for userId=${userId}`,
);
}

const janusHandleId = this.janusClient.getHandleId();
const janusSessionId = this.janusClient.getSessionId();

if (!janusHandleId || !janusSessionId) {
throw new Error(
`[Space] removeSpeaker => missing Janus handle or sessionId for userId=${userId}`,
);
}

// 1) Call the Twitter eject endpoint
await this.callRemoveEndpoint(
this.broadcastInfo,
this.authToken,
sessionUUID,
janusParticipantId,
this.broadcastInfo.room_id,
janusHandleId,
janusSessionId,
);

// 2) Remove from local speakers map
this.speakers.delete(userId);

console.log(`[Space] removeSpeaker => removed userId=${userId}`);
}

/**
* Calls the audiospace/stream/eject endpoint to remove a speaker on Twitter
*/
private async callRemoveEndpoint(
broadcast: BroadcastCreated,
authorizationToken: string,
sessionUUID: string,
janusParticipantId: number,
janusRoomId: string,
webrtcHandleId: number,
webrtcSessionId: number,
): Promise<void> {
const endpoint = 'https://guest.pscp.tv/api/v1/audiospace/stream/eject';

const headers = {
'Content-Type': 'application/json',
Referer: 'https://x.com/',
Authorization: authorizationToken,
};

const body = {
ntpForBroadcasterFrame: '2208988800024000300',
ntpForLiveFrame: '2208988800024000300',
session_uuid: sessionUUID,
chat_token: broadcast.access_token,
janus_room_id: janusRoomId,
janus_participant_id: janusParticipantId,
webrtc_handle_id: webrtcHandleId,
webrtc_session_id: webrtcSessionId,
};

console.log('[Space] Removing speaker =>', endpoint, body);
const resp = await fetch(endpoint, {
method: 'POST',
headers,
body: JSON.stringify(body),
});

if (!resp.ok) {
const error = await resp.text();
throw new Error(
`[Space] Failed to remove speaker => ${resp.status}: ${error}`,
);
}

console.log('[Space] Speaker removed => sessionUUID=', sessionUUID);
}

pushAudio(samples: Int16Array, sampleRate: number) {
this.janusClient?.pushLocalAudio(samples, sampleRate);
}
Expand All @@ -240,34 +368,34 @@ export class Space extends EventEmitter {
* Gracefully end the Space (stop broadcast, destroy Janus room, etc.)
*/
public async finalizeSpace(): Promise<void> {
console.log('[Space] finalizeSpace => stopping broadcast gracefully');
console.log('[Space] finalizeSpace => stopping broadcast gracefully');

const tasks: Array<Promise<any>> = [];
const tasks: Array<Promise<any>> = [];

if (this.janusClient) {
tasks.push(
this.janusClient.destroyRoom().catch((err) => {
console.error('[Space] destroyRoom error =>', err);
}),
this.janusClient.destroyRoom().catch((err) => {
console.error('[Space] destroyRoom error =>', err);
}),
);
}

if (this.broadcastInfo) {
tasks.push(
this.endAudiospace({
broadcastId: this.broadcastInfo.room_id,
chatToken: this.broadcastInfo.access_token,
}).catch((err) => {
console.error('[Space] endAudiospace error =>', err);
}),
this.endAudiospace({
broadcastId: this.broadcastInfo.room_id,
chatToken: this.broadcastInfo.access_token,
}).catch((err) => {
console.error('[Space] endAudiospace error =>', err);
}),
);
}

if (this.janusClient) {
tasks.push(
this.janusClient.leaveRoom().catch((err) => {
console.error('[Space] leaveRoom error =>', err);
}),
this.janusClient.leaveRoom().catch((err) => {
console.error('[Space] leaveRoom error =>', err);
}),
);
}

Expand Down Expand Up @@ -309,6 +437,10 @@ export class Space extends EventEmitter {
console.log('[Space] endAudiospace => success =>', json);
}

public getSpeakers(): SpeakerInfo[] {
return Array.from(this.speakers.values());
}

async stop() {
console.log('[Space] Stopping...');

Expand Down
81 changes: 81 additions & 0 deletions src/spaces/plugins/IdleMonitorPlugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// src/plugins/IdleMonitorPlugin.ts

import { Plugin, AudioDataWithUser } from '../types';
import { Space } from '../core/Space';

/**
* Plugin that tracks the last speaker audio timestamp
* and the last local audio timestamp to detect overall silence.
*/
export class IdleMonitorPlugin implements Plugin {
private space?: Space;
private lastSpeakerAudioMs = Date.now();
private lastLocalAudioMs = Date.now();
private checkInterval?: NodeJS.Timeout;

/**
* @param idleTimeoutMs How many ms of silence before triggering idle (default 60s)
* @param checkEveryMs Interval for checking silence (default 10s)
*/
constructor(
private idleTimeoutMs: number = 60_000,
private checkEveryMs: number = 10_000,
) {}

onAttach(space: Space) {
this.space = space;
console.log('[IdleMonitorPlugin] onAttach => plugin attached');
}

init(params: { space: Space; pluginConfig?: Record<string, any> }): void {
this.space = params.space;
console.log('[IdleMonitorPlugin] init => setting up idle checks');

// Update lastSpeakerAudioMs on incoming speaker audio
this.space.on('audioDataFromSpeaker', (data: AudioDataWithUser) => {
this.lastSpeakerAudioMs = Date.now();
});

// Patch space.pushAudio to update lastLocalAudioMs
const originalPushAudio = this.space.pushAudio.bind(this.space);
this.space.pushAudio = (samples, sampleRate) => {
this.lastLocalAudioMs = Date.now();
originalPushAudio(samples, sampleRate);
};

// Periodically check for silence
this.checkInterval = setInterval(() => this.checkIdle(), this.checkEveryMs);
}

private checkIdle() {
const now = Date.now();
const lastAudio = Math.max(this.lastSpeakerAudioMs, this.lastLocalAudioMs);
const idleMs = now - lastAudio;

if (idleMs >= this.idleTimeoutMs) {
console.log(
'[IdleMonitorPlugin] idleTimeout => no audio for',
idleMs,
'ms',
);
this.space?.emit('idleTimeout', { idleMs });
}
}

/**
* Returns how many ms have passed since any audio was detected.
*/
public getIdleTimeMs(): number {
const now = Date.now();
const lastAudio = Math.max(this.lastSpeakerAudioMs, this.lastLocalAudioMs);
return now - lastAudio;
}

cleanup(): void {
console.log('[IdleMonitorPlugin] cleanup => stopping idle checks');
if (this.checkInterval) {
clearInterval(this.checkInterval);
this.checkInterval = undefined;
}
}
}
Loading

0 comments on commit d8600df

Please sign in to comment.