Skip to content

Commit

Permalink
Implement reamining proxy tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle committed Mar 26, 2024
1 parent 0ba316c commit a213f6a
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 133 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ tokio-stream = { version = "0.1.14", features = ["net", "sync"] }
tonic = "0.10.2"
tracing.workspace = true
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
tracing-subscriber = { workspace = true, features = ["json", "env-filter"] }
tryhard = "0.5.1"
url = { version = "2.4.1", features = ["serde"] }
uuid = { version = "1.4.1", default-features = false, features = ["v4"] }
Expand Down Expand Up @@ -186,3 +186,4 @@ tokio = { version = "1.32.0", features = [
] }
tempfile = "3.8.0"
tracing = "0.1.37"
tracing-subscriber = "0.3"
2 changes: 1 addition & 1 deletion src/codec/qcmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Measurement for QcmpMeasurement {
pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) {
let port = crate::net::socket_port(&socket);

uring_spawn!(async move {
uring_spawn!(tracing::debug_span!("qcmp"), async move {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
// packet, which is the maximum value of 16 a bit integer.
let mut input_buf = vec![0; 1 << 16];
Expand Down
7 changes: 4 additions & 3 deletions src/components/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
mod packet_router;
pub mod packet_router;
mod sessions;

use super::RunArgs;
use crate::{
net::{maxmind_db::IpNetEntry, xds::ResourceType},
pool::PoolBuffer,
};
use sessions::SessionPool;
pub use sessions::SessionPool;
use std::{
net::SocketAddr,
sync::{
Expand Down Expand Up @@ -208,7 +208,8 @@ impl Proxy {
&sessions,
upstream_receiver,
buffer_pool,
)?;
)
.await?;

crate::codec::qcmp::spawn(self.qcmp, shutdown_rx.clone());
crate::net::phoenix::spawn(self.phoenix, config.clone(), shutdown_rx.clone())?;
Expand Down
53 changes: 31 additions & 22 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct DownstreamPacket {

/// Represents the required arguments to run a worker task that
/// processes packets received downstream.
pub(crate) struct DownstreamReceiveWorkerConfig {
pub struct DownstreamReceiveWorkerConfig {
/// ID of the worker.
pub worker_id: usize,
/// Socket with reused port from which the worker receives packets.
Expand All @@ -34,7 +34,7 @@ pub(crate) struct DownstreamReceiveWorkerConfig {
}

impl DownstreamReceiveWorkerConfig {
pub fn spawn(self) -> Arc<tokio::sync::Notify> {
pub async fn spawn(self) -> eyre::Result<Arc<tokio::sync::Notify>> {
let Self {
worker_id,
upstream_receiver,
Expand All @@ -48,17 +48,22 @@ impl DownstreamReceiveWorkerConfig {
let notify = Arc::new(tokio::sync::Notify::new());
let is_ready = notify.clone();

uring_spawn!(async move {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
// packet, which is the maximum value of 16 a bit integer.
let thread_span = tracing::debug_span!("receiver", id = worker_id).or_current();

let worker = uring_spawn!(thread_span, async move {
let mut last_received_at = None;
let socket = crate::net::DualStackLocalSocket::new(port)
.unwrap()
.make_refcnt();

tracing::trace!(port, "bound worker");
let send_socket = socket.clone();

let upstream = tracing::debug_span!("upstream").or_current();

uring_inner_spawn!(async move {
is_ready.notify_one();

loop {
tokio::select! {
result = upstream_receiver.recv() => {
Expand Down Expand Up @@ -103,12 +108,15 @@ impl DownstreamReceiveWorkerConfig {
}
}
}
});
}.instrument(upstream));

loop {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
// packet, which is the maximum value of 16 a bit integer.
let buffer = buffer_pool.clone().alloc();

let (result, contents) = socket.recv_from(buffer).await;

match result {
Ok((_size, mut source)) => {
source.set_ip(source.ip().to_canonical());
Expand Down Expand Up @@ -146,7 +154,9 @@ impl DownstreamReceiveWorkerConfig {
}
});

notify
use eyre::WrapErr as _;
worker.await.context("failed to spawn receiver task")??;
Ok(notify)
}

#[inline]
Expand Down Expand Up @@ -237,7 +247,7 @@ impl DownstreamReceiveWorkerConfig {
/// This function also spawns the set of worker tasks responsible for consuming packets
/// off the aforementioned queue and processing them through the filter chain and session
/// pipeline.
pub(super) fn spawn_receivers(
pub async fn spawn_receivers(
config: Arc<Config>,
socket: socket2::Socket,
num_workers: usize,
Expand All @@ -249,21 +259,20 @@ pub(super) fn spawn_receivers(

let port = crate::net::socket_port(&socket);

let worker_notifications = (0..num_workers)
.map(|worker_id| {
let worker = DownstreamReceiveWorkerConfig {
worker_id,
upstream_receiver: upstream_receiver.clone(),
port,
config: config.clone(),
sessions: sessions.clone(),
error_sender: error_sender.clone(),
buffer_pool: buffer_pool.clone(),
};
let mut worker_notifications = Vec::with_capacity(num_workers);
for worker_id in 0..num_workers {
let worker = DownstreamReceiveWorkerConfig {
worker_id,
upstream_receiver: upstream_receiver.clone(),
port,
config: config.clone(),
sessions: sessions.clone(),
error_sender: error_sender.clone(),
buffer_pool: buffer_pool.clone(),
};

worker.spawn()
})
.collect();
worker_notifications.push(worker.spawn().await?);
}

tokio::spawn(async move {
let mut log_task = tokio::time::interval(std::time::Duration::from_secs(5));
Expand Down
2 changes: 1 addition & 1 deletion src/components/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl SessionPool {

let pool = self.clone();

let initialised = uring_spawn!(async move {
let initialised = uring_spawn!(tracing::debug_span!("session pool"), async move {
let mut last_received_at = None;
let mut shutdown_rx = pool.shutdown_rx.clone();
cfg_if::cfg_if! {
Expand Down
2 changes: 1 addition & 1 deletion test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ socket2.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros"] }
tracing.workspace = true
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
10 changes: 10 additions & 0 deletions test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,16 @@ impl Sandbox {
.expect("operation timed out")
}

#[inline]
pub async fn maybe_timeout<F>(&self, ms: u64, fut: F) -> Option<F::Output>
where
F: std::future::Future,
{
tokio::time::timeout(std::time::Duration::from_millis(ms), fut)
.await
.ok()
}

/// Runs a future, expecting it to timeout instead of resolving, panics if
/// the future finishes before the timeout
#[inline]
Expand Down
Loading

0 comments on commit a213f6a

Please sign in to comment.