From aae90a08cd4a300fcd7a322176a560bfe8078243 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Wed, 3 Apr 2024 16:36:54 +0200 Subject: [PATCH 1/9] Fix timestamp confusion --- Cargo.lock | 17 - Cargo.toml | 1 - src/cli/qcmp.rs | 39 ++- src/codec/qcmp.rs | 455 +++++++++++++++++--------- src/components/proxy/packet_router.rs | 7 +- src/components/proxy/sessions.rs | 7 +- src/filters/timestamp.rs | 2 +- src/lib.rs | 7 +- src/net/phoenix.rs | 120 ++++--- src/time.rs | 74 +++++ tests/qcmp.rs | 19 +- 11 files changed, 493 insertions(+), 255 deletions(-) create mode 100644 src/time.rs diff --git a/Cargo.lock b/Cargo.lock index 9aadc9b757..55a26fc3ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1584,12 +1584,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "minimal-lexical" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" - [[package]] name = "miniz_oxide" version = "0.7.2" @@ -1628,16 +1622,6 @@ dependencies = [ "libc", ] -[[package]] -name = "nom" -version = "7.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" -dependencies = [ - "memchr", - "minimal-lexical", -] - [[package]] name = "notify" version = "6.1.1" @@ -2104,7 +2088,6 @@ dependencies = [ "libflate", "lz4_flex", "maxminddb", - "nom", "notify", "num_cpus", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 953e97f708..73ad0f0ec6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,7 +117,6 @@ trust-dns-resolver = { version = "0.23.0", features = [ "dns-over-https-rustls", ] } async-trait = "0.1.73" -nom = "7.1.3" strum = "0.25.0" strum_macros = "0.25.2" cfg-if = "1.0.0" diff --git a/src/cli/qcmp.rs b/src/cli/qcmp.rs index 6f7fa5d393..487678bbfd 100644 --- a/src/cli/qcmp.rs +++ b/src/cli/qcmp.rs @@ -15,7 +15,10 @@ */ use std::net::SocketAddr; -use crate::codec::qcmp::Protocol; +use crate::{ + codec::qcmp::{self, Protocol}, + time::{DurationNanos, UtcTimestamp}, +}; #[derive(Clone, Debug, clap::Subcommand)] pub enum Qcmp { @@ -43,18 +46,19 @@ impl Ping { let socket = tokio::net::UdpSocket::bind(addr).await?; let mut results = Vec::new(); - let mut buf = [0; u16::MAX as usize]; + let mut recv_buf = [0; 1500 /* MTU */]; + let mut send_buf = qcmp::QcmpPacket::default(); for _ in 0..self.amount { let ping = Protocol::ping(); socket - .send_to(&ping.encode(), &self.endpoint) + .send_to(ping.encode(&mut send_buf), &self.endpoint) .await .unwrap(); let Ok(socket_result) = tokio::time::timeout( std::time::Duration::from_secs(1), - socket.recv_from(&mut buf), + socket.recv_from(&mut recv_buf), ) .await else { @@ -70,8 +74,8 @@ impl Ping { } }; - let recv_time = crate::unix_timestamp(); - let reply = Protocol::parse(&buf[..size]).unwrap().unwrap(); + let recv_time = UtcTimestamp::now(); + let reply = Protocol::parse(&recv_buf[..size]).unwrap().unwrap(); if ping.nonce() != reply.nonce() { tracing::error!(sent_nonce=%ping.nonce(), recv_nonce=%reply.nonce(), "mismatched nonces"); @@ -79,16 +83,16 @@ impl Ping { } let delay = reply.round_trip_delay(recv_time).unwrap(); - let duration = std::time::Duration::from_nanos(delay as u64); - tracing::info!(delay_millis=%format!("{:.2}", duration.as_secs_f64() * 1000.0), "successful ping"); + tracing::info!(delay_millis=%format!("{:.2}", delay.duration().as_secs_f64() * 1000.0), "successful ping"); results.push(delay); } match median(&mut results) { Some(median) => { - let median = std::time::Duration::from_nanos(median as u64); + let median = median.duration(); let average = std::time::Duration::from_nanos( - (results.iter().sum::() / results.len() as i64) as u64, + (results.iter().map(|dn| dn.nanos() as i128).sum::() + / results.len() as i128) as u64, ); tracing::info!( median_millis=%format!("{:.2}", median.as_secs_f64() * 1000.0), @@ -105,7 +109,7 @@ impl Ping { } } -fn median(numbers: &mut [i64]) -> Option { +fn median(numbers: &mut [DurationNanos]) -> Option { let len = numbers.len(); if len == 0 { return None; @@ -121,7 +125,7 @@ fn median(numbers: &mut [i64]) -> Option { // Even number of elements: Return the average of the two middle ones. let mid1 = numbers[(len - 1) / 2]; let mid2 = numbers[len / 2]; - Some((mid1 + mid2) / 2) + Some(DurationNanos::from_nanos((mid1.nanos() + mid2.nanos()) / 2)) } } @@ -129,6 +133,10 @@ fn median(numbers: &mut [i64]) -> Option { mod tests { use super::*; + fn dn(nanos: i64) -> DurationNanos { + DurationNanos::from_nanos(nanos) + } + #[test] fn empty() { assert_eq!(median(&mut []), None); @@ -136,16 +144,17 @@ mod tests { #[test] fn single() { - assert_eq!(median(&mut [42]), Some(42)); + let dn = dn(42); + assert_eq!(median(&mut [dn]), Some(dn)); } #[test] fn odd() { - assert_eq!(median(&mut [3, 1, 2]), Some(2)); + assert_eq!(median(&mut [dn(3), dn(1), dn(2)]), Some(dn(2))); } #[test] fn even() { - assert_eq!(median(&mut [4, 3, 1, 2]), Some(2)); + assert_eq!(median(&mut [dn(4), dn(3), dn(1), dn(2)]), Some(dn(2))); } } diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index 908410ecf9..bd09b15011 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -16,44 +16,165 @@ //! Logic for parsing and generating Quilkin Control Message Protocol (QCMP) messages. +use crate::{ + net::{ + phoenix::{DistanceMeasure, Measurement}, + DualStackEpollSocket, DualStackLocalSocket, + }, + time::{DurationNanos, UtcTimestamp}, +}; use std::sync::Arc; - -use nom::bytes::complete; - -use crate::net::{phoenix::Measurement, DualStackEpollSocket, DualStackLocalSocket}; +#[cfg(test)] +use std::time::Duration; // Magic number to distinguish control packets from regular traffic. const MAGIC_NUMBER: &[u8] = b"QLKN"; -const TIMESTAMP_LEN: usize = (i64::BITS / 8) as usize; const VERSION: u8 = 0; -const VERSION_LEN: usize = 1; -const NONCE_LEN: usize = 1; -const LENGTH_LEN: usize = 2; -const DISCRIMINANT_LEN: usize = 1; +/// The minimum length of a QCMP packet +pub const MIN_QCMP_PACKET_LEN: usize = 4 /* MAGIC_NUMBER */ + 1 /* VERSION */ + 1 /* DISCRIMINANT */ + 1 /* NONCE */ + 2 /* LENGTH */ + std::mem::size_of::(); +/// The maximum length of a QCMP packet, including 2 additional i64 timestamps +pub const MAX_QCMP_PACKET_LEN: usize = MIN_QCMP_PACKET_LEN + std::mem::size_of::() * 2; +const PING: u8 = 0; +const PONG: u8 = 1; + +pub struct QcmpPacket { + buf: [u8; MAX_QCMP_PACKET_LEN], + len: usize, +} + +impl Default for QcmpPacket { + fn default() -> Self { + Self { + buf: [0u8; MAX_QCMP_PACKET_LEN], + len: 0, + } + } +} + +#[cfg(target_os = "linux")] +unsafe impl tokio_uring::buf::IoBuf for QcmpPacket { + fn stable_ptr(&self) -> *const u8 { + self.buf.as_ptr() + } + + fn bytes_init(&self) -> usize { + self.len + } + + fn bytes_total(&self) -> usize { + self.buf.len() + } +} type Result = std::result::Result; +struct PacketBuilder<'buf> { + packet: &'buf mut QcmpPacket, + offset: usize, +} + +impl<'buf> PacketBuilder<'buf> { + #[inline] + fn wrap(packet: &'buf mut QcmpPacket) -> Self { + packet.len = 0; + Self { packet, offset: 0 } + } + + #[inline] + fn push(&mut self, val: u8) { + self.packet.buf[self.offset] = val; + self.offset += 1; + } + + #[inline] + fn push_slice(&mut self, slice: &[u8]) { + self.packet.buf[self.offset..self.offset + slice.len()].copy_from_slice(slice); + self.offset += slice.len(); + } + + #[inline] + fn finalize(self) -> &'buf [u8] { + self.packet.buf[self.offset..].fill(0); + self.packet.len = self.offset; + &self.packet.buf[..self.offset] + } +} + +struct PacketParser<'buf> { + packet: &'buf [u8], + offset: usize, +} + +impl<'buf> PacketParser<'buf> { + fn wrap(packet: &'buf [u8]) -> Result { + if packet.len() < MIN_QCMP_PACKET_LEN { + return Err(Error::LengthMismatch( + MIN_QCMP_PACKET_LEN as _, + packet.len(), + )); + } + + Ok(Self { packet, offset: 0 }) + } + + #[inline] + fn read(&mut self) -> u8 { + let index = self.offset; + self.offset += 1; + // SAFETY: We manually check the packet size before any reads + unsafe { *self.packet.get_unchecked(index) } + } + + #[inline] + fn read_slice(&mut self) -> [u8; N] { + let mut s = [0u8; N]; + // SAFETY: We manually check the packet size before any reads + s.copy_from_slice(unsafe { self.packet.get_unchecked(self.offset..self.offset + N) }); + self.offset += N; + s + } +} + /// A measurement implementation using QCMP pings for measuring the distance /// between nodes. #[derive(Debug, Clone)] pub struct QcmpMeasurement { socket: Arc, + #[cfg(test)] + delay: Option, } impl QcmpMeasurement { pub fn new() -> crate::Result { Ok(Self { socket: Arc::new(DualStackEpollSocket::new(0)?), + #[cfg(test)] + delay: None, + }) + } + + #[cfg(test)] + pub fn with_artificial_delay(delay: Duration) -> crate::Result { + Ok(Self { + socket: Arc::new(DualStackEpollSocket::new(0)?), + delay: Some(delay), }) } } #[async_trait::async_trait] impl Measurement for QcmpMeasurement { - async fn measure_distance(&self, address: std::net::SocketAddr) -> eyre::Result<(i64, i64)> { - self.socket - .send_to(&Protocol::ping().encode(), address) - .await?; + async fn measure_distance( + &self, + address: std::net::SocketAddr, + ) -> eyre::Result { + { + let mut ping = QcmpPacket::default(); + self.socket + .send_to(Protocol::ping().encode(&mut ping), address) + .await?; + } + let mut recv = [0u8; 512]; let (size, _) = tokio::time::timeout( @@ -61,13 +182,19 @@ impl Measurement for QcmpMeasurement { self.socket.recv_from(&mut recv), ) .await??; - let now = crate::unix_timestamp(); + + #[cfg(test)] + if let Some(ad) = self.delay { + tokio::time::sleep(ad).await; + } + + let now = UtcTimestamp::now(); let Some(reply) = Protocol::parse(&recv[..size])? else { return Err(eyre::eyre!("received non qcmp packet")); }; reply - .incoming_and_outgoing_latency(now) + .distance(now) .ok_or_else(|| eyre::eyre!("received non ping reply")) } } @@ -76,11 +203,9 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) { let port = crate::net::socket_port(&socket); uring_spawn!(tracing::debug_span!("qcmp"), async move { - // Initialize a buffer for the UDP packet. We use the maximum size of a UDP - // packet, which is the maximum value of 16 a bit integer. let mut input_buf = vec![0; 1 << 16]; let socket = DualStackLocalSocket::new(port).unwrap(); - let mut output_buf = Vec::new(); + let mut output_buf = QcmpPacket::default(); loop { let result = tokio::select! { @@ -90,7 +215,7 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) { match result { (Ok((size, source)), new_input_buf) => { input_buf = new_input_buf; - let received_at = crate::unix_timestamp(); + let received_at = UtcTimestamp::now(); let command = match Protocol::parse(&input_buf[..size]) { Ok(Some(command)) => command, Ok(None) => { @@ -113,18 +238,15 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) { }; Protocol::ping_reply(nonce, client_timestamp, received_at) - .encode_into_buffer(&mut output_buf); + .encode(&mut output_buf); - let mut new_output_buf = match socket.send_to(output_buf, source).await { + output_buf = match socket.send_to(output_buf, source).await { (Ok(_), buf) => buf, (Err(error), buf) => { tracing::warn!(%error, "error responding to ping"); buf } }; - - new_output_buf.clear(); - output_buf = new_output_buf; } (Err(error), new_input_buf) => { tracing::warn!(%error, "error receiving packet"); @@ -142,7 +264,7 @@ pub enum Protocol { /// latency. Ping { /// The timestamp from when the client sent the packet. - client_timestamp: i64, + client_timestamp: UtcTimestamp, /// The client's nonce. nonce: u8, }, @@ -152,13 +274,13 @@ pub enum Protocol { /// two machines. PingReply { /// The timestamp from when the client sent the ping packet. - client_timestamp: i64, + client_timestamp: UtcTimestamp, /// The client's nonce. nonce: u8, /// The timestamp from when the server received the ping packet. - server_start_timestamp: i64, + server_start_timestamp: UtcTimestamp, /// The timestamp from when the server sent the reply. - server_transmit_timestamp: i64, + server_transmit_timestamp: UtcTimestamp, }, } @@ -168,49 +290,54 @@ impl Protocol { pub fn ping() -> Self { Self::ping_with_nonce(rand::random()) } + /// Creates a [`Self::Ping`] with a user-specified nonce, should be sent /// as soon as possible from creation to maintain accuracy. pub fn ping_with_nonce(nonce: u8) -> Self { Self::Ping { nonce, - client_timestamp: crate::unix_timestamp(), + client_timestamp: UtcTimestamp::now(), } } /// Creates a [`Self::PingReply`] from the client and server start timestamp. /// It's recommended to transmit as as soon as possible from creation to /// keep the start and transmit times as accurate as possible. - pub fn ping_reply(nonce: u8, client_timestamp: i64, server_start_timestamp: i64) -> Self { + pub fn ping_reply( + nonce: u8, + client_timestamp: UtcTimestamp, + server_start_timestamp: UtcTimestamp, + ) -> Self { Self::PingReply { nonce, client_timestamp, server_start_timestamp, - server_transmit_timestamp: crate::unix_timestamp(), + server_transmit_timestamp: UtcTimestamp::now(), } } /// Encodes the protocol command into a buffer of bytes for network transmission. - pub fn encode(&self) -> Vec { - let mut buffer = Vec::new(); - self.encode_into_buffer(&mut buffer); - buffer - } - - /// Encodes the protocol command into a buffer of bytes for network transmission. - pub fn encode_into_buffer(&self, buffer: &mut Vec) { - buffer.extend(MAGIC_NUMBER); - buffer.push(VERSION); - buffer.push(self.discriminant()); - buffer.extend_from_slice(&self.discriminant_length().to_be_bytes()); - - let length = buffer.len(); + pub fn encode<'buf>(&self, buffer: &'buf mut QcmpPacket) -> &'buf [u8] { + let mut pb = PacketBuilder::wrap(buffer); + pb.push_slice(MAGIC_NUMBER); + pb.push(VERSION); + pb.push(self.discriminant()); + pb.push_slice(&self.discriminant_length().to_be_bytes()); + + #[cfg(debug_assertions)] + { + let length = pb.offset; + self.encode_payload(&mut pb); + + assert_eq!(pb.offset, length + usize::from(self.discriminant_length())); + } - self.encode_payload(buffer); + #[cfg(not(debug_assertions))] + { + self.encode_payload(&mut pb); + } - debug_assert_eq!( - buffer.len(), - length + usize::from(self.discriminant_length()) - ); + pb.finalize() } /// Returns the packet's nonce. @@ -221,25 +348,28 @@ impl Protocol { } } - fn encode_payload(&self, buffer: &mut Vec) { + fn encode_payload(&self, pb: &mut PacketBuilder<'_>) { + pb.push(self.nonce()); + + let mut ets = |ts: &UtcTimestamp| { + pb.push_slice(&ts.unix_nanos().to_be_bytes()); + }; + match self { Protocol::Ping { - nonce, - client_timestamp, + client_timestamp, .. } => { - buffer.push(*nonce); - buffer.extend_from_slice(&client_timestamp.to_be_bytes()) + ets(client_timestamp); } Protocol::PingReply { - nonce, client_timestamp, server_start_timestamp, server_transmit_timestamp, + .. } => { - buffer.push(*nonce); - buffer.extend_from_slice(&client_timestamp.to_be_bytes()); - buffer.extend_from_slice(&server_start_timestamp.to_be_bytes()); - buffer.extend_from_slice(&server_transmit_timestamp.to_be_bytes()); + ets(client_timestamp); + ets(server_start_timestamp); + ets(server_transmit_timestamp); } } } @@ -249,7 +379,10 @@ impl Protocol { /// proxy, using the same algorithm as [Network Time Protocol (NTP)][ntp]. /// /// [ntp]: https://en.wikipedia.org/wiki/Network_Time_Protocol#Clock_synchronization_algorithm - pub fn round_trip_delay(&self, client_response_timestamp: i64) -> Option { + pub fn round_trip_delay( + &self, + client_response_timestamp: UtcTimestamp, + ) -> Option { let Protocol::PingReply { client_timestamp, server_start_timestamp, @@ -260,18 +393,15 @@ impl Protocol { return None; }; - Some( - (client_response_timestamp - client_timestamp) - - (server_transmit_timestamp - server_start_timestamp), - ) + Some(DurationNanos::from_nanos( + (client_response_timestamp.unix_nanos() - client_timestamp.unix_nanos()) + - (server_transmit_timestamp.unix_nanos() - server_start_timestamp.unix_nanos()), + )) } /// If the command is [`Protocol::PingReply`], with `client_response_timestamp` /// returns the time between the client -> server, and the server -> client. - pub fn incoming_and_outgoing_latency( - &self, - client_response_timestamp: i64, - ) -> Option<(i64, i64)> { + pub fn distance(&self, client_response_timestamp: UtcTimestamp) -> Option { let Protocol::PingReply { client_timestamp, server_start_timestamp, @@ -282,17 +412,17 @@ impl Protocol { return None; }; - Some(( - server_start_timestamp - client_timestamp, - client_response_timestamp - server_transmit_timestamp, - )) + Some(DistanceMeasure { + incoming: *server_start_timestamp - *client_timestamp, + outgoing: client_response_timestamp - *server_transmit_timestamp, + }) } /// Returns the discriminant code, identifying the payload. const fn discriminant(&self) -> u8 { match self { - Self::Ping { .. } => 0, - Self::PingReply { .. } => 1, + Self::Ping { .. } => PING, + Self::PingReply { .. } => PONG, } } @@ -303,89 +433,86 @@ impl Protocol { /// The expected length of payload based on its discriminant. const fn payload_length(discriminant: u8) -> Result { - Ok(match discriminant { - 0 => NONCE_LEN as u16 + TIMESTAMP_LEN as u16, - 1 => NONCE_LEN as u16 + (TIMESTAMP_LEN as u16 * 3), + let num = match discriminant { + PING => 1, + PONG => 3, code => return Err(Error::InvalidCommand(code)), - }) + }; + + Ok(1 + std::mem::size_of::() as u16 * num) } /// Parses the provided input, and attempts to parse it as a `Protocol` - /// packet. Returning `None` if the magic number is not present, and thus - /// is not a QCMP packet, and returning `Err` when it was detected as a - /// QCMP packet, but there was an error in parsing the payload. + /// packet. + /// + /// Returns `None` if the magic number is not present, and thus is not a + /// QCMP packet, and returning `Err` when it was detected as a QCMP packet, + /// but there was an error in parsing the payload. pub fn parse(input: &[u8]) -> Result> { - let Ok((input, _)) = complete::tag::<_, _, nom::error::Error<_>>(MAGIC_NUMBER)(input) - else { - return Ok(None); - }; + let mut pp = PacketParser::wrap(input)?; - let (input, version) = Self::parse_version(input)?; + let magic = pp.read_slice::<4>(); + if magic != MAGIC_NUMBER { + return Ok(None); + } - if version != 0 { + let version = pp.read(); + if version != VERSION { return Err(Error::UnknownVersion(version)); } - let (input, discriminant) = Self::parse_discriminant(input)?; - let (input, length) = Self::parse_length(input)?; - let payload_length = Self::payload_length(discriminant)?; + let discriminant = pp.read(); + // Now that we know the packet kind we can ensure the rest of the expected + // packet length is available to avoid panics + let size = match discriminant { + PING => MIN_QCMP_PACKET_LEN, + PONG => MAX_QCMP_PACKET_LEN, + unknown => return Err(Error::InvalidCommand(unknown)), + }; - if usize::from(length) != input.len() { - return Err(Error::LengthMismatch(length, input.len())); - } else if length != payload_length { - return Err(Error::LengthMismatch(length, payload_length.into())); + if pp.packet.len() < size { + return Err(Error::LengthMismatch(size as _, pp.packet.len())); } - match discriminant { - 0 => Self::parse_ping_payload(input).map(Some), - 1 => Self::parse_ping_reply_payload(input).map(Some), - _ => unreachable!(), - } - } + let length = u16::from_be_bytes(pp.read_slice::<2>()); + let payload_length = Self::payload_length(discriminant)?; - fn parse_length(input: &[u8]) -> nom::IResult<&[u8], u16> { - complete::take(LENGTH_LEN)(input) - .map(|(input, length)| (input, u16::from_be_bytes([length[0], length[1]]))) - } + if length != payload_length { + return Err(Error::LengthMismatch(length, payload_length.into())); + } - fn parse_version(input: &[u8]) -> nom::IResult<&[u8], u8> { - complete::take(VERSION_LEN)(input).map(|(input, version)| (input, version[0])) - } + let remainder = pp.packet.len() - pp.offset; - fn parse_nonce(input: &[u8]) -> nom::IResult<&[u8], u8> { - complete::take(NONCE_LEN)(input).map(|(input, nonce)| (input, nonce[0])) - } + if usize::from(length) != remainder { + return Err(Error::LengthMismatch(length, remainder)); + } - fn parse_discriminant(input: &[u8]) -> nom::IResult<&[u8], u8> { - complete::take(DISCRIMINANT_LEN)(input) - .map(|(input, discriminant)| (input, discriminant[0])) - } + let nonce = pp.read(); - fn parse_timestamp(input: &[u8]) -> nom::IResult<&[u8], i64> { - complete::take(TIMESTAMP_LEN)(input) - .map(|(input, ts)| (input, i64::from_be_bytes(ts.try_into().unwrap()))) - } + fn parse_timestamp(pp: &mut PacketParser<'_>) -> UtcTimestamp { + UtcTimestamp::from_nanos(i64::from_be_bytes(pp.read_slice::<8>())) + } - fn parse_ping_payload(input: &[u8]) -> Result { - let (input, nonce) = Self::parse_nonce(input)?; - let (_, client_timestamp) = Self::parse_timestamp(input)?; - Ok(Self::Ping { - nonce, - client_timestamp, - }) - } + let payload = match discriminant { + PING => Self::Ping { + nonce, + client_timestamp: parse_timestamp(&mut pp), + }, + PONG => { + let client_timestamp = parse_timestamp(&mut pp); + let server_start_timestamp = parse_timestamp(&mut pp); + let server_transmit_timestamp = parse_timestamp(&mut pp); + Self::PingReply { + nonce, + client_timestamp, + server_start_timestamp, + server_transmit_timestamp, + } + } + _ => unreachable!("we should have already verified the discriminant"), + }; - fn parse_ping_reply_payload(input: &[u8]) -> Result { - let (input, nonce) = Self::parse_nonce(input)?; - let (input, client_timestamp) = Self::parse_timestamp(input)?; - let (input, server_start_timestamp) = Self::parse_timestamp(input)?; - let (_, server_transmit_timestamp) = Self::parse_timestamp(input)?; - Ok(Self::PingReply { - nonce, - client_timestamp, - server_start_timestamp, - server_transmit_timestamp, - }) + Ok(Some(payload)) } } @@ -400,14 +527,6 @@ pub enum Error { LengthMismatch(u16, usize), #[error("unknown command code: {0}")] InvalidCommand(u8), - #[error("failed to parse packet payload: {0}")] - Parse(String), -} - -impl From>> for Error { - fn from(error: nom::Err>) -> Self { - Self::Parse(error.to_string()) - } } #[cfg(test)] @@ -425,7 +544,7 @@ mod tests { // Version 0, // Code - 0, + PING, // Length 0, 9, // Nonce @@ -438,7 +557,8 @@ mod tests { assert!(matches!(ping, Protocol::Ping { nonce: 0xBF, .. })); - assert_eq!(ping.encode(), INPUT); + let mut packet = QcmpPacket::default(); + assert_eq!(ping.encode(&mut packet), INPUT); } #[test] @@ -450,7 +570,7 @@ mod tests { // Version 0, // Code - 1, + PONG, // Length 0, 25, // Nonce @@ -467,7 +587,8 @@ mod tests { ping_reply, Protocol::PingReply { nonce: 0xBF, .. } )); - assert_eq!(ping_reply.encode(), INPUT); + let mut packet = QcmpPacket::default(); + assert_eq!(ping_reply.encode(&mut packet), INPUT); } #[test] @@ -479,7 +600,7 @@ mod tests { // Version 0, // Code (intentionally Ping) - 0, + PING, // Length 0, 25, // Nonce @@ -524,31 +645,43 @@ mod tests { #[test] fn reject_no_magic_header() { #[rustfmt::skip] - const INPUT: &[u8] = &[0xff, 0xff, 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57]; + const INPUT: &[u8] = &[ + // Magic + b'Q', 0xff, b'K', b'N', + // Version + 0, + // Code + PING, + // Length + 0, 9, + // Nonce + 0xBF, + // Payload + 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57, + ]; assert!(Protocol::parse(INPUT).unwrap().is_none()); } #[tokio::test] async fn qcmp_measurement() { - const FIFTY_MILLIS_IN_NANOS: i64 = 50_000_000; - let socket = raw_socket_with_reuse(0).unwrap(); let addr = socket.local_addr().unwrap().as_socket().unwrap(); let (_tx, rx) = crate::make_shutdown_channel(Default::default()); super::spawn(socket, rx); - tokio::time::sleep(std::time::Duration::from_millis(150)).await; + tokio::time::sleep(std::time::Duration::from_millis(10)).await; - let node = QcmpMeasurement::new().unwrap(); + let delay = std::time::Duration::from_millis(50); + let node = QcmpMeasurement::with_artificial_delay(delay).unwrap(); - let (incoming, outgoing) = node.measure_distance(addr).await.unwrap(); + let dm = node.measure_distance(addr).await.unwrap(); + let total = dm.total(); assert!( - FIFTY_MILLIS_IN_NANOS > incoming + outgoing, - "Node1's distance is {}ns, greater than {}ns", - incoming + outgoing, - FIFTY_MILLIS_IN_NANOS + total > delay && total < delay * 2, + "Node1's distance is {total:?}, expected > {delay:?} and less than {:?}", + delay * 2 ); } } diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index 16da79aa93..b7f4bdb7cd 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -5,6 +5,7 @@ use super::{ use crate::{ filters::{Filter as _, ReadContext}, pool::PoolBuffer, + time::UtcTimestamp, Config, }; use std::{net::SocketAddr, sync::Arc}; @@ -15,7 +16,7 @@ use tokio::sync::mpsc; struct DownstreamPacket { asn_info: Option, contents: PoolBuffer, - received_at: i64, + received_at: UtcTimestamp, source: SocketAddr, } @@ -121,7 +122,7 @@ impl DownstreamReceiveWorkerConfig { Ok((_size, mut source)) => { source.set_ip(source.ip().to_canonical()); let packet = DownstreamPacket { - received_at: crate::unix_timestamp(), + received_at: UtcTimestamp::now(), asn_info: crate::net::maxmind_db::MaxmindDb::lookup(source.ip()), contents, source, @@ -132,7 +133,7 @@ impl DownstreamReceiveWorkerConfig { crate::metrics::READ, packet.asn_info.as_ref(), ) - .set(packet.received_at - last_received_at); + .set((packet.received_at - last_received_at).nanos()); } last_received_at = Some(packet.received_at); diff --git a/src/components/proxy/sessions.rs b/src/components/proxy/sessions.rs index 1d47217405..30f42900c6 100644 --- a/src/components/proxy/sessions.rs +++ b/src/components/proxy/sessions.rs @@ -32,6 +32,7 @@ use crate::{ net::maxmind_db::IpNetEntry, net::DualStackLocalSocket, pool::{BufferPool, FrozenPoolBuffer, PoolBuffer}, + time::UtcTimestamp, Loggable, ShutdownRx, }; @@ -202,9 +203,9 @@ impl SessionPool { packet: PoolBuffer, mut recv_addr: SocketAddr, port: u16, - last_received_at: &mut Option, + last_received_at: &mut Option, ) { - let received_at = crate::unix_timestamp(); + let received_at = UtcTimestamp::now(); recv_addr.set_ip(recv_addr.ip().to_canonical()); let (downstream_addr, asn_info): (SocketAddr, Option) = { let storage = self.storage.read().await; @@ -222,7 +223,7 @@ impl SessionPool { if let Some(last_received_at) = last_received_at { crate::metrics::packet_jitter(crate::metrics::WRITE, asn_info) - .set(received_at - *last_received_at); + .set((received_at - *last_received_at).nanos()); } *last_received_at = Some(received_at); diff --git a/src/filters/timestamp.rs b/src/filters/timestamp.rs index 181f92a0f7..315a14d46c 100644 --- a/src/filters/timestamp.rs +++ b/src/filters/timestamp.rs @@ -165,7 +165,7 @@ mod tests { ); ctx.metadata.insert( TIMESTAMP_KEY.into(), - Value::Number(crate::unix_timestamp() as u64), + Value::Number(crate::time::UtcTimestamp::now().unix() as u64), ); filter.read(&mut ctx).await.unwrap(); diff --git a/src/lib.rs b/src/lib.rs index e733e0339a..a80081da88 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ pub(crate) mod collections; pub(crate) mod metrics; pub mod pool; +pub mod time; // Above other modules for thr `uring_spawn` macro. #[macro_use] @@ -78,12 +79,6 @@ pub(crate) trait Loggable { fn log(&self); } -/// Gets the current [Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) -#[inline] -pub fn unix_timestamp() -> i64 { - time::OffsetDateTime::now_utc().unix_timestamp() -} - #[cfg(doctest)] mod external_doc_tests { #![doc = include_str!("../docs/src/services/proxy/filters.md")] diff --git a/src/net/phoenix.rs b/src/net/phoenix.rs index 8261b4ad55..ed167cf8f4 100644 --- a/src/net/phoenix.rs +++ b/src/net/phoenix.rs @@ -162,12 +162,48 @@ pub fn spawn( Ok(()) } +use crate::time::DurationNanos; + +#[derive(Copy, Clone)] +#[cfg_attr(test, derive(Debug))] +pub struct DistanceMeasure { + pub incoming: DurationNanos, + pub outgoing: DurationNanos, +} + +impl Default for DistanceMeasure { + fn default() -> Self { + Self::from((0, 0)) + } +} + +impl From<(i64, i64)> for DistanceMeasure { + fn from(value: (i64, i64)) -> Self { + Self { + incoming: DurationNanos::from_nanos(value.0), + outgoing: DurationNanos::from_nanos(value.1), + } + } +} + +impl DistanceMeasure { + #[inline] + pub fn total_nanos(self) -> i64 { + self.incoming.nanos() + self.outgoing.nanos() + } + + #[inline] + pub fn total(self) -> std::time::Duration { + self.incoming.duration() + self.outgoing.duration() + } +} + /// An implementation of measuring the network difference between two nodes. #[async_trait] pub trait Measurement { /// Gets the difference between this node and `address`, returning the /// latency in nanoseconds on success. - async fn measure_distance(&self, address: SocketAddr) -> eyre::Result<(i64, i64)>; + async fn measure_distance(&self, address: SocketAddr) -> eyre::Result; } /// A `Phoenix` instance maintains a virtual coordinate space for nodes in a @@ -206,16 +242,15 @@ impl Phoenix { /// and update their coordinates. pub async fn background_update_task(&self) { let mut current_interval = self.interval_range.start; - let mut first = true; + let mut first = Some(self.all_nodes()); loop { let mut total_difference = 0; let mut count = 0; let nodes_to_probe = first - .then(|| self.all_nodes()) + .take() .unwrap_or_else(|| self.random_subset_of_nodes()); - first = false; for address in nodes_to_probe { let Some(mut node) = self.nodes.get_mut(&address) else { @@ -224,9 +259,9 @@ impl Phoenix { }; match self.measurement.measure_distance(address).await { - Ok((incoming_distance, outgoing_distance)) => { - node.adjust_coordinates(incoming_distance, outgoing_distance); - total_difference += outgoing_distance + incoming_distance; + Ok(distance) => { + node.adjust_coordinates(distance); + total_difference += distance.total_nanos(); count += 1; } Err(error) => { @@ -247,12 +282,8 @@ impl Phoenix { } // Ensure current_interval remains within bounds - if current_interval < self.interval_range.start { - current_interval = self.interval_range.start; - } - if current_interval > self.interval_range.end { - current_interval = self.interval_range.end; - } + current_interval = + current_interval.clamp(self.interval_range.start, self.interval_range.end); } let _ = self.update_watcher.0.send(()); @@ -302,11 +333,10 @@ impl Phoenix { .collect::>() { if let Some(mut node) = self.nodes.get_mut(&address) { - let Ok((incoming, outgoing)) = self.measurement.measure_distance(address).await - else { + let Ok(distance) = self.measurement.measure_distance(address).await else { continue; }; - node.adjust_coordinates(incoming, outgoing); + node.adjust_coordinates(distance); } else { self.nodes.entry(address).and_modify(|node| { node.increase_error_estimate(); @@ -497,21 +527,22 @@ impl Node { self.error_estimate += 0.1; } - fn adjust_coordinates(&mut self, incoming_distance: i64, outgoing_distance: i64) { + fn adjust_coordinates(&mut self, distance: DistanceMeasure) { + let incoming = distance.incoming.nanos() as f64; + let outgoing = distance.outgoing.nanos() as f64; + let Some(coordinates) = &mut self.coordinates else { self.coordinates = Some(Coordinates { - x: incoming_distance as f64, - y: outgoing_distance as f64, + x: incoming, + y: outgoing, }); return; }; - let incoming_distance_f = incoming_distance as f64; - let outgoing_distance_f = outgoing_distance as f64; let weight = self.error_estimate; - coordinates.x = (coordinates.x + (incoming_distance_f * weight)) / 2.0; - coordinates.y = (coordinates.y + (outgoing_distance_f * weight)) / 2.0; + coordinates.x = (coordinates.x + (incoming * weight)) / 2.0; + coordinates.y = (coordinates.y + (outgoing * weight)) / 2.0; } } @@ -528,43 +559,52 @@ mod tests { #[derive(Clone)] struct LoggingMockMeasurement { - latencies: HashMap, + latencies: HashMap, probed_addresses: Arc>>, } #[async_trait] impl Measurement for LoggingMockMeasurement { - async fn measure_distance(&self, address: SocketAddr) -> eyre::Result<(i64, i64)> { + async fn measure_distance(&self, address: SocketAddr) -> eyre::Result { self.probed_addresses.lock().await.insert(address); - Ok(*self.latencies.get(&address).unwrap_or(&(0, 0))) + Ok(*self + .latencies + .get(&address) + .unwrap_or(&DistanceMeasure::default())) } } struct MockMeasurement { - latencies: HashMap, + latencies: HashMap, } #[async_trait] impl Measurement for MockMeasurement { - async fn measure_distance(&self, address: SocketAddr) -> eyre::Result<(i64, i64)> { - Ok(*self.latencies.get(&address).unwrap_or(&(0, 0))) + async fn measure_distance(&self, address: SocketAddr) -> eyre::Result { + Ok(*self + .latencies + .get(&address) + .unwrap_or(&DistanceMeasure::default())) } } #[derive(Debug)] struct FailedAddressesMock { - latencies: HashMap, + latencies: HashMap, failed_addresses: Arc>>, } #[async_trait] impl Measurement for FailedAddressesMock { - async fn measure_distance(&self, address: SocketAddr) -> eyre::Result<(i64, i64)> { + async fn measure_distance(&self, address: SocketAddr) -> eyre::Result { let failed_addresses = self.failed_addresses.lock().await; if failed_addresses.contains(&address) { Err(eyre::eyre!("Measurement timed out")) } else { - Ok(*self.latencies.get(&address).unwrap_or(&(0, 0))) + Ok(*self + .latencies + .get(&address) + .unwrap_or(&DistanceMeasure::default())) } } } @@ -591,7 +631,7 @@ mod tests { #[tokio::test] async fn coordinates_adjustment() { let mut mock_latencies = HashMap::new(); - mock_latencies.insert("127.0.0.1:8081".parse().unwrap(), (25, 25)); + mock_latencies.insert("127.0.0.1:8081".parse().unwrap(), (25, 25).into()); let phoenix = Phoenix::new(MockMeasurement { latencies: mock_latencies, }); @@ -612,9 +652,9 @@ mod tests { #[tokio::test] async fn ordered_nodes_by_latency() { let mut mock_latencies = HashMap::new(); - mock_latencies.insert("127.0.0.1:8080".parse().unwrap(), (10, 10)); - mock_latencies.insert("127.0.0.1:8081".parse().unwrap(), (50, 50)); - mock_latencies.insert("127.0.0.1:8082".parse().unwrap(), (30, 30)); + mock_latencies.insert("127.0.0.1:8080".parse().unwrap(), (10, 10).into()); + mock_latencies.insert("127.0.0.1:8081".parse().unwrap(), (50, 50).into()); + mock_latencies.insert("127.0.0.1:8082".parse().unwrap(), (30, 30).into()); let phoenix = Phoenix::new(MockMeasurement { latencies: mock_latencies, @@ -683,8 +723,8 @@ mod tests { #[tokio::test] async fn successful_measurements() { let latencies = HashMap::from([ - ("127.0.0.1:8080".parse().unwrap(), (100, 100)), - ("127.0.0.1:8081".parse().unwrap(), (200, 200)), + ("127.0.0.1:8080".parse().unwrap(), (100, 100).into()), + ("127.0.0.1:8081".parse().unwrap(), (200, 200).into()), ]); let failed_addresses = Arc::new(Mutex::new(HashSet::new())); let measurement = FailedAddressesMock { @@ -710,8 +750,8 @@ mod tests { #[tokio::test] async fn failed_measurements_excluded() { let latencies = HashMap::from([ - ("127.0.0.1:8080".parse().unwrap(), (100, 100)), - ("127.0.0.1:8081".parse().unwrap(), (200, 200)), + ("127.0.0.1:8080".parse().unwrap(), (100, 100).into()), + ("127.0.0.1:8081".parse().unwrap(), (200, 200).into()), ]); let failed_addresses = Arc::new(Mutex::new(HashSet::from(["127.0.0.1:8081" .parse() diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000000..ff6e046764 --- /dev/null +++ b/src/time.rs @@ -0,0 +1,74 @@ +/// A UTC timestamp +#[derive(Copy, Clone)] +pub struct UtcTimestamp { + inner: time::OffsetDateTime, +} + +impl UtcTimestamp { + #[inline] + pub fn now() -> Self { + Self { + inner: time::OffsetDateTime::now_utc(), + } + } + + /// Gets the current [Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) + #[inline] + pub fn unix(self) -> i64 { + self.inner.unix_timestamp() + } + + /// Gets the current [Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) in nanoseconds. + /// + /// Note we truncate to a 64-bit integer, which will be fine unless someone happens + /// to be running quilkin in a couple of hundred years + #[inline] + pub fn unix_nanos(self) -> i64 { + self.inner.unix_timestamp_nanos() as _ + } + + #[inline] + pub fn from_nanos(nanos: i64) -> Self { + Self { + inner: time::OffsetDateTime::from_unix_timestamp_nanos(nanos as _) + .expect("hello future person, apologies"), + } + } +} + +use std::fmt; + +impl fmt::Debug for UtcTimestamp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.inner) + } +} + +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] +#[cfg_attr(test, derive(Debug))] +pub struct DurationNanos(i64); + +impl DurationNanos { + #[inline] + pub fn from_nanos(n: i64) -> Self { + Self(n) + } + + #[inline] + pub fn nanos(self) -> i64 { + self.0 + } + + #[inline] + pub fn duration(self) -> std::time::Duration { + std::time::Duration::from_nanos(self.0 as _) + } +} + +impl std::ops::Sub for UtcTimestamp { + type Output = DurationNanos; + + fn sub(self, rhs: Self) -> Self::Output { + DurationNanos(self.unix_nanos() - rhs.unix_nanos()) + } +} diff --git a/tests/qcmp.rs b/tests/qcmp.rs index 728a6cbbb6..7bac9181fd 100644 --- a/tests/qcmp.rs +++ b/tests/qcmp.rs @@ -66,25 +66,28 @@ async fn ping(port: u16) { let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let ping = Protocol::ping(); - socket.send_to(&ping.encode(), &local_addr).await.unwrap(); - let mut buf = [0; u16::MAX as usize]; + let mut ping_packet = quilkin::codec::qcmp::QcmpPacket::default(); + socket + .send_to(ping.encode(&mut ping_packet), &local_addr) + .await + .unwrap(); + let mut buf = [0; quilkin::codec::qcmp::MAX_QCMP_PACKET_LEN]; let (size, _) = tokio::time::timeout(Duration::from_secs(1), socket.recv_from(&mut buf)) .await .unwrap() .unwrap(); - let recv_time = quilkin::unix_timestamp(); + let recv_time = quilkin::time::UtcTimestamp::now(); let reply = Protocol::parse(&buf[..size]).unwrap().unwrap(); assert_eq!(ping.nonce(), reply.nonce()); - const FIFTY_MILLIS_IN_NANOS: i64 = 50_000_000; + const MAX: std::time::Duration = std::time::Duration::from_millis(50); // If it takes longer than 50 milliseconds locally, it's likely that there // is bug. let delay = reply.round_trip_delay(recv_time).unwrap(); assert!( - FIFTY_MILLIS_IN_NANOS > delay, - "Delay {}ns greater than {}ns", - delay, - FIFTY_MILLIS_IN_NANOS + MAX > delay.duration(), + "Delay {:?} greater than {MAX:?}", + delay.duration(), ); } From 38bf7a6e8988665d065a5aa27f29e6a146bec213 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Wed, 3 Apr 2024 16:56:18 +0200 Subject: [PATCH 2/9] Fix non-linux --- src/codec/qcmp.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index bd09b15011..7946ca4e2a 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -66,6 +66,15 @@ unsafe impl tokio_uring::buf::IoBuf for QcmpPacket { } } +#[cfg(not(target_os = "linux"))] +impl std::ops::Deref for QcmpPacket { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.buf[..self.len] + } +} + type Result = std::result::Result; struct PacketBuilder<'buf> { From a8dd41c0251c640d6fe9d7aa163379d40bbe9bd5 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 4 Apr 2024 10:42:37 +0200 Subject: [PATCH 3/9] Add basic checking of distance for phoenix --- src/components/proxy.rs | 7 ++++++- src/net/phoenix.rs | 30 ++++++++++++++++++++++++++---- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/components/proxy.rs b/src/components/proxy.rs index 8c18950b07..51e31bbf37 100644 --- a/src/components/proxy.rs +++ b/src/components/proxy.rs @@ -212,7 +212,12 @@ impl Proxy { .await?; crate::codec::qcmp::spawn(self.qcmp, shutdown_rx.clone()); - crate::net::phoenix::spawn(self.phoenix, config.clone(), shutdown_rx.clone())?; + crate::net::phoenix::spawn( + self.phoenix, + config.clone(), + shutdown_rx.clone(), + crate::codec::qcmp::QcmpMeasurement::new()?, + )?; for notification in worker_notifications { notification.notified().await; diff --git a/src/net/phoenix.rs b/src/net/phoenix.rs index ed167cf8f4..cbace8f6a6 100644 --- a/src/net/phoenix.rs +++ b/src/net/phoenix.rs @@ -29,11 +29,12 @@ pub fn spawn( listener: crate::net::TcpListener, config: Arc, mut shutdown_rx: crate::ShutdownRx, + measurer: crate::codec::qcmp::QcmpMeasurement, ) -> crate::Result<()> { use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Response, Server as HyperServer, StatusCode}; - let phoenix = Phoenix::new(crate::codec::qcmp::QcmpMeasurement::new()?); + let phoenix = Phoenix::new(measurer); phoenix.add_nodes_from_config(&config); let crate::config::DatacenterConfig::NonAgent { datacenters } = &config.datacenter else { @@ -791,8 +792,17 @@ mod tests { let socket = raw_socket_with_reuse(qcmp_port).unwrap(); crate::codec::qcmp::spawn(socket, rx.clone()); tokio::time::sleep(std::time::Duration::from_millis(150)).await; - super::spawn(qcmp_listener, config.clone(), rx).unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + super::spawn( + qcmp_listener, + config.clone(), + rx, + crate::codec::qcmp::QcmpMeasurement::with_artificial_delay( + std::time::Duration::from_millis(50), + ) + .unwrap(), + ) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; let client = hyper::Client::new(); @@ -807,6 +817,18 @@ mod tests { let map = serde_json::from_slice::>(&resp).unwrap(); - assert!(dbg!(map).contains_key("ABCD")); + let coords = Coordinates { + x: std::time::Duration::from_millis(50).as_nanos() as f64 / 2.0, + y: std::time::Duration::from_millis(1).as_nanos() as f64 / 2.0, + }; + + let min = Coordinates::ORIGIN.distance_to(&coords); + let max = min * 3.0; + let distance = map["ABCD"].as_f64().unwrap(); + + assert!( + distance > min && distance < max, + "expected distance {distance} to be > {min} and < {max}", + ); } } From fe6115e257b2bda99487211aac357f1a8fb2d436 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 4 Apr 2024 10:55:55 +0200 Subject: [PATCH 4/9] Update h2 --- Cargo.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55a26fc3ba..3434e352df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -915,9 +915,9 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "h2" -version = "0.3.24" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", @@ -1046,7 +1046,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.6", + "socket2 0.4.10", "tokio", "tower-service", "tracing", From 45c2f732e41ab717c0ba9b23520352b3a41225c8 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 4 Apr 2024 10:58:52 +0200 Subject: [PATCH 5/9] Arbitrary timeout increase to see if it fixes test --- src/net/phoenix.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net/phoenix.rs b/src/net/phoenix.rs index cbace8f6a6..b5bce26c62 100644 --- a/src/net/phoenix.rs +++ b/src/net/phoenix.rs @@ -802,7 +802,7 @@ mod tests { .unwrap(), ) .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + tokio::time::sleep(std::time::Duration::from_millis(150)).await; let client = hyper::Client::new(); From dee347879f9e84e7b1db54a5eb438c241e91232e Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 4 Apr 2024 11:19:51 +0200 Subject: [PATCH 6/9] Sigh --- src/codec/qcmp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index 7946ca4e2a..10d46ef235 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -199,7 +199,7 @@ impl Measurement for QcmpMeasurement { let now = UtcTimestamp::now(); let Some(reply) = Protocol::parse(&recv[..size])? else { - return Err(eyre::eyre!("received non qcmp packet")); + return Err(eyre::eyre!("received non qcmp packet {:?}", &recv[..size])); }; reply From 2df1724de2af450b5220c5167a5173bb62b1491f Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 4 Apr 2024 11:40:28 +0200 Subject: [PATCH 7/9] So tired --- src/codec/qcmp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index 10d46ef235..5eeb3edca2 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -679,7 +679,7 @@ mod tests { let (_tx, rx) = crate::make_shutdown_channel(Default::default()); super::spawn(socket, rx); - tokio::time::sleep(std::time::Duration::from_millis(10)).await; + tokio::time::sleep(std::time::Duration::from_millis(20)).await; let delay = std::time::Duration::from_millis(50); let node = QcmpMeasurement::with_artificial_delay(delay).unwrap(); From bae9448068412ba936b78c4cb897c17cffc56818 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 4 Apr 2024 12:17:15 +0200 Subject: [PATCH 8/9] Sanity check --- src/codec/qcmp.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index 5eeb3edca2..b80a51f1e4 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -249,6 +249,8 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) { Protocol::ping_reply(nonce, client_timestamp, received_at) .encode(&mut output_buf); + tracing::debug!("sending ping reply {:?}", &output_buf.buf[..output_buf.len]); + output_buf = match socket.send_to(output_buf, source).await { (Ok(_), buf) => buf, (Err(error), buf) => { @@ -684,13 +686,15 @@ mod tests { let delay = std::time::Duration::from_millis(50); let node = QcmpMeasurement::with_artificial_delay(delay).unwrap(); - let dm = node.measure_distance(addr).await.unwrap(); - let total = dm.total(); + for _ in 0..3 { + let dm = node.measure_distance(addr).await.unwrap(); + let total = dm.total(); - assert!( - total > delay && total < delay * 2, - "Node1's distance is {total:?}, expected > {delay:?} and less than {:?}", - delay * 2 - ); + assert!( + total > delay && total < delay * 2, + "Node1's distance is {total:?}, expected > {delay:?} and less than {:?}", + delay * 2 + ); + } } } From b4b11ee0605224039bcd5299ebc53bf526fa60cb Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 4 Apr 2024 12:37:05 +0200 Subject: [PATCH 9/9] Don't bind to same port --- src/test.rs | 3 +-- test/src/lib.rs | 6 ++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/test.rs b/src/test.rs index c9b25ba767..91b81a5858 100644 --- a/src/test.rs +++ b/src/test.rs @@ -298,8 +298,7 @@ impl TestHelper { let server = server.unwrap_or_else(|| { let qcmp = crate::net::raw_socket_with_reuse(0).unwrap(); - let phoenix = - crate::net::TcpListener::bind(Some(crate::net::socket_port(&qcmp))).unwrap(); + let phoenix = crate::net::TcpListener::bind(None).unwrap(); crate::components::proxy::Proxy { num_workers: std::num::NonZeroUsize::new(1).unwrap(), diff --git a/test/src/lib.rs b/test/src/lib.rs index 8a55a71dbc..41a1b405f5 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -221,6 +221,7 @@ abort_task!(AgentPail); pub struct ProxyPail { pub port: u16, pub qcmp_port: u16, + pub phoenix_port: u16, pub task: JoinHandle, pub shutdown: ShutdownTx, pub config: Arc, @@ -421,8 +422,8 @@ impl Pail { let qcmp = quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"); let qcmp_port = quilkin::net::socket_port(&qcmp); - let phoenix = - TcpListener::bind(Some(qcmp_port)).expect("failed to bind phoenix socket"); + let phoenix = TcpListener::bind(None).expect("failed to bind phoenix socket"); + let phoenix_port = phoenix.port(); let port = quilkin::net::socket_port(&socket); @@ -506,6 +507,7 @@ impl Pail { Self::Proxy(ProxyPail { port, qcmp_port, + phoenix_port, shutdown, task, config,