diff --git a/rtc b/rtc index 5260d38..3bb55d7 160000 --- a/rtc +++ b/rtc @@ -1 +1 @@ -Subproject commit 5260d38967d6c935bca697a8564f70b9f2ab197c +Subproject commit 3bb55d7cf57593b4e653f1fada9f061e1f708b06 diff --git a/test-data/output.h264 b/test-data/output.h264 new file mode 100644 index 0000000..03555e8 Binary files /dev/null and b/test-data/output.h264 differ diff --git a/test-data/output.ogg b/test-data/output.ogg new file mode 100644 index 0000000..e824455 Binary files /dev/null and b/test-data/output.ogg differ diff --git a/test-data/output_vp8.ivf b/test-data/output_vp8.ivf new file mode 100644 index 0000000..28bf002 Binary files /dev/null and b/test-data/output_vp8.ivf differ diff --git a/test-data/output_vp9.ivf b/test-data/output_vp9.ivf new file mode 100644 index 0000000..03607b5 Binary files /dev/null and b/test-data/output_vp9.ivf differ diff --git a/tests/common/mod.rs b/tests/common/mod.rs index b9e14f5..27b8882 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -6,22 +6,35 @@ use log::LevelFilter::Debug; use log::{error, info}; use rand::random; use sfu::{EndpointId, SessionId}; -use std::collections::HashMap; use std::io::Write; use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Notify; use webrtc::api::interceptor_registry::register_default_interceptors; use webrtc::api::media_engine::MediaEngine; use webrtc::api::APIBuilder; use webrtc::data_channel::data_channel_message::DataChannelMessage; +use webrtc::data_channel::RTCDataChannel; +use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState; use webrtc::interceptor::registry::Registry; use webrtc::peer_connection::configuration::RTCConfiguration; use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; use webrtc::peer_connection::sdp::sdp_type::RTCSdpType; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; use webrtc::peer_connection::RTCPeerConnection; +use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability; +use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; +use webrtc::rtp_transceiver::{RTCRtpTransceiver, RTCRtpTransceiverInit}; +use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample; +use webrtc::track::track_local::TrackLocal; pub const HOST: &'static str = "127.0.0.1"; pub const SIGNAL_PORT: u16 = 8080; +pub const OGG_PAGE_DURATION: Duration = Duration::from_millis(20); + +fn pretty_sdp(input: &str) -> String { + input.replace("\\r\\n", "\n") +} pub async fn setup_peer_connection( config: RTCConfiguration, @@ -92,12 +105,12 @@ pub async fn setup_peer_connection( pub async fn setup_peer_connections( configs: Vec, -) -> Result>> { - let mut peer_connections = HashMap::new(); +) -> Result)>> { + let mut peer_connections = Vec::with_capacity(configs.len()); for config in configs { let (endpoint_id, peer_connection) = setup_peer_connection(config).await?; - peer_connections.insert(endpoint_id, peer_connection); + peer_connections.push((endpoint_id, peer_connection)); } Ok(peer_connections) @@ -110,7 +123,7 @@ pub async fn teardown_peer_connection(pc: Arc) -> Result<()> } pub async fn teardown_peer_connections( - pcs: HashMap>, + pcs: Vec<(EndpointId, Arc)>, ) -> Result<()> { for (_, pc) in pcs { teardown_peer_connection(pc).await?; @@ -140,29 +153,79 @@ async fn signaling( std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?)?.to_string(); info!( "{}/{}: answer sdp {}", - session_id, endpoint_id, answer_payload + session_id, + endpoint_id, + pretty_sdp(&answer_payload) ); let answer = serde_json::from_str::(&answer_payload)?; Ok(answer) } -pub async fn connect( +pub async fn renegotiate( host: &str, signal_port: u16, session_id: SessionId, endpoint_id: EndpointId, peer_connection: &Arc, + data_channel: Option<&Arc>, ) -> Result<()> { + // Create an offer to send to the other process + let offer = peer_connection.create_offer(None).await?; + + // Send our offer to the HTTP server listening in the other process + let offer_payload = serde_json::to_string(&offer)?; + info!( + "{}/{}: offer sdp {}", + session_id, + endpoint_id, + pretty_sdp(&offer_payload) + ); + + // Sets the LocalDescription, and starts our UDP listeners + // Note: this will start the gathering of ICE candidates + peer_connection.set_local_description(offer).await?; + + if let Some(data_channel) = data_channel { + data_channel.send_text(offer_payload).await?; + } else { + let answer = signaling(host, signal_port, session_id, endpoint_id, offer_payload).await?; + peer_connection.set_remote_description(answer).await?; + } + + Ok(()) +} + +pub async fn connect( + host: &str, + signal_port: u16, + session_id: SessionId, + endpoint_id: EndpointId, + peer_connection: &Arc, +) -> Result> { // Create a datachannel with label 'data' let data_channel = peer_connection.create_data_channel("data", None).await?; + // Register channel opening handling + let data_channel_opened_notify_tx = Arc::new(Notify::new()); + let data_channel_opened_ready_notify_rx = data_channel_opened_notify_tx.clone(); + data_channel.on_open(Box::new(move || { + info!("DataChannel is opened"); + data_channel_opened_notify_tx.notify_waiters(); + Box::pin(async {}) + })); + // Register SDP message handling let peer_connection_clone = peer_connection.clone(); let data_channel_clone = data_channel.clone(); data_channel.on_message(Box::new(move |msg: DataChannelMessage| { let sdp_str = String::from_utf8(msg.data.to_vec()).unwrap(); - info!("{session_id}/{endpoint_id}: SDP from DataChannel: '{sdp_str}'"); + info!( + "{}/{}: SDP from DataChannel: {}", + session_id, + endpoint_id, + pretty_sdp(&sdp_str) + ); let sdp = match serde_json::from_str::(&sdp_str) { Ok(sdp) => sdp, Err(err) => { @@ -200,7 +263,12 @@ pub async fn connect( return; } }; - info!("{session_id}/{endpoint_id}: SDP to DataChannel: '{answer_str}'"); + info!( + "{}/{}: SDP to DataChannel: '{}'", + session_id, + endpoint_id, + pretty_sdp(&answer_str) + ); if let Err(err) = dc.send_text(answer_str).await { error!("data channel send answer error {:?}", err); assert!(false); @@ -222,22 +290,66 @@ pub async fn connect( }) })); - // Create an offer to send to the other process - let offer = peer_connection.create_offer(None).await?; + renegotiate( + host, + signal_port, + session_id, + endpoint_id, + peer_connection, + None, + ) + .await?; - // Send our offer to the HTTP server listening in the other process - let offer_payload = serde_json::to_string(&offer)?; - info!( - "{}/{}: offer sdp {}", - session_id, endpoint_id, offer_payload - ); + let ice_ready_notify_tx = Arc::new(Notify::new()); + let ice_ready_notify_rx = ice_ready_notify_tx.clone(); - // Sets the LocalDescription, and starts our UDP listeners - // Note: this will start the gathering of ICE candidates - peer_connection.set_local_description(offer).await?; + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_ice_connection_state_change(Box::new( + move |connection_state: RTCIceConnectionState| { + info!("Connection State has changed {connection_state}"); + if connection_state == RTCIceConnectionState::Connected { + ice_ready_notify_tx.notify_waiters(); + } + Box::pin(async {}) + }, + )); - let answer = signaling(host, signal_port, session_id, endpoint_id, offer_payload).await?; - peer_connection.set_remote_description(answer).await?; + // Wait for connection established + ice_ready_notify_rx.notified().await; - Ok(()) + // Wait for data channel opened + data_channel_opened_ready_notify_rx.notified().await; + + Ok(data_channel) +} + +pub async fn add_track( + peer_connection: &Arc, + mime_type: &str, + track_id: &str, + direction: RTCRtpTransceiverDirection, +) -> Result> { + // Create a video track + let track = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: mime_type.to_owned(), + ..Default::default() + }, + track_id.to_owned(), + "webrtc-rs".to_owned(), + )); + + // Add this newly created track to the PeerConnection + let rtp_transceiver = peer_connection + .add_transceiver_from_track( + Arc::clone(&track) as Arc, + Some(RTCRtpTransceiverInit { + direction, + send_encodings: vec![], + }), + ) + .await?; + + Ok(rtp_transceiver) } diff --git a/tests/integration_test.rs b/tests/data_channels_test.rs similarity index 93% rename from tests/integration_test.rs rename to tests/data_channels_test.rs index 9575f77..c667af1 100644 --- a/tests/integration_test.rs +++ b/tests/data_channels_test.rs @@ -67,8 +67,8 @@ async fn test_data_channels() -> anyhow::Result<()> { } }; - for (&endpoint_id, peer_connection) in peer_connections.iter() { - match common::connect(HOST, SIGNAL_PORT, session_id, endpoint_id, peer_connection).await { + for (endpoint_id, peer_connection) in peer_connections.iter() { + match common::connect(HOST, SIGNAL_PORT, session_id, *endpoint_id, peer_connection).await { Ok(ok) => ok, Err(err) => { error!("{}/{}: error {}", session_id, endpoint_id, err); diff --git a/tests/play_from_disk_vpx_test.rs b/tests/play_from_disk_vpx_test.rs new file mode 100644 index 0000000..a26f838 --- /dev/null +++ b/tests/play_from_disk_vpx_test.rs @@ -0,0 +1,101 @@ +use crate::common::{HOST, SIGNAL_PORT}; +use log::{error, info}; +use rand::random; +use sfu::SessionId; +use webrtc::api::media_engine::MIME_TYPE_VP8; +use webrtc::ice_transport::ice_server::RTCIceServer; +use webrtc::peer_connection::configuration::RTCConfiguration; +use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; + +// importing common module. +mod common; + +#[tokio::test] +async fn test_play_from_disk_vpx_1to1() -> anyhow::Result<()> { + // Prepare the configuration + let session_id: SessionId = random::(); + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: vec!["stun:stun.l.google.com:19302".to_owned()], + ..Default::default() + }], + ..Default::default() + }; + + let peer_connections = match common::setup_peer_connections(vec![config.clone(), config]).await + { + Ok(ok) => ok, + Err(err) => { + error!("{}: error {}", session_id, err); + return Err(err.into()); + } + }; + + let mut data_channels = vec![]; + for (endpoint_id, peer_connection) in peer_connections.iter() { + let data_channel = + match common::connect(HOST, SIGNAL_PORT, session_id, *endpoint_id, peer_connection) + .await + { + Ok(ok) => ok, + Err(err) => { + error!("{}/{}: error {}", session_id, endpoint_id, err); + return Err(err.into()); + } + }; + data_channels.push(data_channel); + } + + let rtp_transceiver = match common::add_track( + &peer_connections[0].1, + MIME_TYPE_VP8, + "video_track", + RTCRtpTransceiverDirection::Sendonly, + ) + .await + { + Ok(ok) => ok, + Err(err) => { + error!("{}/{}: error {}", session_id, peer_connections[0].0, err); + return Err(err.into()); + } + }; + + // Read incoming RTCP packets + // Before these packets are returned they are processed by interceptors. For things + // like NACK this needs to be called. + tokio::spawn(async move { + let rtp_sender = rtp_transceiver.sender().await; + while let Ok((rtcp_packets, _)) = rtp_sender.read_rtcp().await { + info!("received RTCP packets {:?}", rtcp_packets); + //TODO: check RTCP report and handle cancel + } + }); + + match common::renegotiate( + HOST, + SIGNAL_PORT, + session_id, + peer_connections[0].0, + &peer_connections[0].1, + Some(&data_channels[0]), + ) + .await + { + Ok(ok) => ok, + Err(err) => { + error!("{}/{}: error {}", session_id, peer_connections[0].0, err); + return Err(err.into()); + } + }; + + match common::teardown_peer_connections(peer_connections).await { + Ok(ok) => ok, + Err(err) => { + error!("{}: error {}", session_id, err); + return Err(err.into()); + } + } + + Ok(()) +}