Skip to content

Commit

Permalink
feat: rtsp proxy (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm authored Aug 17, 2024
1 parent 48773a4 commit 795b9fa
Show file tree
Hide file tree
Showing 23 changed files with 470 additions and 196 deletions.
28 changes: 15 additions & 13 deletions crates/agent/examples/benchmark_clients.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::{
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};

use atm0s_reverse_proxy_agent::{
run_tunnel_connection, Connection, Protocol, QuicConnection, SubConnection, TcpConnection,
run_tunnel_connection, Connection, Protocol, QuicConnection, ServiceRegistry,
SimpleServiceRegistry, SubConnection, TcpConnection,
};
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
use clap::Parser;
Expand Down Expand Up @@ -67,10 +69,14 @@ async fn main() {
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

let registry = SimpleServiceRegistry::new(args.http_dest, args.https_dest);
let registry = Arc::new(registry);

for client in 0..args.clients {
let args2 = args.clone();
let args_c = args.clone();
let registry = registry.clone();
async_std::task::spawn(async move {
async_std::task::spawn_local(connect(client, args2));
async_std::task::spawn_local(connect(client, args_c, registry));
});
async_std::task::sleep(Duration::from_millis(args.connect_wait_ms)).await;
}
Expand All @@ -80,7 +86,7 @@ async fn main() {
}
}

async fn connect(client: usize, args: Args) {
async fn connect(client: usize, args: Args, registry: Arc<dyn ServiceRegistry>) {
let default_tunnel_cert_buf = include_bytes!("../../../certs/tunnel.cert");
let default_tunnel_cert = CertificateDer::from(default_tunnel_cert_buf.to_vec());

Expand Down Expand Up @@ -112,7 +118,7 @@ async fn connect(client: usize, args: Args) {
conn.response()
);
println!("{client} connected after {:?}", started.elapsed());
run_connection_loop(conn, args.http_dest, args.https_dest).await;
run_connection_loop(conn, registry.clone()).await;
}
Err(e) => {
log::error!("Connect to connector via tcp error: {}", e);
Expand All @@ -134,7 +140,7 @@ async fn connect(client: usize, args: Args) {
conn.response()
);
println!("{client} connected after {:?}", started.elapsed());
run_connection_loop(conn, args.http_dest, args.https_dest).await;
run_connection_loop(conn, registry.clone()).await;
}
Err(e) => {
log::error!("Connect to connector via quic error: {}", e);
Expand All @@ -149,8 +155,7 @@ async fn connect(client: usize, args: Args) {

async fn run_connection_loop<S, R, W>(
mut connection: impl Connection<S, R, W>,
http_dest: SocketAddr,
https_dest: SocketAddr,
registry: Arc<dyn ServiceRegistry>,
) where
S: SubConnection<R, W> + 'static,
R: AsyncRead + Send + Unpin + 'static,
Expand All @@ -160,11 +165,8 @@ async fn run_connection_loop<S, R, W>(
match connection.recv().await {
Ok(sub_connection) => {
log::info!("recv sub_connection");
async_std::task::spawn_local(run_tunnel_connection(
sub_connection,
http_dest,
https_dest,
));
let registry = registry.clone();
async_std::task::spawn_local(run_tunnel_connection(sub_connection, registry));
}
Err(e) => {
log::error!("recv sub_connection error: {}", e);
Expand Down
1 change: 1 addition & 0 deletions crates/agent/node_local_quic.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ RUST_LOG=info cargo run -- \
--connector-addr https://127.0.0.1:13001 \
--http-dest 127.0.0.1:8080 \
--https-dest 127.0.0.1:8443 \
--rtsp-dest 10.10.30.90:554 \
--allow-quic-insecure
1 change: 1 addition & 0 deletions crates/agent/node_local_tcp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ RUST_LOG=info cargo run -- \
--connector-addr tcp://127.0.0.1:13001 \
--http-dest 127.0.0.1:8080 \
--https-dest 127.0.0.1:8443 \
--rtsp-dest 10.10.30.90:554 \
--allow-quic-insecure
58 changes: 42 additions & 16 deletions crates/agent/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::net::SocketAddr;
use std::sync::Arc;

use async_std::io::WriteExt;

use futures::{select, AsyncRead, AsyncReadExt, AsyncWrite, FutureExt};
use local_tunnel::tcp::LocalTcpTunnel;
use protocol::cluster::AgentTunnelRequest;

use crate::local_tunnel::LocalTunnel;

Expand All @@ -15,32 +16,54 @@ pub use connection::{
tcp::{TcpConnection, TcpSubConnection},
Connection, Protocol, SubConnection,
};
pub use local_tunnel::{registry::SimpleServiceRegistry, ServiceRegistry};

pub async fn run_tunnel_connection<S, R, W>(
sub_connection: S,
http_dest: SocketAddr,
https_dest: SocketAddr,
) where
pub async fn run_tunnel_connection<S, R, W>(sub_connection: S, registry: Arc<dyn ServiceRegistry>)
where
S: SubConnection<R, W> + 'static,
R: AsyncRead + Send + Unpin,
W: AsyncWrite + Send + Unpin,
{
log::info!("sub_connection pipe to local_tunnel start");
let (mut reader1, mut writer1) = sub_connection.split();
let mut first_pkt = [0u8; 4096];
let (local_tunnel, first_pkt_len) = match reader1.read(&mut first_pkt).await {
let (local_tunnel, first_pkt_start, first_pkt_end) = match reader1.read(&mut first_pkt).await {
Ok(first_pkt_len) => {
log::info!("first pkt size: {}", first_pkt_len);
if first_pkt_len == 0 {
log::error!("first pkt size is 0 => close");
if first_pkt_len < 2 {
log::error!("first pkt size is < 4 => close");
return;
}
if first_pkt[0] == 0x16 {
log::info!("create tunnel to https dest {}", https_dest);
(LocalTcpTunnel::new(https_dest).await, first_pkt_len)
} else {
log::info!("create tunnel to http dest {}", http_dest);
(LocalTcpTunnel::new(http_dest).await, first_pkt_len)
let handshake_len = u16::from_be_bytes([first_pkt[0], first_pkt[1]]) as usize;
if handshake_len + 2 > first_pkt_len {
log::error!("first pkt size is < handshake {handshake_len} + 2 => close");
return;
}
match AgentTunnelRequest::try_from(&first_pkt[2..(handshake_len + 2)]) {
Ok(handshake) => {
if let Some(dest) =
registry.dest_for(handshake.tls, handshake.service, &handshake.domain)
{
log::info!("create tunnel to dest {}", dest);
(
LocalTcpTunnel::new(dest).await,
handshake_len + 2,
first_pkt_len,
)
} else {
log::warn!(
"dest for service {:?} tls {} domain {} not found",
handshake.service,
handshake.tls,
handshake.domain
);
return;
}
}
Err(e) => {
log::error!("handshake parse error: {}", e);
return;
}
}
}
Err(e) => {
Expand All @@ -59,7 +82,10 @@ pub async fn run_tunnel_connection<S, R, W>(

let (mut reader2, mut writer2) = local_tunnel.split();

if let Err(e) = writer2.write_all(&first_pkt[..first_pkt_len]).await {
if let Err(e) = writer2
.write_all(&first_pkt[first_pkt_start..first_pkt_end])
.await
{
log::error!("write first pkt to local_tunnel error: {}", e);
return;
}
Expand Down
7 changes: 7 additions & 0 deletions crates/agent/src/local_tunnel.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use std::net::SocketAddr;

use futures::{AsyncRead, AsyncWrite};

pub mod registry;
pub mod tcp;

pub trait LocalTunnel<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>: Send + Sync {
fn split(self) -> (R, W);
}

pub trait ServiceRegistry {
fn dest_for(&self, tls: bool, service: Option<u16>, domain: &str) -> Option<SocketAddr>;
}
40 changes: 40 additions & 0 deletions crates/agent/src/local_tunnel/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::{collections::HashMap, net::SocketAddr};

use super::ServiceRegistry;

pub struct SimpleServiceRegistry {
default_tcp: SocketAddr,
default_tls: SocketAddr,
tcp_services: HashMap<u16, SocketAddr>,
tls_services: HashMap<u16, SocketAddr>,
}

impl SimpleServiceRegistry {
pub fn new(default_tcp: SocketAddr, default_tls: SocketAddr) -> Self {
Self {
default_tcp,
default_tls,
tcp_services: HashMap::new(),
tls_services: HashMap::new(),
}
}

pub fn set_tcp_service(&mut self, service: u16, dest: SocketAddr) {
self.tcp_services.insert(service, dest);
}

pub fn set_tls_service(&mut self, service: u16, dest: SocketAddr) {
self.tls_services.insert(service, dest);
}
}

impl ServiceRegistry for SimpleServiceRegistry {
fn dest_for(&self, tls: bool, service: Option<u16>, _domain: &str) -> Option<SocketAddr> {
match (tls, service) {
(false, None) => Some(self.default_tcp),
(true, None) => Some(self.default_tls),
(false, Some(service)) => self.tcp_services.get(&service).cloned(),
(true, Some(service)) => self.tls_services.get(&service).cloned(),
}
}
}
33 changes: 22 additions & 11 deletions crates/agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::{alloc::System, net::SocketAddr};
use std::{alloc::System, net::SocketAddr, sync::Arc};

use atm0s_reverse_proxy_agent::{
run_tunnel_connection, Connection, Protocol, QuicConnection, SubConnection, TcpConnection,
run_tunnel_connection, Connection, Protocol, QuicConnection, ServiceRegistry,
SimpleServiceRegistry, SubConnection, TcpConnection,
};
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
use clap::Parser;
use futures::{AsyncRead, AsyncWrite};
use protocol::services::SERVICE_RTSP;
use protocol_ed25519::AgentLocalKey;
use rustls::pki_types::CertificateDer;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
Expand Down Expand Up @@ -34,6 +36,14 @@ struct Args {
#[arg(env, long, default_value = "127.0.0.1:8443")]
https_dest: SocketAddr,

/// Rtsp proxy dest
#[arg(env, long, default_value = "127.0.0.1:554")]
rtsp_dest: SocketAddr,

/// Sni-https proxy dest
#[arg(env, long, default_value = "127.0.0.1:5443")]
rtsps_dest: SocketAddr,

/// Persistent local key
#[arg(env, long, default_value = "local_key.pem")]
local_key: String,
Expand Down Expand Up @@ -107,6 +117,11 @@ async fn main() {
}
};

let mut registry = SimpleServiceRegistry::new(args.http_dest, args.https_dest);
registry.set_tcp_service(SERVICE_RTSP, args.rtsp_dest);
registry.set_tls_service(SERVICE_RTSP, args.rtsps_dest);
let registry = Arc::new(registry);

loop {
log::info!(
"Connecting to connector... {:?} addr: {}",
Expand All @@ -121,7 +136,7 @@ async fn main() {
"Connected to connector via tcp with res {:?}",
conn.response()
);
run_connection_loop(conn, args.http_dest, args.https_dest).await;
run_connection_loop(conn, registry.clone()).await;
}
Err(e) => {
log::error!("Connect to connector via tcp error: {}", e);
Expand All @@ -142,7 +157,7 @@ async fn main() {
"Connected to connector via quic with res {:?}",
conn.response()
);
run_connection_loop(conn, args.http_dest, args.https_dest).await;
run_connection_loop(conn, registry.clone()).await;
}
Err(e) => {
log::error!("Connect to connector via quic error: {}", e);
Expand All @@ -157,8 +172,7 @@ async fn main() {

pub async fn run_connection_loop<S, R, W>(
mut connection: impl Connection<S, R, W>,
http_dest: SocketAddr,
https_dest: SocketAddr,
registry: Arc<dyn ServiceRegistry>,
) where
S: SubConnection<R, W> + 'static,
R: AsyncRead + Send + Unpin + 'static,
Expand All @@ -168,11 +182,8 @@ pub async fn run_connection_loop<S, R, W>(
match connection.recv().await {
Ok(sub_connection) => {
log::info!("recv sub_connection");
async_std::task::spawn_local(run_tunnel_connection(
sub_connection,
http_dest,
https_dest,
));
let registry = registry.clone();
async_std::task::spawn_local(run_tunnel_connection(sub_connection, registry));
}
Err(e) => {
log::error!("recv sub_connection error: {}", e);
Expand Down
21 changes: 21 additions & 0 deletions crates/protocol/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,24 @@ impl TryFrom<&[u8]> for ClusterTunnelResponse {
bincode::deserialize(buf)
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct AgentTunnelRequest {
pub service: Option<u16>,
pub tls: bool,
pub domain: String,
}

impl From<&AgentTunnelRequest> for Vec<u8> {
fn from(resp: &AgentTunnelRequest) -> Self {
bincode::serialize(resp).expect("Should ok")
}
}

impl TryFrom<&[u8]> for AgentTunnelRequest {
type Error = bincode::Error;

fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
bincode::deserialize(buf)
}
}
1 change: 1 addition & 0 deletions crates/protocol/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod cluster;
pub mod key;
pub mod services;
1 change: 1 addition & 0 deletions crates/protocol/src/services.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub const SERVICE_RTSP: u16 = 554;
1 change: 1 addition & 0 deletions crates/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ metrics = { version = "0.22.0" }
quinn = { version = "0.11", features = ["ring", "runtime-async-std", "futures-io"] }
rustls = { version = "0.23", features = ["ring", "std"] }
atm0s-sdn = { git = "https://github.com/8xFF/atm0s-sdn.git", rev = "e5acc4458f8ce9bd0d9286bb3ad68a2a21fffb11" }
rtsp-types = "0.1.2"

[features]
default = ["binary"]
Expand Down
Loading

0 comments on commit 795b9fa

Please sign in to comment.