Skip to content

Commit

Permalink
perf: Remove async channels for pushing buffers to be sent (#1043)
Browse files Browse the repository at this point in the history
* Remove async channels for pushing buffers to be sent

* Fix tests

* Don't fail for no tests nextest-rs/nextest#1648
  • Loading branch information
Jake-Shadle authored Nov 28, 2024
1 parent 114e7e6 commit 632ec6a
Show file tree
Hide file tree
Showing 14 changed files with 423 additions and 531 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
run: curl -LsSf https://get.nexte.st/latest/linux | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin
- name: Build
run: cargo build -p qt -p quilkin -p quilkin-xds --tests
- run: cargo nextest run -p qt -p quilkin -p quilkin-xds quilkin
- run: cargo nextest run --no-tests=pass -p qt -p quilkin -p quilkin-xds quilkin

build:
name: Build
Expand Down
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ quilkin-proto.workspace = true

# Crates.io
arc-swap.workspace = true
async-channel.workspace = true
async-stream.workspace = true
base64.workspace = true
base64-serde = "0.8.0"
bytes = { version = "1.8.0", features = ["serde"] }
cached.workspace = true
cfg-if = "1.0"
crossbeam-utils = { version = "0.8", optional = true }
clap = { version = "4.5.21", features = ["cargo", "derive", "env"] }
dashmap = { version = "6.1", features = ["serde"] }
Expand Down Expand Up @@ -153,7 +153,6 @@ hickory-resolver = { version = "0.24", features = [
async-trait = "0.1.83"
strum = "0.26"
strum_macros = "0.26"
cfg-if = "1.0.0"
libflate = "2.1.0"
form_urlencoded = "1.2.1"
enum_dispatch = "0.3.13"
Expand Down Expand Up @@ -194,7 +193,6 @@ edition = "2021"

[workspace.dependencies]
arc-swap = { version = "1.7.1", features = ["serde"] }
async-channel = "2.3.1"
async-stream = "0.3.6"
base64 = "0.22.1"
cached = { version = "0.54", default-features = false }
Expand Down
1 change: 0 additions & 1 deletion crates/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ publish = false
workspace = true

[dependencies]
async-channel.workspace = true
once_cell.workspace = true
quilkin.workspace = true
rand.workspace = true
Expand Down
47 changes: 19 additions & 28 deletions crates/test/tests/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use qt::*;
use quilkin::test::TestConfig;
use quilkin::{components::proxy, test::TestConfig};
use tracing::Instrument as _;

trace_test!(server, {
Expand Down Expand Up @@ -87,8 +87,7 @@ trace_test!(uring_receiver, {

let (mut packet_rx, endpoint) = sb.server("server");

let (error_sender, mut error_receiver) =
tokio::sync::mpsc::channel::<quilkin::components::proxy::ErrorMap>(20);
let (error_sender, mut error_receiver) = tokio::sync::mpsc::channel::<proxy::ErrorMap>(20);

tokio::task::spawn(
async move {
Expand All @@ -105,37 +104,32 @@ trace_test!(uring_receiver, {
config
.clusters
.modify(|clusters| clusters.insert_default([endpoint.into()].into()));
let (tx, rx) = async_channel::unbounded();
let (_shutdown_tx, shutdown_rx) =
quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);

let socket = sb.client();
let (ws, addr) = sb.socket();

let pending_sends = proxy::PendingSends::new(1).unwrap();

// we'll test a single DownstreamReceiveWorkerConfig
let ready = quilkin::components::proxy::packet_router::DownstreamReceiveWorkerConfig {
proxy::packet_router::DownstreamReceiveWorkerConfig {
worker_id: 1,
port: addr.port(),
upstream_receiver: rx.clone(),
config: config.clone(),
error_sender,
buffer_pool: quilkin::test::BUFFER_POOL.clone(),
sessions: quilkin::components::proxy::SessionPool::new(
sessions: proxy::SessionPool::new(
config,
tx,
vec![pending_sends.0.clone()],
BUFFER_POOL.clone(),
shutdown_rx.clone(),
),
}
.spawn(shutdown_rx)
.spawn(pending_sends)
.await
.expect("failed to spawn task");

// Drop the socket, otherwise it can
drop(ws);

ready.recv().unwrap();

let msg = "hello-downstream";
tracing::debug!("sending packet");
socket.send_to(msg.as_bytes(), addr).await.unwrap();
Expand All @@ -158,36 +152,33 @@ trace_test!(
.clusters
.modify(|clusters| clusters.insert_default([endpoint.into()].into()));

let (tx, rx) = async_channel::unbounded();
let (_shutdown_tx, shutdown_rx) =
quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);
let pending_sends: Vec<_> = [
proxy::PendingSends::new(1).unwrap(),
proxy::PendingSends::new(1).unwrap(),
proxy::PendingSends::new(1).unwrap(),
]
.into_iter()
.collect();

let sessions = quilkin::components::proxy::SessionPool::new(
let sessions = proxy::SessionPool::new(
config.clone(),
tx,
pending_sends.iter().map(|ps| ps.0.clone()).collect(),
BUFFER_POOL.clone(),
shutdown_rx.clone(),
);

const WORKER_COUNT: usize = 3;

let (socket, addr) = sb.socket();
let workers = quilkin::components::proxy::packet_router::spawn_receivers(
proxy::packet_router::spawn_receivers(
config,
socket,
WORKER_COUNT,
pending_sends,
&sessions,
rx,
BUFFER_POOL.clone(),
shutdown_rx,
)
.await
.unwrap();

for wn in workers {
wn.recv().unwrap();
}

let socket = std::sync::Arc::new(sb.client());
let msg = "recv-from";

Expand Down
9 changes: 9 additions & 0 deletions src/collections/ttl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ impl<V> Value<V> {
/// Get the expiration time for this value. The returned value is the
/// number of seconds relative to some reference point (e.g UNIX_EPOCH), based
/// on the clock being used.
#[inline]
fn expiration_secs(&self) -> u64 {
self.expires_at.load(Ordering::Relaxed)
}

/// Update the value's expiration time to (now + TTL).
#[inline]
fn update_expiration(&self, ttl: Duration) {
match self.clock.compute_expiration_secs(ttl) {
Ok(new_expiration_time) => {
Expand Down Expand Up @@ -160,6 +162,7 @@ where
/// Returns the current time as the number of seconds relative to some initial
/// reference point (e.g UNIX_EPOCH), based on the clock implementation being used.
/// In tests, this will be driven by [`tokio::time`]
#[inline]
pub(crate) fn now_relative_secs(&self) -> u64 {
self.0.clock.now_relative_secs().unwrap_or_default()
}
Expand Down Expand Up @@ -237,6 +240,12 @@ where
self.0.inner.remove(&key).is_some()
}

/// Removes all entries from the map
#[inline]
pub fn clear(&self) {
self.0.inner.clear();
}

/// Returns an entry for in-place updates of the specified key-value pair.
/// Note: This acquires a write lock on the map's shard that corresponds
/// to the entry.
Expand Down
Loading

0 comments on commit 632ec6a

Please sign in to comment.