Skip to content

Commit

Permalink
change gateway code as RFC-0003
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Feb 4, 2024
1 parent a74da8a commit 91af3e9
Show file tree
Hide file tree
Showing 24 changed files with 374 additions and 276 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,19 @@ After that, we can access `http://localhost:3000/samples` to see all embedded sa

In cluster mode, each module needs to be on a separate node. This setup can run on a single machine or multiple machines, whether they are connected via a public or private network.

The Inner-Gateway module routes user traffic to the most suitable media server node.
The Gateway node routes user traffic to the most suitable media server node.
```bash
atm0s-media-server --node-id 10 --sdn-port 10010 --http-port 3000 gateway
```

Afterward, the gateway prints out its address in the format: 10@/ip4/127.0.0.1/udp/10001/ip4/127.0.0.1/tcp/10001. This address serves as the seed node for other nodes joining the cluster.
Afterward, the gateway prints out its address in the format: 10@/ip4/127.0.0.1/udp/10010/ip4/127.0.0.1/tcp/10010. This address serves as the seed node for other nodes joining the cluster.

The WebRTC module serves users with either an SDK or a Whip, Whep client.
The WebRTC node serves users with either an SDK or a Whip, Whep client.
```bash
atm0s-media-server --node-id 21 --http-port 3001 --seeds ABOVE_GATEWAY_ADDR webrtc
```

The RTMP module serves users with an RTMP broadcaster such as OBS or Streamyard.
The RTMP node serves users with an RTMP broadcaster such as OBS or Streamyard.
```bash
atm0s-media-server --node-id 30 --seeds ABOVE_GATEWAY_ADDR rtmp
```
Expand Down
3 changes: 1 addition & 2 deletions packages/cluster/src/define/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ where
fn build(&mut self, room_id: &str, peer_id: &str) -> C;
}

