Skip to content

Commit

Permalink
add test_play_from_disk_vpx_1to1
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Feb 18, 2024
1 parent 7dba3c4 commit 94ffec6
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 26 deletions.
2 changes: 1 addition & 1 deletion rtc
Submodule rtc updated from 5260d3 to 3bb55d
Binary file added test-data/output.h264
Binary file not shown.
Binary file added test-data/output.ogg
Binary file not shown.
Binary file added test-data/output_vp8.ivf
Binary file not shown.
Binary file added test-data/output_vp9.ivf
Binary file not shown.
158 changes: 135 additions & 23 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,35 @@ use log::LevelFilter::Debug;
use log::{error, info};
use rand::random;
use sfu::{EndpointId, SessionId};
use std::collections::HashMap;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::data_channel::data_channel_message::DataChannelMessage;
use webrtc::data_channel::RTCDataChannel;
use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::sdp_type::RTCSdpType;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
use webrtc::rtp_transceiver::{RTCRtpTransceiver, RTCRtpTransceiverInit};
use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample;
use webrtc::track::track_local::TrackLocal;

pub const HOST: &'static str = "127.0.0.1";
pub const SIGNAL_PORT: u16 = 8080;
pub const OGG_PAGE_DURATION: Duration = Duration::from_millis(20);

fn pretty_sdp(input: &str) -> String {
input.replace("\\r\\n", "\n")
}

