Skip to content

Commit

Permalink
WIP: allow simple attach remote stream from sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed May 6, 2024
1 parent a63b843 commit ae02391
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 55 deletions.
73 changes: 65 additions & 8 deletions packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
use std::{marker::PhantomData, time::Instant};

use media_server_protocol::{
endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName, TrackPriority},
endpoint::{BitrateControlMode, PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName, TrackPriority},
media::MediaPacket,
protobuf,
transport::RpcResult,
};
use sans_io_runtime::{
Expand All @@ -26,21 +27,77 @@ mod middleware;

pub struct EndpointSession(pub u64);

pub enum EndpointRemoteTrackReq {}
pub struct EndpointRemoteTrackConfig {
pub priority: TrackPriority,
pub control: Option<BitrateControlMode>,
}

impl From<protobuf::shared::sender::Config> for EndpointRemoteTrackConfig {
fn from(value: protobuf::shared::sender::Config) -> Self {
Self {
priority: value.priority.into(),
control: value.bitrate.map(|v| protobuf::shared::BitrateControlMode::try_from(v).ok()).flatten().map(|v| v.into()),
}
}
}

pub enum EndpointRemoteTrackReq {
Config(EndpointRemoteTrackConfig),
}

pub enum EndpointRemoteTrackRes {
Config(RpcResult<()>),
}

pub struct EndpointLocalTrackSource {
pub peer: PeerId,
pub track: TrackName,
}

pub enum EndpointRemoteTrackRes {}
impl From<protobuf::shared::receiver::Source> for EndpointLocalTrackSource {
fn from(value: protobuf::shared::receiver::Source) -> Self {
Self {
peer: value.peer.into(),
track: value.track.into(),
}
}
}

pub struct EndpointLocalTrackConfig {
pub priority: TrackPriority,
pub max_spatial: u8,
pub max_temporal: u8,
pub min_spatial: Option<u8>,
pub min_temporal: Option<u8>,
}

impl From<protobuf::shared::receiver::Config> for EndpointLocalTrackConfig {
fn from(value: protobuf::shared::receiver::Config) -> Self {
Self {
priority: value.priority.into(),
max_spatial: value.max_spatial as u8,
max_temporal: value.max_temporal as u8,
min_spatial: value.min_spatial.map(|m| m as u8),
min_temporal: value.min_temporal.map(|m| m as u8),
}
}
}

pub enum EndpointLocalTrackReq {
Switch(Option<(PeerId, TrackName, TrackPriority)>),
Attach(EndpointLocalTrackSource, EndpointLocalTrackConfig),
Detach(),
Config(EndpointLocalTrackConfig),
}

pub enum EndpointLocalTrackRes {
Switch(RpcResult<()>),
Attach(RpcResult<()>),
Detach(RpcResult<()>),
Config(RpcResult<()>),
}

pub struct EndpointReqId(pub u64);
impl From<u64> for EndpointReqId {
fn from(value: u64) -> Self {
pub struct EndpointReqId(pub u32);
impl From<u32> for EndpointReqId {
fn from(value: u32) -> Self {
Self(value)
}
}
Expand Down
24 changes: 16 additions & 8 deletions packages/media_core/src/endpoint/internal/local_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct EndpointLocalTrack {

impl EndpointLocalTrack {
pub fn new(kind: MediaKind, room: Option<ClusterRoomHash>) -> Self {
log::info!("[EndpointLocalTrack] track {kind}, room {:?}", room);
Self {
kind,
room,
Expand Down Expand Up @@ -113,40 +114,47 @@ impl EndpointLocalTrack {

fn on_rpc_req(&mut self, _now: Instant, req_id: EndpointReqId, req: EndpointLocalTrackReq) -> Option<Output> {
match req {
EndpointLocalTrackReq::Switch(Some((peer, track, priority))) => {
EndpointLocalTrackReq::Attach(source, config) => {
//TODO process config here
if let Some(room) = self.room.as_ref() {
let peer = source.peer;
let track = source.track;
log::info!("[EndpointLocalTrack] view room {room} peer {peer} track {track}");
if let Some((_peer, _track)) = self.bind.take() {
log::info!("[EndpointLocalTrack] view room {room} peer {peer} track {track} => unsubscribe current {_peer} {_track}");
self.queue.push_back(Output::Cluster(*room, ClusterLocalTrackControl::Unsubscribe));
self.queue.push_back(Output::Stopped(self.kind));
}
self.bind = Some((peer.clone(), track.clone()));
self.queue.push_back(Output::Started(self.kind, priority));
self.queue.push_back(Output::Started(self.kind, config.priority));
self.queue.push_back(Output::Cluster(*room, ClusterLocalTrackControl::Subscribe(peer, track)));
self.selector.reset();
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Ok(()))))
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Attach(Ok(()))))
} else {
log::warn!("[EndpointLocalTrack] view but not in room");
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom)))))
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Attach(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom)))))
}
}
EndpointLocalTrackReq::Switch(None) => {
EndpointLocalTrackReq::Detach() => {
//TODO process config here
if let Some(room) = self.room.as_ref() {
if let Some((peer, track)) = self.bind.take() {
self.queue.push_back(Output::Stopped(self.kind));
self.queue.push_back(Output::Cluster(*room, ClusterLocalTrackControl::Unsubscribe));
log::info!("[EndpointLocalTrack] unview room {room} peer {peer} track {track}");
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Ok(()))))
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Detach(Ok(()))))
} else {
log::warn!("[EndpointLocalTrack] unview but not bind to any source");
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Err(RpcError::new2(EndpointErrors::LocalTrackNotPinSource)))))
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Detach(Err(RpcError::new2(EndpointErrors::LocalTrackNotPinSource)))))
}
} else {
log::warn!("[EndpointLocalTrack] unview but not in room");
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Switch(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom)))))
Some(Output::RpcRes(req_id, EndpointLocalTrackRes::Detach(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom)))))
}
}
EndpointLocalTrackReq::Config(config) => {

Check warning on line 155 in packages/media_core/src/endpoint/internal/local_track.rs

View workflow job for this annotation

GitHub Actions / build-release (macos x64)

unused variable: `config`

Check warning on line 155 in packages/media_core/src/endpoint/internal/local_track.rs

View workflow job for this annotation

GitHub Actions / build-release (linux musl aarch64)

unused variable: `config`

Check warning on line 155 in packages/media_core/src/endpoint/internal/local_track.rs

View workflow job for this annotation

GitHub Actions / build-release (linux musl x64)

unused variable: `config`

Check warning on line 155 in packages/media_core/src/endpoint/internal/local_track.rs

View workflow job for this annotation

GitHub Actions / build-release (linux gnu x64)

unused variable: `config`

Check warning on line 155 in packages/media_core/src/endpoint/internal/local_track.rs

View workflow job for this annotation

GitHub Actions / build-release (macos aarch64)

unused variable: `config`
todo!()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ impl PacketSelector {
self.queue.push_back(Action::RequestKeyFrame);
self.need_key_frame = true;
self.last_key_frame_ts = Some(now_ms);
} else {
log::info!("[LocalTrack/PacketSelector] video source changed and first pkt is key");
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/media_core/src/endpoint/internal/remote_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl EndpointRemoteTrack {
}

fn on_rpc_req(&mut self, _now: Instant, _req_id: EndpointReqId, _req: EndpointRemoteTrackReq) -> Option<Output> {
None
todo!()
}

fn on_bitrate_allocation_action(&mut self, _now: Instant, action: IngressAction) -> Option<Output> {
Expand Down
55 changes: 41 additions & 14 deletions packages/protocol/proto/conn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,38 @@ message Request {
}

message Sender {
message Switch {
optional shared.Sender.Source source = 1;
message Attach {
required shared.Sender.Source source = 1;
required shared.Sender.Config config = 2;
}

message Detach {

}

required string name = 1;
oneof request {
Switch switch = 2;
shared.Sender.Config config = 3;
Attach attach = 2;
Detach detach = 3;
shared.Sender.Config config = 4;
}
}

message Receiver {
message Switch {
optional shared.Receiver.Source source = 1;
message Attach {
required shared.Receiver.Source source = 1;
required shared.Receiver.Config config = 2;
}

message Detach {

}

required string name = 1;
oneof request {
Switch switch = 2;
shared.Receiver.Config config = 3;
Attach attach = 2;
Detach detach = 3;
shared.Receiver.Config config = 4;
}
}

Expand Down Expand Up @@ -91,27 +103,42 @@ message Response {
}

message Sender {
message Switch {
message Attach {

}

message Detach {

}

message Config {

}

oneof response {
Switch switch = 1;
Attach attach = 1;
Detach detach = 2;
Config config = 3;
}
}

message Receiver {
message Switch {
message Attach {

}

message Detach {

}

message Limit {
message Config {

}

oneof response {
Switch switch = 1;
Limit limit = 2;
Attach attach = 1;
Detach detach = 2;
Config config = 3;
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/protocol/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};

use crate::protobuf::shared::Kind;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
pub enum MediaKind {
Audio,
Video,
Expand Down
Loading

0 comments on commit ae02391

Please sign in to comment.