Skip to content

Commit

Permalink
feat: gateway global (#185)
Browse files Browse the repository at this point in the history
This PR add global gateway mode, which are used to route user connect request to closest zone
  • Loading branch information
giangndm authored Jan 24, 2024
1 parent 00c35a5 commit 3e64300
Show file tree
Hide file tree
Showing 39 changed files with 921 additions and 193 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ jobs:
needs: build-release
runs-on: ubuntu-latest
steps:
- name: Download maxminddb
run: sh download-geodata.sh
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
Expand Down
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/target
tarpaulin-report.html
.atm0s
.atm0s
/maxminddb-data
.vscode
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ RUN case $TARGETPLATFORM in \

FROM ubuntu:22.04

COPY maxminddb-data /maxminddb-data
COPY --from=base /atm0s-media-server /atm0s-media-server

ENTRYPOINT ["/atm0s-media-server"]
11 changes: 11 additions & 0 deletions docs/servers/gateway.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Gateway Server

The Gateway Server holds a list of resources. There are two types of gateways: zone-level gateways, which hold all servers within their zone, and global gateways, which hold all zone gateways. Gateways act as routers, directing user requests to the best or destination node based on the request type and parameters.

### Inner Gateway

This gateway is used to route a request to the best node inside a zone.

### Global Gateway

This gateway is used to route a request to the closest zone to the user. To determine the user's location, we use the free maxmind-db geo-ip. You can download it by running the script `download-geodata.sh`.
4 changes: 4 additions & 0 deletions download-geodata.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mkdir -p maxminddb-data
cd maxminddb-data
rm -i GeoLite2-City.mmdb
wget https://github.com/P3TERX/GeoLite.mmdb/raw/download/GeoLite2-City.mmdb || { echo "Download GeoLite2-City database failed"; exit 1; }
21 changes: 20 additions & 1 deletion packages/cluster/src/define/rpc/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};

use atm0s_sdn::NodeId;
use media_utils::F32;
use poem_openapi::Object;
use proc_macro::{IntoVecU8, TryFromSliceU8};
use serde::{Deserialize, Serialize};

use super::general::MediaSessionProtocol;

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ServiceInfo {
pub usage: u8,
Expand All @@ -15,6 +20,8 @@ pub struct ServiceInfo {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, IntoVecU8, TryFromSliceU8)]
pub struct NodePing {
pub node_id: u32,
pub group: String,
pub location: Option<(F32<2>, F32<2>)>,
pub webrtc: Option<ServiceInfo>,
pub rtmp: Option<ServiceInfo>,
pub sip: Option<ServiceInfo>,
Expand All @@ -37,4 +44,16 @@ pub struct NodeHealthcheckResponse {
pub success: bool,
}

#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct QueryBestNodesRequest {
pub ip_addr: IpAddr,
pub protocol: MediaSessionProtocol,
pub size: usize,
}

#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct QueryBestNodesResponse {
pub nodes: Vec<NodeId>,
}

//TODO test this
4 changes: 2 additions & 2 deletions packages/cluster/src/define/rpc/general.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use poem_openapi::Object;
use poem_openapi::{Enum, Object};
use proc_macro::{IntoVecU8, TryFromSliceU8};
use serde::{Deserialize, Serialize};