pub async fn setup_peer_connection(
config: RTCConfiguration,
Expand Down Expand Up @@ -92,12 +105,12 @@ pub async fn setup_peer_connection(

pub async fn setup_peer_connections(
configs: Vec<RTCConfiguration>,
) -> Result<HashMap<EndpointId, Arc<RTCPeerConnection>>> {
let mut peer_connections = HashMap::new();
) -> Result<Vec<(EndpointId, Arc<RTCPeerConnection>)>> {
let mut peer_connections = Vec::with_capacity(configs.len());

for config in configs {
let (endpoint_id, peer_connection) = setup_peer_connection(config).await?;
peer_connections.insert(endpoint_id, peer_connection);
peer_connections.push((endpoint_id, peer_connection));
}

Ok(peer_connections)
Expand All @@ -110,7 +123,7 @@ pub async fn teardown_peer_connection(pc: Arc<RTCPeerConnection>) -> Result<()>
}

pub async fn teardown_peer_connections(
pcs: HashMap<EndpointId, Arc<RTCPeerConnection>>,
pcs: Vec<(EndpointId, Arc<RTCPeerConnection>)>,
) -> Result<()> {
for (_, pc) in pcs {
teardown_peer_connection(pc).await?;
Expand Down Expand Up @@ -140,29 +153,79 @@ async fn signaling(
std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?)?.to_string();
info!(
"{}/{}: answer sdp {}",
session_id, endpoint_id, answer_payload
session_id,
endpoint_id,
pretty_sdp(&answer_payload)
);
let answer = serde_json::from_str::<RTCSessionDescription>(&answer_payload)?;

Ok(answer)
}

pub async fn connect(
pub async fn renegotiate(
host: &str,
signal_port: u16,
session_id: SessionId,
endpoint_id: EndpointId,
peer_connection: &Arc<RTCPeerConnection>,
data_channel: Option<&Arc<RTCDataChannel>>,
) -> Result<()> {
// Create an offer to send to the other process
let offer = peer_connection.create_offer(None).await?;

// Send our offer to the HTTP server listening in the other process
let offer_payload = serde_json::to_string(&offer)?;
info!(
"{}/{}: offer sdp {}",
session_id,
endpoint_id,
pretty_sdp(&offer_payload)
);

// Sets the LocalDescription, and starts our UDP listeners
// Note: this will start the gathering of ICE candidates
peer_connection.set_local_description(offer).await?;

if let Some(data_channel) = data_channel {
data_channel.send_text(offer_payload).await?;
} else {
let answer = signaling(host, signal_port, session_id, endpoint_id, offer_payload).await?;
peer_connection.set_remote_description(answer).await?;
}

Ok(())
}

pub async fn connect(
host: &str,
signal_port: u16,
session_id: SessionId,
endpoint_id: EndpointId,
peer_connection: &Arc<RTCPeerConnection>,
) -> Result<Arc<RTCDataChannel>> {
// Create a datachannel with label 'data'
let data_channel = peer_connection.create_data_channel("data", None).await?;

// Register channel opening handling
let data_channel_opened_notify_tx = Arc::new(Notify::new());
let data_channel_opened_ready_notify_rx = data_channel_opened_notify_tx.clone();
data_channel.on_open(Box::new(move || {
info!("DataChannel is opened");
data_channel_opened_notify_tx.notify_waiters();
Box::pin(async {})
}));

// Register SDP message handling
let peer_connection_clone = peer_connection.clone();
let data_channel_clone = data_channel.clone();
data_channel.on_message(Box::new(move |msg: DataChannelMessage| {
let sdp_str = String::from_utf8(msg.data.to_vec()).unwrap();
info!("{session_id}/{endpoint_id}: SDP from DataChannel: '{sdp_str}'");
info!(
"{}/{}: SDP from DataChannel: {}",
session_id,
endpoint_id,
pretty_sdp(&sdp_str)
);
let sdp = match serde_json::from_str::<RTCSessionDescription>(&sdp_str) {
Ok(sdp) => sdp,
Err(err) => {
Expand Down Expand Up @@ -200,7 +263,12 @@ pub async fn connect(
return;
}
};
info!("{session_id}/{endpoint_id}: SDP to DataChannel: '{answer_str}'");
info!(
"{}/{}: SDP to DataChannel: '{}'",
session_id,
endpoint_id,
pretty_sdp(&answer_str)
);
if let Err(err) = dc.send_text(answer_str).await {
error!("data channel send answer error {:?}", err);
assert!(false);
Expand All @@ -222,22 +290,66 @@ pub async fn connect(
})
}));

// Create an offer to send to the other process
let offer = peer_connection.create_offer(None).await?;
renegotiate(
host,
signal_port,
session_id,
endpoint_id,
peer_connection,
None,
)
.await?;

// Send our offer to the HTTP server listening in the other process
let offer_payload = serde_json::to_string(&offer)?;
info!(
"{}/{}: offer sdp {}",
session_id, endpoint_id, offer_payload
);
let ice_ready_notify_tx = Arc::new(Notify::new());
let ice_ready_notify_rx = ice_ready_notify_tx.clone();

// Sets the LocalDescription, and starts our UDP listeners
// Note: this will start the gathering of ICE candidates
peer_connection.set_local_description(offer).await?;
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_ice_connection_state_change(Box::new(
move |connection_state: RTCIceConnectionState| {
info!("Connection State has changed {connection_state}");
if connection_state == RTCIceConnectionState::Connected {
ice_ready_notify_tx.notify_waiters();
}
Box::pin(async {})
},
));

let answer = signaling(host, signal_port, session_id, endpoint_id, offer_payload).await?;
peer_connection.set_remote_description(answer).await?;
// Wait for connection established
ice_ready_notify_rx.notified().await;

Ok(())
// Wait for data channel opened
data_channel_opened_ready_notify_rx.notified().await;

Ok(data_channel)
}

pub async fn add_track(
peer_connection: &Arc<RTCPeerConnection>,
mime_type: &str,
track_id: &str,
direction: RTCRtpTransceiverDirection,
) -> Result<Arc<RTCRtpTransceiver>> {
// Create a video track
let track = Arc::new(TrackLocalStaticSample::new(
RTCRtpCodecCapability {
mime_type: mime_type.to_owned(),
..Default::default()
},
track_id.to_owned(),
"webrtc-rs".to_owned(),
));

// Add this newly created track to the PeerConnection
let rtp_transceiver = peer_connection
.add_transceiver_from_track(
Arc::clone(&track) as Arc<dyn TrackLocal + Send + Sync>,
Some(RTCRtpTransceiverInit {
direction,
send_encodings: vec![],
}),
)
.await?;

Ok(rtp_transceiver)
}
4 changes: 2 additions & 2 deletions tests/integration_test.rs → tests/data_channels_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ async fn test_data_channels() -> anyhow::Result<()> {
}
};

for (&endpoint_id, peer_connection) in peer_connections.iter() {
match common::connect(HOST, SIGNAL_PORT, session_id, endpoint_id, peer_connection).await {
for (endpoint_id, peer_connection) in peer_connections.iter() {
match common::connect(HOST, SIGNAL_PORT, session_id, *endpoint_id, peer_connection).await {
Ok(ok) => ok,
Err(err) => {
error!("{}/{}: error {}", session_id, endpoint_id, err);
Expand Down
101 changes: 101 additions & 0 deletions tests/play_from_disk_vpx_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use crate::common::{HOST, SIGNAL_PORT};
use log::{error, info};
use rand::random;
use sfu::SessionId;
use webrtc::api::media_engine::MIME_TYPE_VP8;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;

// importing common module.
mod common;

#[tokio::test]
async fn test_play_from_disk_vpx_1to1() -> anyhow::Result<()> {
// Prepare the configuration
let session_id: SessionId = random::<u64>();
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};

let peer_connections = match common::setup_peer_connections(vec![config.clone(), config]).await
{
Ok(ok) => ok,
Err(err) => {
error!("{}: error {}", session_id, err);
return Err(err.into());
}
};

let mut data_channels = vec![];
for (endpoint_id, peer_connection) in peer_connections.iter() {
let data_channel =
match common::connect(HOST, SIGNAL_PORT, session_id, *endpoint_id, peer_connection)
.await
{
Ok(ok) => ok,
Err(err) => {
error!("{}/{}: error {}", session_id, endpoint_id, err);
return Err(err.into());
}
};
data_channels.push(data_channel);
}

let rtp_transceiver = match common::add_track(
&peer_connections[0].1,
MIME_TYPE_VP8,
"video_track",
RTCRtpTransceiverDirection::Sendonly,
)
.await
{
Ok(ok) => ok,
Err(err) => {
error!("{}/{}: error {}", session_id, peer_connections[0].0, err);
return Err(err.into());
}
};

// Read incoming RTCP packets
// Before these packets are returned they are processed by interceptors. For things
// like NACK this needs to be called.
tokio::spawn(async move {
let rtp_sender = rtp_transceiver.sender().await;
while let Ok((rtcp_packets, _)) = rtp_sender.read_rtcp().await {
info!("received RTCP packets {:?}", rtcp_packets);
//TODO: check RTCP report and handle cancel
}
});

match common::renegotiate(
HOST,
SIGNAL_PORT,
session_id,
peer_connections[0].0,
&peer_connections[0].1,
Some(&data_channels[0]),
)
.await
{
Ok(ok) => ok,
Err(err) => {
error!("{}/{}: error {}", session_id, peer_connections[0].0, err);
return Err(err.into());
}
};

match common::teardown_peer_connections(peer_connections).await {
Ok(ok) => ok,
Err(err) => {
error!("{}: error {}", session_id, err);
return Err(err.into());
}
}

Ok(())
}

0 comments on commit 94ffec6

Please sign in to comment.