Skip to content

Commit

Permalink
Merge branch 'algesten:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm authored Apr 28, 2024
2 parents 0c5ab6c + 3902a6d commit e9e19b2
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 54 deletions.
157 changes: 105 additions & 52 deletions src/ice/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const TIMING_ADVANCE: Duration = Duration::from_millis(50);
/// Handles the ICE protocol for a given peer.
///
/// Each connection between two peers corresponds to one [`IceAgent`] on either end.
/// To form connections to multiple peers, a peer needs to create a dedicated [`IceAgent`] for each one.
/// To form connections to multiple peers, a peer needs to create a dedicated [`IceAgent`] for
/// each one.
#[derive(Debug)]
pub struct IceAgent {
/// Last time handle_timeout run (paced by timing_advance).
Expand Down Expand Up @@ -469,37 +470,52 @@ impl IceAgent {

c.set_local_preference(pref);

// Tie this ufrag to this ICE-session.
c.set_ufrag(&self.local_credentials.ufrag);

// A candidate is redundant if and only if its transport address and base equal those
// of another candidate. The agent SHOULD eliminate the redundant
// candidate with the lower priority.
//
// NB this must be done _after_ set_local_preference(), since the prio() used in the
// elimination is calculated from that preference.
if let Some((idx, other)) =
let maybe_redundant =
self.local_candidates.iter_mut().enumerate().find(|(_, v)| {
v.addr() == c.addr() && v.base() == c.base() && v.proto() == c.proto()
})
{
if c.prio() < other.prio() {
// The new candidate is not better than what we already got.
});

let local_idx = if let Some((idx, other)) = maybe_redundant {
if other.discarded() && c.kind() == other.kind() && c.raddr() == other.raddr() {
debug!("Re-enable previously discarded local: {:?}", other);
other.set_discarded(false);
idx
} else {
if c.prio() < other.prio() {
// The new candidate is not better than what we already got.
debug!(
"Reject redundant candidate, current: {:?} rejected: {:?}",
other, c
);
return false;
}

// Stop using the current candidate in favor of the new one.
debug!(
"Reject redundant candidate, current: {:?} rejected: {:?}",
"Replace redundant candidate, current: {:?} replaced with: {:?}",
other, c
);
return false;
}

// Stop using the current candidate in favor of the new one.
debug!(
"Replace redundant candidate, current: {:?} replaced with: {:?}",
other, c
);
other.set_discarded();
self.discard_candidate_pairs_by_local(idx);
}
other.set_discarded(true);
self.discard_candidate_pairs_by_local(idx);

// Tie this ufrag to this ICE-session.
c.set_ufrag(&self.local_credentials.ufrag);
info!("Add local candidate: {:?}", c);
self.local_candidates.push(c);
self.local_candidates.len() - 1
}
} else {
info!("Add local candidate: {:?}", c);
self.local_candidates.push(c);
self.local_candidates.len() - 1
};