Expand All @@ -12,7 +12,7 @@ pub struct MediaEndpointCloseResponse {
pub success: bool,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Enum, PartialEq, Eq, Clone, Copy)]
pub enum MediaSessionProtocol {
Whip,
Whep,
Expand Down
6 changes: 3 additions & 3 deletions packages/cluster/src/define/rpc/webrtc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{fmt::Debug, net::IpAddr};

use crate::{ClusterEndpointPublishScope, ClusterEndpointSubscribeScope, MediaSessionToken, VerifyObject};

Expand Down Expand Up @@ -26,8 +26,8 @@ pub struct WebrtcConnectRequestSender {
#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct WebrtcConnectRequest {
pub session_uuid: Option<u64>,
pub ip_addr: Option<String>,
pub user_agent: Option<String>,
pub ip_addr: IpAddr,
pub user_agent: String,
pub version: Option<String>,
pub room: String,
pub peer: String,
Expand Down
4 changes: 3 additions & 1 deletion packages/cluster/src/define/rpc/whep.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::IpAddr;

use proc_macro::{IntoVecU8, TryFromSliceU8};
use serde::{Deserialize, Serialize};

Expand All @@ -6,7 +8,7 @@ use crate::VerifyObject;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct WhepConnectRequest {
pub session_uuid: u64,
pub ip_addr: String,
pub ip_addr: IpAddr,
pub user_agent: String,
pub token: String,
pub sdp: Option<String>,
Expand Down
4 changes: 3 additions & 1 deletion packages/cluster/src/define/rpc/whip.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::IpAddr;

use proc_macro::{IntoVecU8, TryFromSliceU8};
use serde::{Deserialize, Serialize};

Expand All @@ -6,7 +8,7 @@ use crate::{MediaSessionToken, VerifyObject};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct WhipConnectRequest {
pub session_uuid: u64,
pub ip_addr: String,
pub ip_addr: IpAddr,
pub user_agent: String,
pub token: String,
pub sdp: Option<String>,
Expand Down
8 changes: 8 additions & 0 deletions packages/cluster/src/implement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ mod tests {
ServerSdnConfig {
secret: "static_key".to_string(),
seeds: vec![],
connect_tags: vec!["local".to_string()],
local_tags: vec!["local".to_string()],
},
)
.await;
Expand Down Expand Up @@ -109,6 +111,8 @@ mod tests {
ServerSdnConfig {
secret: "static_key".to_string(),
seeds: vec![],
connect_tags: vec!["local".to_string()],
local_tags: vec!["local".to_string()],
},
)
.await;
Expand Down Expand Up @@ -185,6 +189,8 @@ mod tests {
ServerSdnConfig {
secret: "static_key".to_string(),
seeds: vec![],
connect_tags: vec!["local".to_string()],
local_tags: vec!["local".to_string()],
},
)
.await;
Expand Down Expand Up @@ -259,6 +265,8 @@ mod tests {
ServerSdnConfig {
secret: "static_key".to_string(),
seeds: vec![],
connect_tags: vec!["local".to_string()],
local_tags: vec!["local".to_string()],
},
)
.await;
Expand Down
6 changes: 4 additions & 2 deletions packages/cluster/src/implement/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub(crate) enum NodeSdkEvent {
pub struct ServerSdnConfig {
pub secret: String,
pub seeds: Vec<NodeAddr>,
pub local_tags: Vec<String>,
pub connect_tags: Vec<String>,
}

compose_transport!(UdpTcpTransport, udp: UdpTransport, tcp: TcpTransport);
Expand Down Expand Up @@ -64,8 +66,8 @@ impl ServerSdn {
let manual = ManualBehavior::new(ManualBehaviorConf {
node_id,
node_addr: node_addr_builder.addr(),
local_tags: vec!["media-server".to_string()],
connect_tags: vec!["inner-gateway".to_string()],
local_tags: config.local_tags,
connect_tags: config.connect_tags,
seeds: config.seeds,
});

Expand Down
2 changes: 1 addition & 1 deletion packages/endpoint/src/endpoint/internal/local_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl LocalTrack {
}

if let Some(pkt) = self.filter.process(now_ms, pkt) {
log::info!("[LocalTrack {}] media from cluster pkt {:?} {}", self.track_name, pkt.codec, pkt.seq_no);
log::debug!("[LocalTrack {}] media from cluster pkt {:?} {}", self.track_name, pkt.codec, pkt.seq_no);
self.out_actions.push_back(LocalTrackOutput::Transport(LocalTrackOutgoingEvent::MediaPacket(pkt)));
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/media-utils/src/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};

/// This store float f32 as u32, and can be used as key in HashMap
/// ACCURACY is the number of digits after the decimal point
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct F32<const ACCURACY: usize>(u32);

impl<const ACCURACY: usize> F32<ACCURACY> {
Expand Down
3 changes: 2 additions & 1 deletion servers/media-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ reqwest = { version = "0.11.23", features = ["default-tls", "json"], optional =
md5 = {version = "0.7.0", optional = true }
rand = "0.8.5"
yaque = { version = "0.6.6", optional = true }
maxminddb = { version = "0.24.0", optional = true }

[dev-dependencies]
md5 = "0.7.0"
Expand All @@ -46,6 +47,6 @@ embed-samples = ["rust-embed"]
webrtc = ["transport-webrtc"]
rtmp = ["transport-rtmp"]
sip = ["rsip", "transport-sip", "reqwest", "md5"]
gateway = []
gateway = ["maxminddb"]
connector = ["nats", "prost", "yaque"]
token_generate = []
Empty file.
7 changes: 7 additions & 0 deletions servers/media-server/scripts/gateway_global.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
cargo run --package atm0s-media-server -- \
--node-id 1 \
--http-port 8001 \
--sdn-port 10001 \
gateway \
--mode global \
--geoip-db ../../../maxminddb-data/GeoLite2-City.mmdb
11 changes: 11 additions & 0 deletions servers/media-server/scripts/gateway_inner.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
cargo run --package atm0s-media-server -- \
--node-id 11 \
--http-port 8011 \
--sdn-port 10011 \
--sdn-group group1 \
--seeds 1@/ip4/127.0.0.1/udp/10001/ip4/127.0.0.1/tcp/10001 \
gateway \
--mode inner \
--group local \
--lat 37.7749 \
--lng 122.4194
7 changes: 7 additions & 0 deletions servers/media-server/scripts/media_rtmp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
cargo run --package atm0s-media-server -- \
--node-id 21 \
--http-port 8021 \
--sdn-port 10021 \
--sdn-group group1 \
--seeds 11@/ip4/127.0.0.1/udp/10011/ip4/127.0.0.1/tcp/10011 \
rtmp
7 changes: 7 additions & 0 deletions servers/media-server/scripts/media_sip.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
cargo run --package atm0s-media-server -- \
--node-id 31 \
--http-port 8031 \
--sdn-port 10031 \
--sdn-group group1 \
--seeds 11@/ip4/127.0.0.1/udp/10011/ip4/127.0.0.1/tcp/10011 \
sip --addr 127.0.0.1:5060
7 changes: 7 additions & 0 deletions servers/media-server/scripts/media_webrtc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
cargo run --package atm0s-media-server -- \
--node-id 41 \
--http-port 8041 \
--sdn-port 10041 \
--sdn-group group1 \
--seeds 11@/ip4/127.0.0.1/udp/10011/ip4/127.0.0.1/tcp/10011 \
webrtc
Loading

0 comments on commit 3e64300

Please sign in to comment.