Skip to content

Commit

Permalink
feat: add more react hooks (#39)
Browse files Browse the repository at this point in the history
* feat: add more react hooks

* fix example
  • Loading branch information
giangndm authored Nov 11, 2024
1 parent 6ab3df4 commit 0f8d4cd
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 41 deletions.
12 changes: 10 additions & 2 deletions apps/web/app/react_samples/echo_fast/content.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import { useCallback, useEffect, useRef, useState } from "react";
import {
useConsumer,
usePublisher,
useRemoteVideoTracks,
useSession,
RemoteTrack,
useConsumerStatus,
Atm0sMediaProvider,
useMessageChannel,
useSessionStatus,
usePublisherStatus,
useLocalVideoTracks,
} from "@atm0s-media-sdk/react-hooks";

import { Kind, MessageChannelEvent } from "@atm0s-media-sdk/core";
Expand Down Expand Up @@ -60,13 +62,16 @@ interface Message {
function EchoContent(): JSX.Element {
const previewVideoRef = useRef<HTMLVideoElement>(null);
const session = useSession();
const sessionStatus = useSessionStatus();
const router = useRouter();
const audio_sender = usePublisher("audio_main", Kind.AUDIO);
const video_sender = usePublisher("video_main", Kind.VIDEO);
const audio_status = usePublisherStatus(audio_sender);
const video_status = usePublisherStatus(video_sender);
const [chats, setChats] = useState<Message[]>([]);
const [view, setView] = useState(true);

const video_tracks = useRemoteVideoTracks();
const video_tracks = useLocalVideoTracks();
const msgChannel = useMessageChannel("test", (e: MessageChannelEvent) => {
setChats((chats) => [
...chats,
Expand Down Expand Up @@ -123,6 +128,9 @@ function EchoContent(): JSX.Element {
<audio autoPlay id="audio-echo" />
{view &&
video_tracks.map((t) => <EchoViewer key={t.peer} track={t} />)}
<div>Status: {sessionStatus}</div>
<div>Audio: {audio_status}</div>
<div>Video: {video_status}</div>
</div>
</div>
{/* This is for control buttons */}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"private": true,
"scripts": {
"build": "turbo build",
"build:sdk": "turbo build --filter=@atm0s-media-sdk/*",
"dev": "turbo dev",
"lint": "turbo lint",
"format": "prettier --write \"**/*.{ts,tsx,md}\""
Expand Down
12 changes: 10 additions & 2 deletions packages/sdk-core/src/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ import {
} from "./generated/protobuf/features.mixer";
import { EventEmitter, ReadyWaiter } from "./utils";

export enum DatachannelState {
CONNECTED = "connected",
DISCONNECTED = "disconnected",
}

export enum DatachannelEvent {
STATE = "event.state",
ROOM = "event.room",
SESSION = "event.session",
SENDER = "event.sender.",
Expand All @@ -40,6 +46,7 @@ export class Datachannel extends EventEmitter {
dc.onopen = () => {
console.log("[Datachannel] on open");
this.waiter.setReady();
this.emit(DatachannelEvent.STATE, DatachannelState.CONNECTED);
};
dc.onmessage = (e) => {
const msg = ServerEvent.decode(new Uint8Array(e.data));
Expand Down Expand Up @@ -79,6 +86,7 @@ export class Datachannel extends EventEmitter {
};
dc.onclose = () => {
console.log("[Datachannel] on close");
this.emit(DatachannelEvent.STATE, DatachannelState.DISCONNECTED);
// TODO: Cleanup all requests
};
}
Expand All @@ -87,8 +95,8 @@ export class Datachannel extends EventEmitter {
return this.dc.readyState == "open";
}

public async ready(): Promise<void> {
return this.waiter.waitReady();
public async ready(timeout?: number): Promise<void> {
return this.waiter.waitReady(timeout);
}

public async requestSession(req: Request_Session): Promise<Response_Session> {
Expand Down
9 changes: 2 additions & 7 deletions packages/sdk-core/src/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ export { TrackReceiver, TrackReceiverEvent } from "./receiver";
export * from "./events";
export { EventEmitter } from "./utils";

import {
SessionConfig as SessionConfigRaw,
JoinInfo as JoinInfoRaw,
} from "./session";

export type {
AudioMixerConfig,
AudioMixer,
Expand All @@ -33,8 +28,8 @@ export {
kindToString,
Kind,
} from "./types";
export type SessionConfig = SessionConfigRaw;
export type JoinInfo = JoinInfoRaw;
export { SessionStatus } from "./session";
export type { SessionConfig, JoinInfo } from "./session";

export {
Receiver_Status as TrackReceiverStatus,
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk-core/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class TrackSender extends EventEmitter {
}

public get status(): TrackSenderStatus | undefined {
return this.status;
return this._status;
}

public get attached() {
Expand Down
73 changes: 55 additions & 18 deletions packages/sdk-core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import { TrackReceiver } from "./receiver";
import { TrackSender, TrackSenderConfig } from "./sender";
import { EventEmitter, postProtobuf } from "./utils";
import { Datachannel, DatachannelEvent } from "./data";
import { Datachannel, DatachannelEvent, DatachannelState } from "./data";
import {
Request_Session_UpdateSdp,
ServerEvent_Room,
Expand All @@ -34,7 +34,19 @@ export interface SessionConfig {
join?: JoinInfo;
}

export enum SessionStatus {
NEW = "new",
CONNECTING = "connecting",
CONNECTED = "connected",
//TODO handle reconnect
RECONNECTING = "reconnecting",
RECONNECTED = "reconnected",
DISCONNECTED = "disconnected",
ERROR = "error",
}

export enum SessionEvent {
SESSION_CHANGED = "session.changed",
ROOM_CHANGED = "room.changed",
ROOM_PEER_JOINED = "room.peer.joined",
ROOM_PEER_UPDATED = "room.peer.updated",
Expand All @@ -48,6 +60,7 @@ export class Session extends EventEmitter {
peer: RTCPeerConnection;
dc: Datachannel;

state: SessionStatus = SessionStatus.NEW;
ice_lite: boolean = false;
restarting_ice: boolean = false;
created_at: number;
Expand Down Expand Up @@ -89,18 +102,29 @@ export class Session extends EventEmitter {
}
});

this.dc.on(DatachannelEvent.STATE, (state: DatachannelState) => {
if (state == DatachannelState.DISCONNECTED) {
this.setState(SessionStatus.DISCONNECTED);
}
});

//TODO add await to throtle for avoiding too much update in short time
this.peer.onnegotiationneeded = (event) => {
console.log("[Session] RTCPeer negotiation needed", event);
if (this.dc.connected && !this.restarting_ice)
this.syncSdp().then(console.log).catch(console.error);
};

this.peer.onconnectionstatechange = (_event) => {
this.peer.onconnectionstatechange = (event) => {
console.log(
"[Session] RTCPeer connection state changed",
this.peer.connectionState,
);
switch (this.peer.connectionState) {
case "disconnected":
this.setState(SessionStatus.DISCONNECTED);
break;
}
};

this.peer.oniceconnectionstatechange = (_event) => {
Expand Down Expand Up @@ -193,6 +217,7 @@ export class Session extends EventEmitter {
if (!this.prepareState) {
throw new Error("Not in prepare state");
}
this.setState(SessionStatus.CONNECTING);
this.prepareState = false;
this.version = version;
console.info("Prepare senders and receivers to connect");
Expand Down Expand Up @@ -228,22 +253,28 @@ export class Session extends EventEmitter {
sdp: local_desc.sdp,
});
console.log("Connecting");
const res = await postProtobuf(
ConnectRequest,
ConnectResponse,
this.gateway + "/webrtc/connect",
req,
{
Authorization: "Bearer " + this.cfg.token,
"Content-Type": "application/grpc",
},
);
this.conn_id = res.connId;
this.ice_lite = res.iceLite;
await this.peer.setLocalDescription(local_desc);
await this.peer.setRemoteDescription({ type: "answer", sdp: res.sdp });
await this.dc.ready();
console.log("Connected");
try {
const res = await postProtobuf(
ConnectRequest,
ConnectResponse,
this.gateway + "/webrtc/connect",
req,
{
Authorization: "Bearer " + this.cfg.token,
"Content-Type": "application/grpc",
},
);
this.conn_id = res.connId;
this.ice_lite = res.iceLite;
await this.peer.setLocalDescription(local_desc);
await this.peer.setRemoteDescription({ type: "answer", sdp: res.sdp });
await this.dc.ready(30000); //connect timeout at 30s
this.setState(SessionStatus.CONNECTED);
console.log("Connected");
} catch (e) {
this.setState(SessionStatus.ERROR);
console.error("Connect error", e);
}
}

async restartIce() {
Expand Down Expand Up @@ -401,5 +432,11 @@ export class Session extends EventEmitter {
});
console.info("Disconnected session", this.created_at);
this.peer.close();
this.setState(SessionStatus.DISCONNECTED);
}

private setState(state: SessionStatus) {
this.state = state;
this.emit(SessionEvent.SESSION_CHANGED, this.state);
}
}
20 changes: 18 additions & 2 deletions packages/sdk-core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,28 @@ export class ReadyWaiter {
this.waits = [];
};

waitReady = () => {
waitReady = (timeout?: number) => {
if (this.ready) {
return Promise.resolve();
} else {
return new Promise<void>((resolve, reject) => {
this.waits.push([resolve, reject]);
if (timeout) {
let timeout_id: any = setTimeout(() => {
reject(new Error("Timeout"));
}, timeout);
this.waits.push([
() => {
if (timeout_id) {
clearTimeout(timeout_id);
timeout_id = undefined;
resolve();
}
},
reject,
]);
} else {
this.waits.push([resolve, reject]);
}
});
}
};
Expand Down
6 changes: 6 additions & 0 deletions packages/sdk-react-hooks/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import {
Sender_Config,
stringToKind,
JoinInfo,
SessionStatus,
} from "@atm0s-media-sdk/core";

export enum ContextEvent {
SessionUpdated = "session.updated",
RoomUpdated = "room.updated",
PeersUpdated = "peers.updated",
TracksUpdated = "tracks.updated",
Expand Down Expand Up @@ -86,6 +88,10 @@ export class Context extends EventEmitter {
this.free_video_receivers.push(this.session.receiver(Kind.VIDEO));
}

this.session.on(SessionEvent.SESSION_CHANGED, (state: SessionStatus) => {
this.emit(ContextEvent.SessionUpdated, state);
});

this.session.on(SessionEvent.ROOM_PEER_JOINED, (peer: RoomPeerJoined) => {
this.peers.set(peer.peer, peer);
this.emit(ContextEvent.PeersUpdated);
Expand Down
37 changes: 34 additions & 3 deletions packages/sdk-react-hooks/src/hooks/meta.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Kind } from "@atm0s-media-sdk/core";
import { useContext, useEffect, useState } from "react";
import { Atm0sMediaContext } from "../provider";
import { ContextEvent } from "../context";
import { useRoom } from "./session";

export interface RemotePeer {
peer: string;
Expand All @@ -13,7 +14,7 @@ export interface RemoteTrack {
kind: Kind;
}

export function useRemotePeers(): RemotePeer[] {
export function usePeers(): RemotePeer[] {
const ctx = useContext(Atm0sMediaContext);
const [peers, setPeers] = useState(() => Array.from(ctx.peers.values()));
useEffect(() => {
Expand All @@ -32,7 +33,7 @@ export function useRemotePeers(): RemotePeer[] {
return peers;
}

export function useRemoteTracks(peer?: string, kind?: Kind): RemoteTrack[] {
export function useTracks(peer?: string, kind?: Kind): RemoteTrack[] {
const ctx = useContext(Atm0sMediaContext);
const [tracks, setTracks] = useState(() =>
Array.from(ctx.tracks.values()).filter(
Expand Down Expand Up @@ -65,8 +66,38 @@ export function useRemoteTracks(peer?: string, kind?: Kind): RemoteTrack[] {
}, [ctx, peer]);
return tracks;
}
export function useAudioTracks(peer?: string): RemoteTrack[] {
return useTracks(peer, Kind.AUDIO);
}
export function useVideoTracks(peer?: string): RemoteTrack[] {
return useTracks(peer, Kind.VIDEO);
}

export function useLocalPeer() {
const room = useRoom();
return usePeers().filter((p) => p.peer === room?.peer);
}
export function useLocalTracks(kind?: Kind): RemoteTrack[] {
const room = useRoom();
return useTracks(room?.peer, kind).filter((t) => t.peer === room?.peer);
}
export function useLocalAudioTracks(): RemoteTrack[] {
return useLocalTracks(Kind.AUDIO);
}
export function useLocalVideoTracks(): RemoteTrack[] {
return useLocalTracks(Kind.VIDEO);
}

export function useRemotePeers(): RemotePeer[] {
const room = useRoom();
return usePeers().filter((p) => p.peer !== room?.peer);
}
export function useRemoteTracks(peer?: string, kind?: Kind): RemoteTrack[] {
const room = useRoom();
return useTracks(peer, kind).filter((t) => t.peer !== room?.peer);
}
export function useRemoteAudioTracks(peer?: string): RemoteTrack[] {
return useRemoteTracks(peer, Kind.AUDIO);
return useRemoteTracks(peer, Kind.AUDIO)
}
export function useRemoteVideoTracks(peer?: string): RemoteTrack[] {
return useRemoteTracks(peer, Kind.VIDEO);
Expand Down
Loading

0 comments on commit 0f8d4cd

Please sign in to comment.