Skip to content

Commit

Permalink
WIP: whep success connect
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Apr 21, 2024
1 parent efddd99 commit f792f23
Show file tree
Hide file tree
Showing 24 changed files with 797 additions and 297 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ atm0s-sdn = { path = "../atm0s-sdn/packages/runner" }
convert-enum = "0.1"
num_enum = "0.7"
log = "0.4"
smallmap = "1.4"
6 changes: 4 additions & 2 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::SocketAddr;

use media_server_protocol::endpoint::ClusterConnId;
use media_server_protocol::transport::{RpcReq, RpcRes};
use poem::endpoint::StaticFilesEndpoint;
Expand Down Expand Up @@ -51,7 +53,7 @@ pub async fn run_gateway_http_server(sender: Sender<Rpc<RpcReq<ClusterConnId>, R
Ok(())
}

pub async fn run_media_http_server(sender: Sender<Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>) -> Result<(), Box<dyn std::error::Error>> {
pub async fn run_media_http_server(port: u16, sender: Sender<Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>) -> Result<(), Box<dyn std::error::Error>> {
let api_service: OpenApiService<_, ()> = OpenApiService::new(api_media::MediaApis, "Media Server APIs", env!("CARGO_PKG_VERSION")).server("/");
let ui = api_service.swagger_ui();
let spec = api_service.spec();
Expand All @@ -63,6 +65,6 @@ pub async fn run_media_http_server(sender: Sender<Rpc<RpcReq<ClusterConnId>, Rpc
.with(Cors::new())
.data(api_media::MediaServerCtx { sender });

Server::new(TcpListener::bind("0.0.0.0:3000")).run(route).await?;
Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?;
Ok(())
}
93 changes: 93 additions & 0 deletions bin/src/http/api_media.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use media_server_protocol::{
endpoint::ClusterConnId,
transport::{
whep::{self, WhepConnectReq, WhepDeleteReq, WhepRemoteIceReq},
whip::{self, WhipConnectReq, WhipDeleteReq, WhipRemoteIceReq},
RpcReq, RpcRes, RpcResult,
},
Expand Down Expand Up @@ -122,4 +123,96 @@ impl MediaApis {
_ => Err(poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR)),
}
}

/// connect whep endpoint
#[oai(path = "/whep/endpoint", method = "post")]
async fn whep_create(
&self,
Data(data): Data<&MediaServerCtx>,
UserAgent(user_agent): UserAgent,
RemoteIpAddr(ip_addr): RemoteIpAddr,
TokenAuthorization(token): TokenAuthorization,
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
log::info!("[MediaAPIs] create whep endpoint with token {}, ip {}, user_agent {}", token.token, ip_addr, user_agent);
let (req, rx) = Rpc::new(RpcReq::Whep(whep::RpcReq::Connect(WhepConnectReq {
ip: ip_addr,
sdp: body.0,
token: token.token,
user_agent,
})));
data.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
RpcRes::Whep(whep::RpcRes::Connect(res)) => match res {
RpcResult::Ok(res) => {
log::info!("[HttpApis] Whep endpoint created with conn_id {}", res.conn_id);
Ok(CustomHttpResponse {
code: StatusCode::CREATED,
res: ApplicationSdp(res.sdp),
headers: vec![("location", format!("/whep/conn/{}", res.conn_id))],
})
}
RpcResult::Err(e) => {
log::warn!("Whep endpoint creation failed with {e}");
Err(poem::Error::from_string(e.to_string(), StatusCode::BAD_REQUEST))
}
},
_ => Err(poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR)),
}
}

