Skip to content

Commit

Permalink
feat: graceful disconnect with webrtc (8xFF#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm authored Jul 26, 2024
1 parent d632c3f commit a484a4e
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 26 deletions.
6 changes: 1 addition & 5 deletions packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ pub enum EndpointInput<Ext> {
Net(BackendIncoming),
Cluster(ClusterEndpointEvent),
Ext(Ext),
Close,
}

pub enum EndpointOutput<Ext> {
Expand Down Expand Up @@ -238,14 +237,11 @@ where
EndpointInput::Cluster(event) => {
self.internal.input(&mut self.switcher).on_cluster_event(now, event);
}
EndpointInput::Close => {
self.transport.input(&mut self.switcher).on_input(now, TransportInput::Close);
}
}
}

fn on_shutdown(&mut self, now: Instant) {
self.transport.input(&mut self.switcher).on_input(now, TransportInput::Close);
self.transport.input(&mut self.switcher).on_input(now, TransportInput::SystemClose);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/media_core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub enum TransportInput<Ext> {
Endpoint(EndpointEvent),
RpcRes(EndpointReqId, EndpointRes),
Ext(Ext),
Close,
SystemClose,
}

/// This is event from transport, in general is is result of transport protocol
Expand Down
29 changes: 22 additions & 7 deletions packages/media_runner/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
)
}))),
),
transport_webrtc::ExtOut::Disconnect(req_id, variant, res) => match variant {
transport_webrtc::Variant::Whip => Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Delete(res.map(|_| WhipDeleteRes {})))),
transport_webrtc::Variant::Whep => Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Delete(res.map(|_| WhepDeleteRes {})))),
transport_webrtc::Variant::Webrtc => Output::ExtRpc(req_id, RpcRes::Webrtc(webrtc::RpcRes::Delete(res))),
},
},
transport_webrtc::GroupOutput::Shutdown(_session) => Output::Continue,
transport_webrtc::GroupOutput::Continue => Output::Continue,
Expand All @@ -454,6 +459,7 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
match req {
RpcReq::Whip(req) => match req {
whip::RpcReq::Connect(req) => {
log::info!("on rpc request {req_id}, whip::RpcReq::Connect");
match self
.media_webrtc
.input(&mut self.switcher)
Expand All @@ -471,13 +477,15 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
);
}
whip::RpcReq::Delete(req) => {
//TODO check error instead of auto response ok
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Delete(Ok(WhipDeleteRes {})))));
self.media_webrtc.input(&mut self.switcher).on_event(now, GroupInput::Close(req.conn_id.into()));
log::info!("on rpc request {req_id}, whip::RpcReq::Delete");
self.media_webrtc
.input(&mut self.switcher)
.on_event(now, GroupInput::Ext(req.conn_id.into(), transport_webrtc::ExtIn::Disconnect(req_id, transport_webrtc::Variant::Whip)));
}
},
RpcReq::Whep(req) => match req {
whep::RpcReq::Connect(req) => {
log::info!("on rpc request {req_id}, whep::RpcReq::Connect");
let peer_id = format!("whep-{}", random::<u64>());
match self
.media_webrtc
Expand All @@ -496,13 +504,15 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
);
}
whep::RpcReq::Delete(req) => {
//TODO check error instead of auto response ok
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Delete(Ok(WhepDeleteRes {})))));
self.media_webrtc.input(&mut self.switcher).on_event(now, GroupInput::Close(req.conn_id.into()));
log::info!("on rpc request {req_id}, whep::RpcReq::Delete");
self.media_webrtc
.input(&mut self.switcher)
.on_event(now, GroupInput::Ext(req.conn_id.into(), transport_webrtc::ExtIn::Disconnect(req_id, transport_webrtc::Variant::Whep)));
}
},
RpcReq::Webrtc(req) => match req {
webrtc::RpcReq::Connect(session_id, ip, user_agent, req, userdata, record) => {
log::info!("on rpc request {req_id}, webrtc::RpcReq::Connect");
match self
.media_webrtc
.input(&mut self.switcher)
Expand Down Expand Up @@ -539,7 +549,12 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
),
);
}
webrtc::RpcReq::Delete(_) => todo!(),
webrtc::RpcReq::Delete(conn) => {
log::info!("on rpc request {req_id}, webrtc::RpcReq::Delete");
self.media_webrtc
.input(&mut self.switcher)
.on_event(now, GroupInput::Ext(conn.into(), transport_webrtc::ExtIn::Disconnect(req_id, transport_webrtc::Variant::Webrtc)));
}
},
}
}
Expand Down
11 changes: 8 additions & 3 deletions packages/transport_webrtc/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ pub enum ExtIn {
RemoteIce(u64, Variant, Vec<String>),
/// Last option<string>, bool is userdata and record flag
RestartIce(u64, Variant, IpAddr, String, ConnectRequest, Option<String>, bool),
Disconnect(u64, Variant),
}