// These are the indexes of the remote candidates this candidate should be paired with.
let remote_idxs: Vec<_> = self
Expand All @@ -510,12 +526,6 @@ impl IceAgent {
.map(|(i, _)| i)
.collect();

info!("Add local candidate: {:?}", c);

self.local_candidates.push(c);

let local_idxs = [self.local_candidates.len() - 1];

// We always run in trickle ice mode.
//
// https://www.rfc-editor.org/rfc/rfc8838.html#section-10
Expand All @@ -525,7 +535,7 @@ impl IceAgent {
// TODO: The trickle ice spec is strange. What does it mean "has been trickled to the
// remote party"? Since we don't get a confirmation that the candidate has been received
// by the remote party, whether we form local pairs directly or later seems irrelevant.
self.form_pairs(&local_idxs, &remote_idxs);
self.form_pairs(&[local_idx], &remote_idxs);

true
}
Expand Down Expand Up @@ -581,10 +591,25 @@ impl IceAgent {
*existing = c;
idx
} else {
info!("Add remote candidate: {:?}", c);
let maybe_discarded = self.remote_candidates.iter().position(|o| {
o.discarded()
&& c.addr() == o.addr()
&& c.base() == o.base()
&& c.proto() == o.proto()
&& c.kind() == o.kind()
&& c.raddr() == o.raddr()
});

self.remote_candidates.push(c);
self.remote_candidates.len() - 1
if let Some(idx) = maybe_discarded {
let other = &mut self.remote_candidates[idx];
debug!("Re-enable previously discarded remote: {:?}", other);
other.set_discarded(false);
idx
} else {
info!("Add remote candidate: {:?}", c);
self.remote_candidates.push(c);
self.remote_candidates.len() - 1
}
};

// These are the indexes of the local candidates this candidate should be paired with.
Expand Down Expand Up @@ -710,7 +735,7 @@ impl IceAgent {
}) {
if !other.discarded() {
info!("Local candidate to discard {:?}", other);
other.set_discarded();
other.set_discarded(true);
self.discard_candidate_pairs_by_local(idx);
return true;
}
Expand All @@ -729,7 +754,7 @@ impl IceAgent {
{
if !other.discarded() {
info!("Remote candidate to discard {:?}", other);
other.set_discarded();
other.set_discarded(true);
self.discard_candidate_pairs_by_remote(idx);
return true;
}
Expand Down Expand Up @@ -901,7 +926,8 @@ impl IceAgent {

/// Provide the current time to the [`IceAgent`].
///
/// Typically, you will want to call [`IceAgent::poll_timeout`] and "wake-up" the agent once that time is reached.
/// Typically, you will want to call [`IceAgent::poll_timeout`] and "wake-up"
/// the agent once that time is reached.
pub fn handle_timeout(&mut self, now: Instant) {
// This happens exactly once because evaluate_state() below will
// switch away from New -> Checking.
Expand Down Expand Up @@ -1169,6 +1195,18 @@ impl IceAgent {
trace!("Remote candidate for STUN request found");
idx
} else {
let maybe_discarded = self
.remote_candidates
.iter()
.any(|c| c.discarded() && c.proto() == req.proto && c.addr() == req.source);

if maybe_discarded {
// The remote has been discarded, we do not want to create a
// peer reflexive in this case.
trace!("STUN request ignored because remote candidate is discarded");
return;
}

// o The priority is the value of the PRIORITY attribute in the Binding
// request.
//
Expand Down Expand Up @@ -1214,8 +1252,10 @@ impl IceAgent {
}) {
Some((i, _)) => i,
None => {
// Receiving traffic for an IP address that neither is a HOST nor RELAY is most likely a configuration fault where the user forgot to add a candidate for the local interface.
// We are network-connected application so we need to handle this gracefully: Log a message and discard the packet.
// Receiving traffic for an IP address that neither is a HOST nor RELAY
// is most likely a configuration fault where the user forgot to add a
// candidate for the local interface. We are network-connected application
// so we need to handle this gracefully: Log a message and discard the packet.

debug!(
"Discarding STUN request on unknown interface: {}",
Expand Down Expand Up @@ -1602,7 +1642,6 @@ impl IceAgent {
mod test {
use super::*;
use std::net::SocketAddr;
use std::sync::Once;

impl IceAgent {
fn pair_indexes(&self) -> Vec<(usize, usize)> {
Expand Down Expand Up @@ -1806,22 +1845,6 @@ mod test {

#[test]
fn form_pairs_replace_remote_redundant() {
use std::env;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

if env::var("RUST_LOG").is_err() {
env::set_var("RUST_LOG", "debug");
}

static START: Once = Once::new();

START.call_once(|| {
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();
});

let mut agent = IceAgent::new();
agent.set_ice_lite(true);

Expand Down Expand Up @@ -1856,6 +1879,36 @@ mod test {
assert_eq!(pair.remote_binding_request_time, Some(now));
}

#[test]
fn form_pairs_skip_invalidated_local() {
let mut agent = IceAgent::new();

let local = Candidate::test_peer_rflx(ipv4_2(), ipv4_1(), "udp");

agent.add_local_candidate(local.clone());
agent.invalidate_candidate(&local);

agent.add_remote_candidate(Candidate::host(ipv4_3(), "udp").unwrap());

// There should be no pairs since we invalidated the local candidate.
assert_eq!(agent.pair_indexes(), []);
}

#[test]
fn form_pairs_skip_invalidated_remote() {
let mut agent = IceAgent::new();

let remote = Candidate::host(ipv4_3(), "udp").unwrap();

agent.add_remote_candidate(remote.clone());
agent.invalidate_candidate(&remote);

agent.add_local_candidate(Candidate::test_peer_rflx(ipv4_2(), ipv4_1(), "udp"));

// There should be no pairs since we invalidated the local candidate.
assert_eq!(agent.pair_indexes(), []);
}

#[test]
fn poll_time_must_timing_advance() {
let mut agent = IceAgent::new();
Expand Down
4 changes: 2 additions & 2 deletions src/ice/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,8 @@ impl Candidate {
self.local_preference = Some(v);
}

pub(crate) fn set_discarded(&mut self) {
self.discarded = true;
pub(crate) fn set_discarded(&mut self, discarded: bool) {
self.discarded = discarded;
}

pub(crate) fn discarded(&self) -> bool {
Expand Down
90 changes: 90 additions & 0 deletions src/ice/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,96 @@ mod test {
assert!(a2.time.duration_since(a2_time) < STUN_TIMEOUT);
}

#[test]
pub fn re_adding_invalidated_local_candidate() {
let mut a1 = TestAgent::new(info_span!("L"));
let mut a2 = TestAgent::new(info_span!("R"));

let c1 = host("1.1.1.1:1000", "udp");
a1.add_local_candidate(c1.clone());
a2.add_remote_candidate(c1.clone());
let c2 = host("2.2.2.2:1000", "udp");
a2.add_local_candidate(c2.clone());
a1.add_remote_candidate(c2);
a1.set_controlling(true);
a2.set_controlling(false);

loop {
if a1.state().is_connected() && a2.state().is_connected() {
break;
}
progress(&mut a1, &mut a2);
}

a1.agent.invalidate_candidate(&c1);

// Let time pass until it disconnects.
loop {
if a1.state().is_disconnected() && a2.state().is_disconnected() {
break;
}
progress(&mut a1, &mut a2);
}

// Add back the invalidated candidate
a1.add_local_candidate(c1);

// progress() fails after 100 number of polls.
a1.progress_count = 0;
a2.progress_count = 0;
loop {
if a1.state().is_connected() && a2.state().is_connected() {
break;
}
progress(&mut a1, &mut a2);
}
}

#[test]
pub fn re_adding_invalidated_remote_candidate() {
let mut a1 = TestAgent::new(info_span!("L"));
let mut a2 = TestAgent::new(info_span!("R"));

let c1 = host("1.1.1.1:1000", "udp");
a1.add_local_candidate(c1.clone());
a2.add_remote_candidate(c1);
let c2 = host("2.2.2.2:1000", "udp");
a2.add_local_candidate(c2.clone());
a1.add_remote_candidate(c2.clone());
a1.set_controlling(true);
a2.set_controlling(false);

loop {
if a1.state().is_connected() && a2.state().is_connected() {
break;
}
progress(&mut a1, &mut a2);
}

a1.agent.invalidate_candidate(&c2);

// Let time pass until it disconnects.
loop {
if a1.state().is_disconnected() && a2.state().is_disconnected() {
break;
}
progress(&mut a1, &mut a2);
}

// Add back the invalidated candidate
a1.add_remote_candidate(c2);

// progress() fails after 100 number of polls.
a1.progress_count = 0;
a2.progress_count = 0;
loop {
if a1.state().is_connected() && a2.state().is_connected() {
break;
}
progress(&mut a1, &mut a2);
}
}

#[test]
pub fn ice_lite_no_connection() {
let mut a1 = TestAgent::new(info_span!("L"));
Expand Down

0 comments on commit e9e19b2

Please sign in to comment.