/// patch whep conn for trickle-ice
#[oai(path = "/whep/conn/:conn_id", method = "patch")]
async fn conn_whep_patch(&self, Data(data): Data<&MediaServerCtx>, conn_id: Path<String>, body: ApplicationSdpPatch<String>) -> Result<HttpResponse<ApplicationSdpPatch<String>>> {
let conn_id = conn_id.0.parse().map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[HttpApis] patch whep endpoint with sdp {}", body.0);
let (req, rx) = Rpc::new(RpcReq::Whep(whep::RpcReq::RemoteIce(WhepRemoteIceReq { conn_id, ice: body.0 })));
data.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
//TODO process with ICE restart
match res {
RpcRes::Whep(whep::RpcRes::RemoteIce(res)) => match res {
RpcResult::Ok(_res) => {
log::info!("[HttpApis] Whep endpoint patch trickle-ice with conn_id {conn_id}");
Ok(HttpResponse::new(ApplicationSdpPatch("".to_string())).status(StatusCode::NO_CONTENT))
}
RpcResult::Err(e) => {
log::warn!("Whep endpoint patch trickle-ice failed with error {e}");
Err(poem::Error::from_string(e.to_string(), StatusCode::BAD_REQUEST))
}
},
_ => Err(poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR)),
}
}

/// post whep conn for action
#[oai(path = "/api/whep/conn/:conn_id", method = "post")]
async fn conn_whep_post(&self, _ctx: Data<&MediaServerCtx>, _conn_id: Path<String>, _body: Json<String>) -> Result<ApplicationSdp<String>> {
// let conn_id = conn_id.0.parse().map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
Err(poem::Error::from_string("Not supported", StatusCode::BAD_REQUEST))
}

