Skip to content

Commit

Permalink
Feat connector server (#120)
Browse files Browse the repository at this point in the history
* feat: added connector server and gateway fire event

* endpoint middleware and simple endpoint state event
  • Loading branch information
giangndm authored Dec 20, 2023
1 parent 697af35 commit 51e4228
Show file tree
Hide file tree
Showing 35 changed files with 875 additions and 175 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions packages/cluster/src/define/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use transport::TrackId;

use crate::{
ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent, ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta,
ClusterTrackName, ClusterTrackUuid,
rpc::connector::MediaEndpointLogRequest, ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent,
ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta, ClusterTrackName, ClusterTrackUuid,
};

#[async_trait::async_trait]
Expand All @@ -19,6 +19,7 @@ pub enum ClusterEndpointOutgoingEvent {
UnsubscribePeer(ClusterPeerId),
LocalTrackEvent(TrackId, ClusterLocalTrackOutgoingEvent),
RemoteTrackEvent(TrackId, ClusterTrackUuid, ClusterRemoteTrackOutgoingEvent),
MediaEndpointLog(MediaEndpointLogRequest),
}

#[derive(Clone, PartialEq, Eq, Debug)]
Expand Down
3 changes: 3 additions & 0 deletions packages/cluster/src/define/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Debug;

pub mod connector;
pub mod gateway;
pub mod general;
pub mod webrtc;
Expand Down Expand Up @@ -57,3 +58,5 @@ pub const RPC_WHEP_CONNECT: &str = "RPC_WHEP_CONNECT";

pub const RPC_NODE_PING: &str = "RPC_NODE_PING";
pub const RPC_NODE_HEALTHCHECK: &str = "RPC_NODE_HEALTHCHECK";

pub const RPC_MEDIA_ENDPOINT_LOG: &str = "RPC_MEDIA_ENDPOINT_LOG";
165 changes: 165 additions & 0 deletions packages/cluster/src/define/rpc/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use std::net::SocketAddr;

use atm0s_sdn::NodeId;
use media_utils::F32;
use proc_macro::{IntoVecU8, TryFromSliceU8};
use serde::{Deserialize, Serialize};
use transport::MediaKind;

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaStreamIssueType {
Connectivity { mos: F32<2>, lost_percents: F32<2>, jitter_ms: F32<2>, rtt_ms: u32 },
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaEndpointEvent {
Routing {
user_agent: String,
gateway_node_id: NodeId,
},
RoutingError {
reason: String,
gateway_node_id: NodeId,
media_node_ids: Vec<NodeId>,
},
Routed {
media_node_id: NodeId,
after_ms: u32,
},
Connecting {
user_agent: String,
remote: Option<SocketAddr>,
},
ConnectError {
remote: Option<SocketAddr>,
error_code: String,
error_message: String,
},
Connected {
after_ms: u32,
remote: Option<SocketAddr>,
},
Reconnecting {
reason: String,
},
Reconnected {
remote: Option<SocketAddr>,
},
Disconnected {
error: Option<String>,
sent_bytes: u64,
received_bytes: u64,
duration_ms: u64,
rtt: F32<2>,
},
SessionStats {
received_bytes: u64,
receive_limit_bitrate: u32,
sent_bytes: u64,
send_est_bitrate: u32,
rtt: u16,
},
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaReceiveStreamEvent {
StreamStarted {
name: String,
kind: MediaKind,
remote_peer: String,
remote_stream: String,
},
StreamIssue {
name: String,
kind: MediaKind,
remote_peer: String,
remote_stream: String,
issue: MediaStreamIssueType,
},
StreamStats {
name: String,
kind: MediaKind,
limit_bitrate: u32,
received_bytes: u64,
freeze: bool,
mos: Option<F32<2>>,
rtt: Option<u32>,
jitter: Option<F32<2>>,
lost: Option<F32<2>>,
},
StreamEnded {
name: String,
kind: MediaKind,
sent_bytes: u64,
freeze_count: u32,
duration_ms: u64,
mos: Option<(F32<2>, F32<2>, F32<2>)>,
rtt: Option<(F32<2>, F32<2>, F32<2>)>,
jitter: Option<(F32<2>, F32<2>, F32<2>)>,
lost: Option<(F32<2>, F32<2>, F32<2>)>,
},
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaSendStreamEvent {
StreamStarted {
name: String,
kind: MediaKind,
meta: String,
scaling: String,
},
StreamIssue {
name: String,
kind: MediaKind,
issue: MediaStreamIssueType,
},
StreamStats {
name: String,
kind: MediaKind,
sent_bytes: u64,
freeze: bool,
mos: Option<F32<2>>,
rtt: Option<u32>,
jitter: Option<F32<2>>,
lost: Option<F32<2>>,
},
StreamEnded {
name: String,
kind: MediaKind,
received_bytes: u64,
duration_ms: u64,
freeze_count: u32,
mos: Option<(F32<2>, F32<2>, F32<2>)>,
rtt: Option<(F32<2>, F32<2>, F32<2>)>,
jitter: Option<(F32<2>, F32<2>, F32<2>)>,
lost: Option<(F32<2>, F32<2>, F32<2>)>,
},
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)]
pub enum MediaEndpointLogRequest {
SessionEvent {
ip: String,
version: Option<String>,
location: Option<(F32<2>, F32<2>)>,
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaEndpointEvent,
},
ReceiveStreamEvent {
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaReceiveStreamEvent,
},
SendStreamEvent {
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaSendStreamEvent,
},
}

#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)]
pub struct MediaEndpointLogResponse {}
3 changes: 3 additions & 0 deletions packages/cluster/src/define/rpc/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct WebrtcConnectRequestSender {

#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct WebrtcConnectRequest {
pub session_uuid: Option<u64>,
pub ip_addr: Option<String>,
pub user_agent: Option<String>,
pub version: Option<String>,
pub room: String,
pub peer: String,
Expand Down
3 changes: 3 additions & 0 deletions packages/cluster/src/define/rpc/whep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct WhepConnectRequest {
pub session_uuid: u64,
pub ip_addr: String,
pub user_agent: String,
pub token: String,
pub sdp: Option<String>,
pub compressed_sdp: Option<Vec<u8>>,
Expand Down
3 changes: 3 additions & 0 deletions packages/cluster/src/define/rpc/whip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct WhipConnectRequest {
pub session_uuid: u64,
pub ip_addr: String,
pub user_agent: String,
pub token: String,
pub sdp: Option<String>,
pub compressed_sdp: Option<Vec<u8>>,
Expand Down
27 changes: 22 additions & 5 deletions packages/cluster/src/implement/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::collections::HashMap;

use crate::{
generate_cluster_track_uuid, ClusterEndpoint, ClusterEndpointError, ClusterEndpointIncomingEvent, ClusterEndpointOutgoingEvent, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent,
ClusterRemoteTrackIncomingEvent, ClusterRemoteTrackOutgoingEvent,
generate_cluster_track_uuid,
rpc::{connector::MediaEndpointLogResponse, RPC_MEDIA_ENDPOINT_LOG},
ClusterEndpoint, ClusterEndpointError, ClusterEndpointIncomingEvent, ClusterEndpointOutgoingEvent, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterRemoteTrackIncomingEvent,
ClusterRemoteTrackOutgoingEvent, CONNECTOR_SERVICE,
};
use async_std::channel::{bounded, Receiver, Sender};
use atm0s_sdn::{ChannelUuid, ConsumerRaw, Feedback, FeedbackType, KeyId, KeySource, KeyValueSdk, KeyVersion, LocalSubId, NodeId, NumberInfo, PublisherRaw, PubsubSdk, SubKeyId, ValueType};
use atm0s_sdn::{
ChannelUuid, ConsumerRaw, Feedback, FeedbackType, KeyId, KeySource, KeyValueSdk, KeyVersion, LocalSubId, NodeId, NumberInfo, PublisherRaw, PubsubSdk, RouteRule, RpcEmitter, SubKeyId, ValueType,
};
use bytes::Bytes;
use futures::{select, FutureExt};
use media_utils::hash_str;
use media_utils::{hash_str, ErrorDebugger};
use transport::RequestKeyframeKind;

use super::types::{from_room_value, to_room_key, to_room_value, TrackData};
Expand Down Expand Up @@ -50,10 +54,11 @@ pub struct ClusterEndpointSdn {
peer_sub: HashMap<String, ()>,
track_pub: HashMap<ChannelUuid, (u16, PublisherRaw)>,
remote_track_cached: HashMap<u64, (String, String)>,
rpc_emitter: RpcEmitter,
}

impl ClusterEndpointSdn {
pub(crate) fn new(room_id: &str, peer_id: &str, pubsub_sdk: PubsubSdk, kv_sdk: KeyValueSdk) -> Self {
pub(crate) fn new(room_id: &str, peer_id: &str, pubsub_sdk: PubsubSdk, kv_sdk: KeyValueSdk, rpc_emitter: RpcEmitter) -> Self {
let (kv_tx, kv_rx) = bounded(100);
let (data_tx, data_rx) = bounded(1000);
let (data_fb_tx, data_fb_rx) = bounded(100);
Expand All @@ -77,6 +82,7 @@ impl ClusterEndpointSdn {
peer_sub: Default::default(),
track_pub: Default::default(),
remote_track_cached: Default::default(),
rpc_emitter,
}
}

Expand Down Expand Up @@ -249,6 +255,17 @@ impl ClusterEndpoint for ClusterEndpointSdn {
}
}
}
ClusterEndpointOutgoingEvent::MediaEndpointLog(event) => {
log::info!("[Atm0sClusterEndpoint] log event {:?}", event);
let emitter = self.rpc_emitter.clone();
async_std::task::spawn_local(async move {
emitter
.request::<_, MediaEndpointLogResponse>(CONNECTOR_SERVICE, RouteRule::ToService(0), RPC_MEDIA_ENDPOINT_LOG, event, 5000)
.await
.log_error("Should ok");
});
Ok(())
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions packages/cluster/src/implement/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::Cluster;
use atm0s_sdn::{
convert_enum, KeyValueBehavior, KeyValueBehaviorEvent, KeyValueHandlerEvent, KeyValueSdk, KeyValueSdkEvent, LayersSpreadRouterSyncBehavior, LayersSpreadRouterSyncBehaviorEvent,
LayersSpreadRouterSyncHandlerEvent, ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent, NetworkPlane, NetworkPlaneConfig, NodeAddr, NodeAddrBuilder, NodeId, Protocol,
PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent, RpcBox, SharedRouter, SystemTimer, UdpTransport,
PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent, RpcBox, RpcEmitter, SharedRouter, SystemTimer, UdpTransport,
};

use super::{endpoint, rpc::RpcEndpointSdn};
Expand Down Expand Up @@ -39,6 +39,7 @@ pub struct ServerSdn {
join_handler: Option<async_std::task::JoinHandle<()>>,
pubsub_sdk: PubsubSdk,
kv_sdk: KeyValueSdk,
rpc_emitter: RpcEmitter,
}

impl ServerSdn {
Expand Down Expand Up @@ -92,6 +93,7 @@ impl ServerSdn {
pubsub_sdk,
kv_sdk,
join_handler: Some(join_handler),
rpc_emitter: rpc_box.emitter(),
},
RpcEndpointSdn { rpc_box },
)
Expand All @@ -104,7 +106,7 @@ impl Cluster<endpoint::ClusterEndpointSdn> for ServerSdn {
}

fn build(&mut self, room_id: &str, peer_id: &str) -> endpoint::ClusterEndpointSdn {
endpoint::ClusterEndpointSdn::new(room_id, peer_id, self.pubsub_sdk.clone(), self.kv_sdk.clone())
endpoint::ClusterEndpointSdn::new(room_id, peer_id, self.pubsub_sdk.clone(), self.kv_sdk.clone(), self.rpc_emitter.clone())
}
}

Expand Down
Loading

0 comments on commit 51e4228

Please sign in to comment.