diff --git a/matchbox_socket/Cargo.toml b/matchbox_socket/Cargo.toml index e53e735f..1f9751fd 100644 --- a/matchbox_socket/Cargo.toml +++ b/matchbox_socket/Cargo.toml @@ -40,6 +40,7 @@ web-sys = { version = "0.3.22", default-features = false, features = [ "RtcPeerConnection", "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", "RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit", "RtcPeerConnectionIceEvent", + "RtcIceConnectionState", "RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType", ] } serde-wasm-bindgen = { version = "0.4" } diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index e3071a70..ca58cbd1 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -12,8 +12,8 @@ use wasm_bindgen::{prelude::*, JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; use web_sys::{ MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType, - RtcIceCandidateInit, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, - RtcSessionDescriptionInit, + RtcIceCandidateInit, RtcIceGatheringState, RtcPeerConnection, RtcPeerConnectionIceEvent, + RtcSdpType, RtcSessionDescriptionInit, }; use crate::webrtc_socket::{ @@ -164,6 +164,13 @@ async fn handshake_offer( .await .efix()?; debug!("created offer for new peer"); + + // todo: the point of implementing ice trickle is to avoid this wait... + // however, for some reason removing this wait causes problems with NAT + // punching in practice. + // We should figure out why this is happening. + wait_for_ice_gathering_complete(conn.clone()).await; + signal_peer.send(PeerSignal::Offer(conn.local_description().unwrap().sdp())); let mut received_candidates = vec![]; @@ -200,23 +207,26 @@ async fn handshake_offer( // send ICE candidates to remote peer let signal_peer_ice = signal_peer.clone(); let onicecandidate: Box = Box::new( - move |event: RtcPeerConnectionIceEvent| match event.candidate() { - Some(candidate) => { - let candidate = js_sys::JSON::stringify(&candidate.to_json()) + move |event: RtcPeerConnectionIceEvent| { + let candidate_json = match event.candidate() { + Some(candidate) => js_sys::JSON::stringify(&candidate.to_json()) .expect("failed to serialize candidate") .as_string() - .unwrap(); + .unwrap(), + None => { + debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session"); + "null".to_string() + } + }; - debug!("sending IceCandidate signal: {candidate:?}"); - signal_peer_ice.send(PeerSignal::IceCandidate(candidate)); - } - None => { - debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session"); - } + debug!("sending IceCandidate signal: {candidate_json:?}"); + signal_peer_ice.send(PeerSignal::IceCandidate(candidate_json)); }, ); let onicecandidate = Closure::wrap(onicecandidate); conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); + // note: we can let rust keep ownership of this closure, since we replace + // the event handler later in this method when ice is finished // handle pending ICE candidates for candidate in received_candidates { @@ -245,25 +255,40 @@ async fn handshake_offer( // TODO: we should support getting new ICE candidates even after connecting, // since it's possible to return to the ice gathering state // See: - conn.set_onicecandidate(None); + let onicecandidate: Box = + Box::new(move |_event: RtcPeerConnectionIceEvent| { + warn!("received ice candidate event after handshake completed"); + }); + let onicecandidate = Closure::wrap(onicecandidate); + conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); + onicecandidate.forget(); - debug!("Ice completed: {:?}", conn.ice_gathering_state()); + debug!( + "handshake_offer completed, ice gathering state: {:?}", + conn.ice_gathering_state() + ); Ok((signal_peer.id, data_channel)) } async fn try_add_rtc_ice_candidate(connection: &RtcPeerConnection, candidate_string: &str) { - let candidate_init = match js_sys::JSON::parse(candidate_string).map(RtcIceCandidateInit::from) - { - Ok(js_value) => js_value, + let parsed_candidate = match js_sys::JSON::parse(candidate_string) { + Ok(c) => c, Err(err) => { error!("failed to parse candidate json: {err:?}"); return; } }; + let candidate_init = if parsed_candidate.is_null() { + debug!("Received null ice candidate, this means there are no further ice candidates"); + None + } else { + Some(RtcIceCandidateInit::from(parsed_candidate)) + }; + JsFuture::from( - connection.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&candidate_init)), + connection.add_ice_candidate_with_opt_rtc_ice_candidate_init(candidate_init.as_ref()), ) .await .expect("failed to add ice candidate"); @@ -339,6 +364,12 @@ async fn handshake_accept( .await .efix()?; + // todo: the point of implementing ice trickle is to avoid this wait... + // however, for some reason removing this wait causes problems with NAT + // punching in practice. + // We should figure out why this is happening. + wait_for_ice_gathering_complete(conn.clone()).await; + let answer = PeerSignal::Answer(conn.local_description().unwrap().sdp()); signal_peer.send(answer); @@ -346,23 +377,26 @@ async fn handshake_accept( let signal_peer_ice = signal_peer.clone(); // todo: exactly the same as offer, dedup? let onicecandidate: Box = Box::new( - move |event: RtcPeerConnectionIceEvent| match event.candidate() { - Some(candidate) => { - let candidate = js_sys::JSON::stringify(&candidate.to_json()) + move |event: RtcPeerConnectionIceEvent| { + let candidate_json = match event.candidate() { + Some(candidate) => js_sys::JSON::stringify(&candidate.to_json()) .expect("failed to serialize candidate") .as_string() - .unwrap(); + .unwrap(), + None => { + debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session"); + "null".to_string() + } + }; - debug!("sending IceCandidate signal: {candidate:?}"); - signal_peer_ice.send(PeerSignal::IceCandidate(candidate)); - } - None => { - debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session"); - } + debug!("sending IceCandidate signal: {candidate_json:?}"); + signal_peer_ice.send(PeerSignal::IceCandidate(candidate_json)); }, ); let onicecandidate = Closure::wrap(onicecandidate); conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); + // note: we can let rust keep ownership of this closure, since we replace + // the event handler later in this method when ice is finished // handle pending ICE candidates for candidate in received_candidates { @@ -391,9 +425,18 @@ async fn handshake_accept( // TODO: we should support getting new ICE candidates even after connecting, // since it's possible to return to the ice gathering state // See: - conn.set_onicecandidate(None); + let onicecandidate: Box = + Box::new(move |_event: RtcPeerConnectionIceEvent| { + warn!("received ice candidate event after handshake completed"); + }); + let onicecandidate = Closure::wrap(onicecandidate); + conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); + onicecandidate.forget(); - debug!("Ice completed: {:?}", conn.ice_gathering_state()); + debug!( + "handshake_accept completed, ice gathering state: {:?}", + conn.ice_gathering_state() + ); Ok((signal_peer.id, data_channel)) } @@ -415,7 +458,46 @@ fn create_rtc_peer_connection(config: &WebRtcSocketConfig) -> RtcPeerConnection }; let ice_server_config_list = [ice_server_config]; peer_config.ice_servers(&serde_wasm_bindgen::to_value(&ice_server_config_list).unwrap()); - RtcPeerConnection::new_with_configuration(&peer_config).unwrap() + let connection = RtcPeerConnection::new_with_configuration(&peer_config).unwrap(); + + let connection_1 = connection.clone(); + let oniceconnectionstatechange: Box = Box::new(move |_event: JsValue| { + debug!( + "ice connection state changed: {:?}", + connection_1.ice_connection_state() + ); + }); + let oniceconnectionstatechange = Closure::wrap(oniceconnectionstatechange); + connection + .set_oniceconnectionstatechange(Some(oniceconnectionstatechange.as_ref().unchecked_ref())); + oniceconnectionstatechange.forget(); + + connection +} + +async fn wait_for_ice_gathering_complete(conn: RtcPeerConnection) { + if conn.ice_gathering_state() == RtcIceGatheringState::Complete { + debug!("Ice gathering already completed"); + return; + } + + let (mut tx, mut rx) = futures_channel::mpsc::channel(1); + + let conn_clone = conn.clone(); + let onstatechange: Box = Box::new(move |_| { + if conn_clone.ice_gathering_state() == RtcIceGatheringState::Complete { + tx.try_send(()).unwrap(); + } + }); + + let onstatechange = Closure::wrap(onstatechange); + + conn.set_onicegatheringstatechange(Some(onstatechange.as_ref().unchecked_ref())); + + rx.next().await; + + conn.set_onicegatheringstatechange(None); + debug!("Ice gathering completed"); } fn create_data_channel(