/// delete whep conn
#[oai(path = "/whep/conn/:conn_id", method = "delete")]
async fn conn_whep_delete(&self, Data(data): Data<&MediaServerCtx>, conn_id: Path<String>) -> Result<PlainText<String>> {
let conn_id = conn_id.0.parse().map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[HttpApis] close whep endpoint conn {}", conn_id);
let (req, rx) = Rpc::new(RpcReq::Whep(whep::RpcReq::Delete(WhepDeleteReq { conn_id })));
data.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
RpcRes::Whep(whep::RpcRes::Delete(res)) => match res {
RpcResult::Ok(_res) => {
log::info!("[HttpApis] Whep endpoint closed with conn_id {conn_id}");
Ok(PlainText("OK".to_string()))
}
RpcResult::Err(e) => {
log::warn!("Whep endpoint close request failed with error {e}");
Err(poem::Error::from_string(e.to_string(), StatusCode::BAD_REQUEST))
}
},
_ => Err(poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR)),
}
}
}
14 changes: 9 additions & 5 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,26 @@ struct Args {
#[arg(env, long)]
seeds: Vec<NodeAddr>,

/// Neighbors
#[arg(env, long)]
/// Workers
#[arg(env, long, default_value_t = 1)]
workers: usize,

#[command(subcommand)]
server: ServerType,
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
if std::env::var_os("RUST_LOG").is_none() {
std::env::set_var("RUST_LOG", "atm0s_media_server=info");
std::env::set_var("RUST_LOG", "info");
}
if std::env::var_os("RUST_BACKTRACE").is_none() {
std::env::set_var("RUST_BACKTRACE", "1");
}
let args: Args = Args::parse();
tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init();

let http_port = args.http_port;
let workers = args.workers;
let node = NodeConfig {
node_id: args.node_id,
Expand All @@ -76,6 +80,6 @@ async fn main() {
match args.server {
ServerType::Gateway(args) => run_media_gateway(workers, args).await,
ServerType::Connector(args) => run_media_connector(workers, args).await,
ServerType::Media(args) => run_media_server(workers, node, args).await,
ServerType::Media(args) => run_media_server(workers, http_port, node, args).await,
}
}
32 changes: 22 additions & 10 deletions bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,25 @@ use runtime_worker::{ExtIn, ExtOut};

#[derive(Debug, Parser)]
pub struct Args {
/// Custom binding address for WebRTC UDP
#[arg(env, long)]
custom_addrs: Vec<SocketAddr>,
}

pub async fn run_media_server(workers: usize, node: NodeConfig, args: Args) {
pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: NodeConfig, args: Args) {
println!("Running media server");
let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024);
tokio::spawn(async move {
if let Err(e) = run_media_http_server(req_tx).await {
log::error!("HTTP Error: {}", e);
}
});
if let Some(http_port) = http_port {
tokio::spawn(async move {
if let Err(e) = run_media_http_server(http_port, req_tx).await {
log::error!("HTTP Error: {}", e);
}
});
}

//TODO get local addrs
let node_id = node.node_id;
let node_session = node.session;
let webrtc_addrs = args.custom_addrs;
let mut controller = Controller::<_, _, _, _, _, 128>::default();
for i in 0..workers {
Expand All @@ -49,21 +54,28 @@ pub async fn run_media_server(workers: usize, node: NodeConfig, args: Args) {
req_id_seed += 1;
reqs.insert(req_id, req.answer_tx);

let (req, _node_id) = req.req.extract();
let (req, worker) = req.extract();
let (req, _node_id) = req.req.down();
let (req, worker) = req.down();

let ext = ExtIn::Rpc(req_id, req);
if let Some(worker) = worker {
controller.send_to(worker, ext);
if worker < workers as u16 {
log::info!("on req {req_id} dest to worker {worker}");
controller.send_to(worker, ext);
} else {
log::info!("on req {req_id} dest to wrong worker {worker} but workers is {workers}");
}
} else {
log::info!("on req {req_id} dest to any worker");
controller.send_to_best(ext);
}
}

while let Some(out) = controller.pop_event() {
match out {
ExtOut::Rpc(req_id, worker, res) => {
let res = res.up_layer(worker).up_layer(node_id);
log::info!("on req {req_id} res from worker {worker}");
let res = res.up(worker).up((node_id, node_session));
if let Some(tx) = reqs.remove(&req_id) {
if let Err(_) = tx.send(res) {
log::error!("Send rpc response error for req {req_id}");
Expand Down
6 changes: 6 additions & 0 deletions bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ impl WorkerInner<Owner, ExtIn, ExtOut, Channel, Event, ICfg, SCfg> for MediaRunt
}

fn on_tick<'a>(&mut self, now: Instant) -> Option<Output<'a>> {
if !self.queue.is_empty() {
return self.queue.pop_front();
}
let out = self.worker.on_tick(now)?;
Some(self.process_out(out))
}
Expand All @@ -104,6 +107,9 @@ impl WorkerInner<Owner, ExtIn, ExtOut, Channel, Event, ICfg, SCfg> for MediaRunt
}

fn pop_output<'a>(&mut self, now: Instant) -> Option<Output<'a>> {
if !self.queue.is_empty() {
return self.queue.pop_front();
}
let out = self.worker.pop_output(now)?;
Some(self.process_out(out))
}
Expand Down
1 change: 1 addition & 0 deletions packages/media_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
log = { workspace = true }
num_enum = { workspace = true }
smallmap = { workspace = true }
sans-io-runtime = { workspace = true, default-features = false }
atm0s-sdn = { workspace = true }
media-server-protocol = { path = "../protocol" }
Expand Down
12 changes: 8 additions & 4 deletions packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,22 @@ impl<Owner> Default for MediaCluster<Owner> {

impl<Owner> MediaCluster<Owner> {
pub fn on_tick(&mut self, now: Instant) -> Option<Output<Owner>> {
todo!()
//TODO
None
}

pub fn on_input(&mut self, now: Instant, input: Input<Owner>) -> Option<Output<Owner>> {
todo!()
//TODO
None
}

pub fn pop_output(&mut self, now: Instant) -> Option<Output<Owner>> {
todo!()
//TODO
None
}

pub fn shutdown<'a>(&mut self, now: Instant) -> Option<Output<Owner>> {
todo!()
//TODO
None
}
}
1 change: 1 addition & 0 deletions packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ impl<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> Endpoint<T, ExtIn, ExtOut> {
let out = self.internal.on_transport_rpc(now, req_id, req)?;
self.process_internal_output(now, out)
}
TransportOutput::Destroy => Some(EndpointOutput::Shutdown),
}
}

Expand Down
Loading

0 comments on commit f792f23

Please sign in to comment.