#[derive(Debug, PartialEq, Eq)]
pub enum ExtOut {
RemoteIce(u64, Variant, RpcResult<u32>),
/// response is (ice_lite, answer_sdp)
RestartIce(u64, Variant, RpcResult<(bool, String)>),
Disconnect(u64, Variant, RpcResult<()>),
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -323,10 +325,13 @@ impl<ES: 'static + MediaEdgeSecure> Transport<ExtIn, ExtOut> for TransportWebrtc
.push_back(TransportOutput::Ext(ExtOut::RestartIce(req_id, variant, Err(RpcError::new2(WebrtcError::InvalidSdp)))));
}
}
ExtIn::Disconnect(req_id, variant) => {
self.internal.close(now);
self.queue.push_back(TransportOutput::Ext(ExtOut::Disconnect(req_id, variant, Ok(()))));
}
},
TransportInput::Close => {
log::info!("[TransportWebrtc] close request");
self.rtc.disconnect();
TransportInput::SystemClose => {
log::info!("[TransportWebrtc] system close request");
self.internal.close(now);
}
}
Expand Down
15 changes: 9 additions & 6 deletions packages/transport_webrtc/src/transport/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,7 @@ impl<ES: MediaEdgeSecure> TransportWebrtcInternal for TransportWebrtcSdk<ES> {
log::info!("[TransportWebrtcSdk] channel closed, leave room {:?}", self.join);
self.state = State::Disconnected;
self.queue
.push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Disconnected(Some(
TransportError::Timeout,
))))));
.push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Disconnected(None)))));
}
Str0mEvent::IceConnectionStateChange(state) => self.on_str0m_state(now, state),
Str0mEvent::MediaAdded(media) => self.on_str0m_media_added(now, media),
Expand Down Expand Up @@ -686,11 +684,16 @@ impl<ES: MediaEdgeSecure> TransportWebrtcSdk<ES> {
}
self.queue.push_back(InternalOutput::RpcReq(req_id, InternalRpcReq::SetRemoteSdp(req.sdp)));
}
protobuf::session::request::session::Request::Disconnect(_) => {
protobuf::session::request::session::Request::Disconnect(_req) => {
log::info!("[TransportWebrtcSdk] switched to disconnected with close action from client");
self.state = State::Disconnected;
self.queue
.push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Disconnected(None)))))
.push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Disconnected(None)))));
self.send_rpc_res(
req_id,
protobuf::session::response::Response::Session(protobuf::session::response::Session {
response: Some(protobuf::session::response::session::Response::Disconnect(protobuf::session::response::session::Disconnect {})),
}),
);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions packages/transport_webrtc/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub enum GroupInput {
Net(BackendIncoming),
Cluster(WebrtcSession, ClusterEndpointEvent),
Ext(WebrtcSession, ExtIn),
Close(WebrtcSession),
}

#[derive(Debug)]
Expand Down Expand Up @@ -165,12 +164,13 @@ impl<ES: MediaEdgeSecure> MediaWorkerWebrtc<ES> {
.push_back(GroupOutput::Ext(owner, ExtOut::RestartIce(req_id, variant, Err(RpcError::new2(WebrtcError::RpcEndpointNotFound)))));
}
}
ExtIn::Disconnect(req_id, variant) => {
self.queue
.push_back(GroupOutput::Ext(owner, ExtOut::Disconnect(req_id, variant, Err(RpcError::new2(WebrtcError::RpcEndpointNotFound)))));
}
}
}
}
GroupInput::Close(owner) => {
self.endpoints.on_event(now, owner.index(), EndpointInput::Close);
}
}
}

Expand Down

0 comments on commit a484a4e

Please sign in to comment.