From a484a4ee183d186bca855e6113f0551720c9ad97 Mon Sep 17 00:00:00 2001 From: giangndm <45644921+giangndm@users.noreply.github.com> Date: Fri, 26 Jul 2024 11:03:26 +0700 Subject: [PATCH] feat: graceful disconnect with webrtc (#385) --- packages/media_core/src/endpoint.rs | 6 +--- packages/media_core/src/transport.rs | 2 +- packages/media_runner/src/worker.rs | 29 ++++++++++++++----- packages/transport_webrtc/src/transport.rs | 11 +++++-- .../transport_webrtc/src/transport/webrtc.rs | 15 ++++++---- packages/transport_webrtc/src/worker.rs | 8 ++--- 6 files changed, 45 insertions(+), 26 deletions(-) diff --git a/packages/media_core/src/endpoint.rs b/packages/media_core/src/endpoint.rs index bd46b6ab..c6dbb662 100644 --- a/packages/media_core/src/endpoint.rs +++ b/packages/media_core/src/endpoint.rs @@ -171,7 +171,6 @@ pub enum EndpointInput { Net(BackendIncoming), Cluster(ClusterEndpointEvent), Ext(Ext), - Close, } pub enum EndpointOutput { @@ -238,14 +237,11 @@ where EndpointInput::Cluster(event) => { self.internal.input(&mut self.switcher).on_cluster_event(now, event); } - EndpointInput::Close => { - self.transport.input(&mut self.switcher).on_input(now, TransportInput::Close); - } } } fn on_shutdown(&mut self, now: Instant) { - self.transport.input(&mut self.switcher).on_input(now, TransportInput::Close); + self.transport.input(&mut self.switcher).on_input(now, TransportInput::SystemClose); } } diff --git a/packages/media_core/src/transport.rs b/packages/media_core/src/transport.rs index 99966765..d0958775 100644 --- a/packages/media_core/src/transport.rs +++ b/packages/media_core/src/transport.rs @@ -93,7 +93,7 @@ pub enum TransportInput { Endpoint(EndpointEvent), RpcRes(EndpointReqId, EndpointRes), Ext(Ext), - Close, + SystemClose, } /// This is event from transport, in general is is result of transport protocol diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index 20bddfef..df308126 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -441,6 +441,11 @@ impl MediaServerWorker { ) }))), ), + transport_webrtc::ExtOut::Disconnect(req_id, variant, res) => match variant { + transport_webrtc::Variant::Whip => Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Delete(res.map(|_| WhipDeleteRes {})))), + transport_webrtc::Variant::Whep => Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Delete(res.map(|_| WhepDeleteRes {})))), + transport_webrtc::Variant::Webrtc => Output::ExtRpc(req_id, RpcRes::Webrtc(webrtc::RpcRes::Delete(res))), + }, }, transport_webrtc::GroupOutput::Shutdown(_session) => Output::Continue, transport_webrtc::GroupOutput::Continue => Output::Continue, @@ -454,6 +459,7 @@ impl MediaServerWorker { match req { RpcReq::Whip(req) => match req { whip::RpcReq::Connect(req) => { + log::info!("on rpc request {req_id}, whip::RpcReq::Connect"); match self .media_webrtc .input(&mut self.switcher) @@ -471,13 +477,15 @@ impl MediaServerWorker { ); } whip::RpcReq::Delete(req) => { - //TODO check error instead of auto response ok - self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Delete(Ok(WhipDeleteRes {}))))); - self.media_webrtc.input(&mut self.switcher).on_event(now, GroupInput::Close(req.conn_id.into())); + log::info!("on rpc request {req_id}, whip::RpcReq::Delete"); + self.media_webrtc + .input(&mut self.switcher) + .on_event(now, GroupInput::Ext(req.conn_id.into(), transport_webrtc::ExtIn::Disconnect(req_id, transport_webrtc::Variant::Whip))); } }, RpcReq::Whep(req) => match req { whep::RpcReq::Connect(req) => { + log::info!("on rpc request {req_id}, whep::RpcReq::Connect"); let peer_id = format!("whep-{}", random::()); match self .media_webrtc @@ -496,13 +504,15 @@ impl MediaServerWorker { ); } whep::RpcReq::Delete(req) => { - //TODO check error instead of auto response ok - self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Delete(Ok(WhepDeleteRes {}))))); - self.media_webrtc.input(&mut self.switcher).on_event(now, GroupInput::Close(req.conn_id.into())); + log::info!("on rpc request {req_id}, whep::RpcReq::Delete"); + self.media_webrtc + .input(&mut self.switcher) + .on_event(now, GroupInput::Ext(req.conn_id.into(), transport_webrtc::ExtIn::Disconnect(req_id, transport_webrtc::Variant::Whep))); } }, RpcReq::Webrtc(req) => match req { webrtc::RpcReq::Connect(session_id, ip, user_agent, req, userdata, record) => { + log::info!("on rpc request {req_id}, webrtc::RpcReq::Connect"); match self .media_webrtc .input(&mut self.switcher) @@ -539,7 +549,12 @@ impl MediaServerWorker { ), ); } - webrtc::RpcReq::Delete(_) => todo!(), + webrtc::RpcReq::Delete(conn) => { + log::info!("on rpc request {req_id}, webrtc::RpcReq::Delete"); + self.media_webrtc + .input(&mut self.switcher) + .on_event(now, GroupInput::Ext(conn.into(), transport_webrtc::ExtIn::Disconnect(req_id, transport_webrtc::Variant::Webrtc))); + } }, } } diff --git a/packages/transport_webrtc/src/transport.rs b/packages/transport_webrtc/src/transport.rs index 794e2ee7..694950b3 100644 --- a/packages/transport_webrtc/src/transport.rs +++ b/packages/transport_webrtc/src/transport.rs @@ -61,6 +61,7 @@ pub enum ExtIn { RemoteIce(u64, Variant, Vec), /// Last option, bool is userdata and record flag RestartIce(u64, Variant, IpAddr, String, ConnectRequest, Option, bool), + Disconnect(u64, Variant), } #[derive(Debug, PartialEq, Eq)] @@ -68,6 +69,7 @@ pub enum ExtOut { RemoteIce(u64, Variant, RpcResult), /// response is (ice_lite, answer_sdp) RestartIce(u64, Variant, RpcResult<(bool, String)>), + Disconnect(u64, Variant, RpcResult<()>), } #[derive(Debug, PartialEq, Eq)] @@ -323,10 +325,13 @@ impl Transport for TransportWebrtc .push_back(TransportOutput::Ext(ExtOut::RestartIce(req_id, variant, Err(RpcError::new2(WebrtcError::InvalidSdp))))); } } + ExtIn::Disconnect(req_id, variant) => { + self.internal.close(now); + self.queue.push_back(TransportOutput::Ext(ExtOut::Disconnect(req_id, variant, Ok(())))); + } }, - TransportInput::Close => { - log::info!("[TransportWebrtc] close request"); - self.rtc.disconnect(); + TransportInput::SystemClose => { + log::info!("[TransportWebrtc] system close request"); self.internal.close(now); } } diff --git a/packages/transport_webrtc/src/transport/webrtc.rs b/packages/transport_webrtc/src/transport/webrtc.rs index b91ab53d..55ce537c 100644 --- a/packages/transport_webrtc/src/transport/webrtc.rs +++ b/packages/transport_webrtc/src/transport/webrtc.rs @@ -453,9 +453,7 @@ impl TransportWebrtcInternal for TransportWebrtcSdk { log::info!("[TransportWebrtcSdk] channel closed, leave room {:?}", self.join); self.state = State::Disconnected; self.queue - .push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Disconnected(Some( - TransportError::Timeout, - )))))); + .push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Disconnected(None))))); } Str0mEvent::IceConnectionStateChange(state) => self.on_str0m_state(now, state), Str0mEvent::MediaAdded(media) => self.on_str0m_media_added(now, media), @@ -686,11 +684,16 @@ impl TransportWebrtcSdk { } self.queue.push_back(InternalOutput::RpcReq(req_id, InternalRpcReq::SetRemoteSdp(req.sdp))); } - protobuf::session::request::session::Request::Disconnect(_) => { + protobuf::session::request::session::Request::Disconnect(_req) => { log::info!("[TransportWebrtcSdk] switched to disconnected with close action from client"); - self.state = State::Disconnected; self.queue - .push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Disconnected(None))))) + .push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Disconnected(None))))); + self.send_rpc_res( + req_id, + protobuf::session::response::Response::Session(protobuf::session::response::Session { + response: Some(protobuf::session::response::session::Response::Disconnect(protobuf::session::response::session::Disconnect {})), + }), + ); } } } diff --git a/packages/transport_webrtc/src/worker.rs b/packages/transport_webrtc/src/worker.rs index 56795d25..65ff29e1 100644 --- a/packages/transport_webrtc/src/worker.rs +++ b/packages/transport_webrtc/src/worker.rs @@ -35,7 +35,6 @@ pub enum GroupInput { Net(BackendIncoming), Cluster(WebrtcSession, ClusterEndpointEvent), Ext(WebrtcSession, ExtIn), - Close(WebrtcSession), } #[derive(Debug)] @@ -165,12 +164,13 @@ impl MediaWorkerWebrtc { .push_back(GroupOutput::Ext(owner, ExtOut::RestartIce(req_id, variant, Err(RpcError::new2(WebrtcError::RpcEndpointNotFound))))); } } + ExtIn::Disconnect(req_id, variant) => { + self.queue + .push_back(GroupOutput::Ext(owner, ExtOut::Disconnect(req_id, variant, Err(RpcError::new2(WebrtcError::RpcEndpointNotFound))))); + } } } } - GroupInput::Close(owner) => { - self.endpoints.on_event(now, owner.index(), EndpointInput::Close); - } } }