pub const GLOBAL_GATEWAY_SERVICE: u8 = 100;
pub const INNER_GATEWAY_SERVICE: u8 = 101;
pub const GATEWAY_SERVICE: u8 = 101;
pub const MEDIA_SERVER_SERVICE: u8 = 102;
pub const CONNECTOR_SERVICE: u8 = 103;
3 changes: 2 additions & 1 deletion packages/cluster/src/define/rpc/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct ServiceInfo {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, IntoVecU8, TryFromSliceU8)]
pub struct NodePing {
pub node_id: u32,
pub group: String,
pub zone: String,
pub location: Option<(F32<2>, F32<2>)>,
pub webrtc: Option<ServiceInfo>,
pub rtmp: Option<ServiceInfo>,
Expand Down Expand Up @@ -54,6 +54,7 @@ pub struct QueryBestNodesRequest {
#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct QueryBestNodesResponse {
pub nodes: Vec<NodeId>,
pub service_id: u8,
}

//TODO test this
8 changes: 4 additions & 4 deletions packages/cluster/src/implement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod tests {

#[async_std::test]
async fn subscribe_room() {
let (mut server, _rpc) = ServerSdn::new(
let (mut server, _rpc, _pubsub) = ServerSdn::new(
1,
0,
100,
Expand Down Expand Up @@ -104,7 +104,7 @@ mod tests {

#[async_std::test]
async fn subscribe_peer() {
let (mut server, _rpc) = ServerSdn::new(
let (mut server, _rpc, _pubsub) = ServerSdn::new(
2,
0,
100,
Expand Down Expand Up @@ -182,7 +182,7 @@ mod tests {

#[async_std::test]
async fn subscribe_stream() {
let (mut server, _rpc) = ServerSdn::new(
let (mut server, _rpc, _pubsub) = ServerSdn::new(
3,
0,
100,
Expand Down Expand Up @@ -258,7 +258,7 @@ mod tests {

#[async_std::test]
async fn rpc() {
let (_server, mut rpc) = ServerSdn::new(
let (_server, mut rpc, _pubsub) = ServerSdn::new(
4,
0,
100,
Expand Down
5 changes: 3 additions & 2 deletions packages/cluster/src/implement/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct ServerSdn {
}

impl ServerSdn {
pub async fn new(node_id: NodeId, port: u16, service_id: u8, config: ServerSdnConfig) -> (Self, RpcEndpointSdn) {
pub async fn new(node_id: NodeId, port: u16, service_id: u8, config: ServerSdnConfig) -> (Self, RpcEndpointSdn, PubsubSdk) {
let mut node_addr_builder = NodeAddrBuilder::new(node_id);
let udp_socket = UdpTransport::prepare(port, &mut node_addr_builder).await;
let tcp_listener = TcpTransport::prepare(port, &mut node_addr_builder).await;
Expand Down Expand Up @@ -103,12 +103,13 @@ impl ServerSdn {
Self {
node_id,
node_addr: node_addr_builder.addr(),
pubsub_sdk,
pubsub_sdk: pubsub_sdk.clone(),
kv_sdk,
join_handler: Some(join_handler),
rpc_emitter: rpc_box.emitter(),
},
RpcEndpointSdn { rpc_box },
pubsub_sdk,
)
}
}
Expand Down
1 change: 1 addition & 0 deletions servers/media-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ md5 = {version = "0.7.0", optional = true }
rand = "0.8.5"
yaque = { version = "0.6.6", optional = true }
maxminddb = { version = "0.24.0", optional = true }
bincode = { version = "1" }

[dev-dependencies]
md5 = "0.7.0"
Expand Down
9 changes: 9 additions & 0 deletions servers/media-server/scripts/gateway.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
cargo run --package atm0s-media-server -- \
--node-id 11 \
--http-port 8011 \
--sdn-port 10011 \
--sdn-zone zone1 \
gateway \
--lat 37.7749 \
--lng 122.4194 \
--geoip-db ../../../maxminddb-data/GeoLite2-City.mmdb
7 changes: 0 additions & 7 deletions servers/media-server/scripts/gateway_global.sh

This file was deleted.

11 changes: 0 additions & 11 deletions servers/media-server/scripts/gateway_inner.sh

This file was deleted.

10 changes: 10 additions & 0 deletions servers/media-server/scripts/gateway_other.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
cargo run --package atm0s-media-server -- \
--node-id 12 \
--http-port 8012 \
--sdn-port 10012 \
--sdn-zone zone2 \
--seeds 11@/ip4/127.0.0.1/udp/10011/ip4/127.0.0.1/tcp/10011 \
gateway \
--lat 47.7749 \
--lng 112.4194 \
--geoip-db ../../../maxminddb-data/GeoLite2-City.mmdb
2 changes: 1 addition & 1 deletion servers/media-server/scripts/media_rtmp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ cargo run --package atm0s-media-server -- \
--node-id 21 \
--http-port 8021 \
--sdn-port 10021 \
--sdn-group group1 \
--sdn-zone zone1 \
--seeds 11@/ip4/127.0.0.1/udp/10011/ip4/127.0.0.1/tcp/10011 \
rtmp
2 changes: 1 addition & 1 deletion servers/media-server/scripts/media_sip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ cargo run --package atm0s-media-server -- \
--node-id 31 \
--http-port 8031 \
--sdn-port 10031 \
--sdn-group group1 \
--sdn-zone zone1 \
--seeds 11@/ip4/127.0.0.1/udp/10011/ip4/127.0.0.1/tcp/10011 \
sip --addr 127.0.0.1:5060
2 changes: 1 addition & 1 deletion servers/media-server/scripts/media_webrtc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ cargo run --package atm0s-media-server -- \
--node-id 41 \
--http-port 8041 \
--sdn-port 10041 \
--sdn-group group1 \
--sdn-zone zone1 \
--seeds 11@/ip4/127.0.0.1/udp/10011/ip4/127.0.0.1/tcp/10011 \
webrtc
56 changes: 21 additions & 35 deletions servers/media-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod server;
use cluster::{
atm0s_sdn::SystemTimer,
implement::{NodeAddr, NodeId, ServerSdn, ServerSdnConfig},
CONNECTOR_SERVICE, GLOBAL_GATEWAY_SERVICE, INNER_GATEWAY_SERVICE, MEDIA_SERVER_SERVICE,
CONNECTOR_SERVICE, GATEWAY_SERVICE, MEDIA_SERVER_SERVICE,
};

#[cfg(feature = "connector")]
Expand All @@ -26,8 +26,6 @@ use server::webrtc::run_webrtc_server;

use tracing_subscriber::{fmt, prelude::*, EnvFilter};

use crate::server::gateway::GatewayMode;

/// Media Server
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand All @@ -44,9 +42,9 @@ struct Args {
#[arg(env, long, default_value_t = 0)]
sdn_port: u16,

/// Sdn group
/// Sdn Zone
#[arg(env, long, default_value = "local")]
sdn_group: String,
sdn_zone: String,

/// Current Node ID
#[arg(env, long, default_value_t = 1)]
Expand Down Expand Up @@ -105,76 +103,64 @@ async fn main() {
#[cfg(feature = "gateway")]
Servers::Gateway(opts) => {
use server::MediaServerContext;
match opts.mode {
GatewayMode::Global => {
config.local_tags = vec!["gateway-global".to_string()];
config.connect_tags = vec!["gateway-global".to_string()];
}
GatewayMode::Inner => {
config.local_tags = vec![format!("gateway-inner-{}", args.sdn_group)];
config.connect_tags = vec!["gateway-global".to_string(), format!("gateway-inner-{}", args.sdn_group)];
}
}
config.local_tags = vec![format!("gateway-zone-{}", args.sdn_zone), "gateway".to_string()];
config.connect_tags = vec!["gateway-global".to_string()];

let token = Arc::new(cluster::implement::jwt_static::JwtStaticToken::new(&args.secret));
let ctx = MediaServerContext::<()>::new(args.node_id, 0, Arc::new(SystemTimer()), token.clone(), token);
let rpc_service_id = match opts.mode {
GatewayMode::Inner => INNER_GATEWAY_SERVICE,
GatewayMode::Global => GLOBAL_GATEWAY_SERVICE,
};
let (cluster, rpc_endpoint) = ServerSdn::new(args.node_id, args.sdn_port, rpc_service_id, config).await;
if let Err(e) = run_gateway_server(args.http_port, args.http_tls, opts, ctx, cluster, rpc_endpoint).await {
let (cluster, rpc_endpoint, pubsub) = ServerSdn::new(args.node_id, args.sdn_port, GATEWAY_SERVICE, config).await;
if let Err(e) = run_gateway_server(args.http_port, args.http_tls, &args.sdn_zone, opts, ctx, cluster, rpc_endpoint, pubsub).await {
log::error!("[GatewayServer] error {}", e);
}
}
#[cfg(feature = "webrtc")]
Servers::Webrtc(opts) => {
use server::MediaServerContext;
config.local_tags = vec![format!("media-webrtc-{}", args.sdn_group)];
config.connect_tags = vec![format!("gateway-inner-{}", args.sdn_group)];
config.local_tags = vec![format!("media-webrtc-{}", args.sdn_zone)];
config.connect_tags = vec![format!("gateway-zone-{}", args.sdn_zone)];

let token = Arc::new(cluster::implement::jwt_static::JwtStaticToken::new(&args.secret));
let ctx = MediaServerContext::new(args.node_id, opts.max_conn, Arc::new(SystemTimer()), token.clone(), token);
let (cluster, rpc_endpoint) = ServerSdn::new(args.node_id, args.sdn_port, MEDIA_SERVER_SERVICE, config).await;
if let Err(e) = run_webrtc_server(args.http_port, args.http_tls, opts, ctx, cluster, rpc_endpoint).await {
let (cluster, rpc_endpoint, _pubsub) = ServerSdn::new(args.node_id, args.sdn_port, MEDIA_SERVER_SERVICE, config).await;
if let Err(e) = run_webrtc_server(args.http_port, args.http_tls, &args.sdn_zone, opts, ctx, cluster, rpc_endpoint).await {
log::error!("[WebrtcServer] error {}", e);
}
}
#[cfg(feature = "rtmp")]
Servers::Rtmp(opts) => {
use server::MediaServerContext;
config.local_tags = vec![format!("media-rtmp-{}", args.sdn_group)];
config.connect_tags = vec![format!("gateway-inner-{}", args.sdn_group)];
config.local_tags = vec![format!("media-rtmp-{}", args.sdn_zone)];
config.connect_tags = vec![format!("gateway-zone-{}", args.sdn_zone)];

let token = Arc::new(cluster::implement::jwt_static::JwtStaticToken::new(&args.secret));
let ctx = MediaServerContext::new(args.node_id, opts.max_conn, Arc::new(SystemTimer()), token.clone(), token);
let (cluster, rpc_endpoint) = ServerSdn::new(args.node_id, args.sdn_port, MEDIA_SERVER_SERVICE, config).await;
if let Err(e) = run_rtmp_server(args.http_port, args.http_tls, opts, ctx, cluster, rpc_endpoint).await {
let (cluster, rpc_endpoint, _pubsub) = ServerSdn::new(args.node_id, args.sdn_port, MEDIA_SERVER_SERVICE, config).await;
if let Err(e) = run_rtmp_server(args.http_port, args.http_tls, &args.sdn_zone, opts, ctx, cluster, rpc_endpoint).await {
log::error!("[RtmpServer] error {}", e);
}
}
#[cfg(feature = "sip")]
Servers::Sip(opts) => {
use server::MediaServerContext;
config.local_tags = vec![format!("media-sip-{}", args.sdn_group)];
config.connect_tags = vec![format!("gateway-inner-{}", args.sdn_group)];
config.local_tags = vec![format!("media-sip-{}", args.sdn_zone)];
config.connect_tags = vec![format!("gateway-zone-{}", args.sdn_zone)];

let token = Arc::new(cluster::implement::jwt_static::JwtStaticToken::new(&args.secret));
let ctx = MediaServerContext::new(args.node_id, opts.max_conn, Arc::new(SystemTimer()), token.clone(), token);
let (cluster, rpc_endpoint) = ServerSdn::new(args.node_id, args.sdn_port, MEDIA_SERVER_SERVICE, config).await;
let (cluster, rpc_endpoint, _pubsub) = ServerSdn::new(args.node_id, args.sdn_port, MEDIA_SERVER_SERVICE, config).await;
if let Err(e) = run_sip_server(args.http_port, args.http_tls, opts, ctx, cluster, rpc_endpoint).await {
log::error!("[RtmpServer] error {}", e);
}
}
#[cfg(feature = "connector")]
Servers::Connector(opts) => {
use server::MediaServerContext;
config.local_tags = vec![format!("connector-{}", args.sdn_group)];
config.connect_tags = vec![format!("gateway-inner-{}", args.sdn_group)];
config.local_tags = vec![format!("connector-{}", args.sdn_zone)];
config.connect_tags = vec![format!("gateway-zone-{}", args.sdn_zone)];

let token = Arc::new(cluster::implement::jwt_static::JwtStaticToken::new(&args.secret));
let ctx = MediaServerContext::new(args.node_id, opts.max_conn, Arc::new(SystemTimer()), token.clone(), token);
let (cluster, rpc_endpoint) = ServerSdn::new(args.node_id, args.sdn_port, CONNECTOR_SERVICE, config).await;
let (cluster, rpc_endpoint, _pubsub) = ServerSdn::new(args.node_id, args.sdn_port, CONNECTOR_SERVICE, config).await;
if let Err(e) = run_connector_server(args.http_port, args.http_tls, opts, ctx, cluster, rpc_endpoint).await {
log::error!("[ConnectorServer] error {}", e);
}
Expand Down
Loading

0 comments on commit 91af3e9

Please sign in to comment.