diff --git a/agones/src/lib.rs b/agones/src/lib.rs
index 5b96e0a9d5..b67b0e48ca 100644
--- a/agones/src/lib.rs
+++ b/agones/src/lib.rs
@@ -676,7 +676,7 @@ pub fn gameserver_address(gs: &GameServer) -> String {
// Output the events and logs for each pod that matches this label selector.
// Useful for determining why something is failing in CI without having to run a cluster.
-// Requires quilkin::test_utils::enable_log("agones=debug"); to enable debug logging within
+// Requires quilkin::test::enable_log("agones=debug"); to enable debug logging within
// the test
pub async fn debug_pods(client: &Client, labels: String) {
debug!(labels, "🪓 Debug output for Selector");
diff --git a/agones/src/provider.rs b/agones/src/provider.rs
index 74c9d77217..58cf54a932 100644
--- a/agones/src/provider.rs
+++ b/agones/src/provider.rs
@@ -42,7 +42,7 @@ mod tests {
use quilkin::{
config::providers::k8s::agones::{Fleet, GameServer},
- test_utils::TestHelper,
+ test::TestHelper,
};
use crate::{
diff --git a/agones/src/relay.rs b/agones/src/relay.rs
index 45d6e4876d..dceecc69ca 100644
--- a/agones/src/relay.rs
+++ b/agones/src/relay.rs
@@ -42,7 +42,7 @@ mod tests {
use quilkin::{
config::providers::k8s::agones::{Fleet, GameServer},
- test_utils::TestHelper,
+ test::TestHelper,
};
use crate::{
diff --git a/agones/src/sidecar.rs b/agones/src/sidecar.rs
index 2acd6aa9e4..858a70a175 100644
--- a/agones/src/sidecar.rs
+++ b/agones/src/sidecar.rs
@@ -19,7 +19,7 @@ mod tests {
use crate::{game_server, is_gameserver_ready, quilkin_config_map, quilkin_container, Client};
use k8s_openapi::api::core::v1::{ConfigMap, ConfigMapVolumeSource, Volume};
use kube::{api::PostParams, runtime::wait::await_condition, Api, ResourceExt};
- use quilkin::{config::providers::k8s::agones::GameServer, test_utils::TestHelper};
+ use quilkin::{config::providers::k8s::agones::GameServer, test::TestHelper};
use std::time::Duration;
use tokio::time::timeout;
diff --git a/benches/throughput.rs b/benches/throughput.rs
index 066dc7ca2a..335338273e 100644
--- a/benches/throughput.rs
+++ b/benches/throughput.rs
@@ -5,7 +5,7 @@ use std::time;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use once_cell::sync::Lazy;
-use quilkin::test_utils::AddressType;
+use quilkin::test::AddressType;
const MESSAGE_SIZE: usize = 0xffff;
const DEFAULT_MESSAGE: [u8; 0xffff] = [0xff; 0xffff];
@@ -30,13 +30,13 @@ fn run_quilkin(port: u16, endpoint: SocketAddr) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let config = Arc::new(quilkin::Config::default());
config.clusters.modify(|clusters| {
- clusters.insert_default([quilkin::endpoint::Endpoint::new(endpoint.into())].into())
+ clusters.insert_default([quilkin::net::endpoint::Endpoint::new(endpoint.into())].into())
});
let proxy = quilkin::cli::Proxy {
port,
qcmp_port: runtime
- .block_on(quilkin::test_utils::available_addr(&AddressType::Random))
+ .block_on(quilkin::test::available_addr(&AddressType::Random))
.port(),
..<_>::default()
};
diff --git a/docs/src/services/proxy/filters.md b/docs/src/services/proxy/filters.md
index 5319f0e9c0..0e1e10f119 100644
--- a/docs/src/services/proxy/filters.md
+++ b/docs/src/services/proxy/filters.md
@@ -128,4 +128,4 @@ required: [ 'name' ]
[TokenRouter]: ./filters/token_router.md
[Debug]: ./filters/debug.md
[LocalRateLimit]: ./filters/local_rate_limit.md
-[`quilkin::metadata::Value`]: ../../../api/quilkin/metadata/enum.Value.html
+[`quilkin::metadata::Value`]: ../../../api/quilkin/net/endpoint/metadata/enum.Value.html
diff --git a/examples/quilkin-filter-example/src/main.rs b/examples/quilkin-filter-example/src/main.rs
index 1175c52d6c..b05e40e9f8 100644
--- a/examples/quilkin-filter-example/src/main.rs
+++ b/examples/quilkin-filter-example/src/main.rs
@@ -106,7 +106,7 @@ async fn main() -> quilkin::Result<()> {
));
config.clusters.modify(|map| {
map.insert_default(
- [quilkin::endpoint::Endpoint::new(
+ [quilkin::net::endpoint::Endpoint::new(
(std::net::Ipv4Addr::LOCALHOST, 4321).into(),
)]
.into(),
diff --git a/src/cli.rs b/src/cli.rs
index 4c23fb5dce..2ad0a7ccd2 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -157,7 +157,7 @@ impl Cli {
tracing::info!(
version = crate_version!(),
- commit = crate::metadata::build::GIT_COMMIT_HASH,
+ commit = crate::net::endpoint::metadata::build::GIT_COMMIT_HASH,
"Starting Quilkin"
);
@@ -289,9 +289,9 @@ mod tests {
use crate::{
config::{Filter, Providers},
- endpoint::Endpoint,
filters::{Capture, StaticFilter, TokenRouter},
- test_utils::{create_socket, AddressType, TestHelper},
+ net::endpoint::Endpoint,
+ test::{create_socket, AddressType, TestHelper},
};
#[tokio::test]
@@ -335,7 +335,7 @@ mod tests {
config.clusters.write().insert_default(
[Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
- crate::endpoint::Metadata {
+ crate::net::endpoint::Metadata {
tokens: vec!["abc".into()].into_iter().collect(),
},
)]
@@ -345,7 +345,7 @@ mod tests {
})
.unwrap();
- let relay_admin_port = crate::test_utils::available_addr(&AddressType::Random)
+ let relay_admin_port = crate::test::available_addr(&AddressType::Random)
.await
.port();
let relay = Cli {
@@ -362,7 +362,7 @@ mod tests {
log_format: LogFormats::default(),
};
- let control_plane_admin_port = crate::test_utils::available_addr(&AddressType::Random)
+ let control_plane_admin_port = crate::test::available_addr(&AddressType::Random)
.await
.port();
let control_plane = Cli {
@@ -376,7 +376,7 @@ mod tests {
sub_zone: None,
zone: None,
idle_request_interval_secs: admin::IDLE_REQUEST_INTERVAL_SECS,
- qcmp_port: crate::test_utils::available_addr(&AddressType::Random)
+ qcmp_port: crate::test::available_addr(&AddressType::Random)
.await
.port(),
provider: Some(Providers::File {
@@ -386,7 +386,7 @@ mod tests {
log_format: LogFormats::default(),
};
- let proxy_admin_port = crate::test_utils::available_addr(&AddressType::Random)
+ let proxy_admin_port = crate::test::available_addr(&AddressType::Random)
.await
.port();
let proxy = Cli {
@@ -419,7 +419,7 @@ mod tests {
config.clusters.write().insert_default(
[Endpoint::with_metadata(
(std::net::Ipv6Addr::LOCALHOST, server_port).into(),
- crate::endpoint::Metadata {
+ crate::net::endpoint::Metadata {
tokens: vec![token.clone()].into_iter().collect(),
},
)]
diff --git a/src/cli/admin.rs b/src/cli/admin.rs
index 1d743b93a4..8d037d7986 100644
--- a/src/cli/admin.rs
+++ b/src/cli/admin.rs
@@ -193,7 +193,7 @@ fn collect_metrics() -> Response
{
#[cfg(test)]
mod tests {
use super::*;
- use crate::endpoint::Endpoint;
+ use crate::net::endpoint::Endpoint;
#[tokio::test]
async fn collect_metrics() {
diff --git a/src/cli/agent.rs b/src/cli/agent.rs
index c66f1fd0df..e714790f48 100644
--- a/src/cli/agent.rs
+++ b/src/cli/agent.rs
@@ -79,7 +79,7 @@ impl Agent {
mut shutdown_rx: tokio::sync::watch::Receiver<()>,
) -> crate::Result<()> {
let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some())
- .then(|| crate::endpoint::Locality {
+ .then(|| crate::net::endpoint::Locality {
region: self.region.clone().unwrap_or_default(),
zone: self.zone.clone().unwrap_or_default(),
sub_zone: self.sub_zone.clone().unwrap_or_default(),
@@ -97,7 +97,7 @@ impl Agent {
None => return Err(eyre::eyre!("no configuration provider given")),
};
- let task = crate::xds::client::MdsClient::connect(
+ let task = crate::net::xds::client::MdsClient::connect(
String::clone(&config.id.load()),
mode.clone(),
self.relay.clone(),
@@ -112,7 +112,7 @@ impl Agent {
None
};
- crate::protocol::spawn(self.qcmp_port).await?;
+ crate::codec::qcmp::spawn(self.qcmp_port).await?;
shutdown_rx.changed().await.map_err(From::from)
}
}
diff --git a/src/cli/manage.rs b/src/cli/manage.rs
index 34e9f9703b..e17fa9b110 100644
--- a/src/cli/manage.rs
+++ b/src/cli/manage.rs
@@ -60,7 +60,7 @@ impl Manage {
mut shutdown_rx: tokio::sync::watch::Receiver<()>,
) -> crate::Result<()> {
let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some())
- .then(|| crate::endpoint::Locality {
+ .then(|| crate::net::endpoint::Locality {
region: self.region.clone().unwrap_or_default(),
zone: self.zone.clone().unwrap_or_default(),
sub_zone: self.sub_zone.clone().unwrap_or_default(),
@@ -81,7 +81,7 @@ impl Manage {
let _relay_stream = if !self.relay.is_empty() {
tracing::info!("connecting to relay server");
- let client = crate::xds::client::MdsClient::connect(
+ let client = crate::net::xds::client::MdsClient::connect(
String::clone(&config.id.load()),
mode.clone(),
self.relay.clone(),
@@ -92,7 +92,7 @@ impl Manage {
None
};
- let server_task = tokio::spawn(crate::xds::server::spawn(self.port, config))
+ let server_task = tokio::spawn(crate::net::xds::server::spawn(self.port, config))
.map_err(From::from)
.and_then(std::future::ready);
diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs
index 7c9072dd93..7b3f621943 100644
--- a/src/cli/proxy.rs
+++ b/src/cli/proxy.rs
@@ -14,6 +14,8 @@
* limitations under the License.
*/
+mod sessions;
+
use std::{
net::SocketAddr,
sync::{
@@ -26,11 +28,16 @@ use std::{
use tonic::transport::Endpoint;
use super::Admin;
-use crate::{proxy::SessionPool, xds::ResourceType, Config, Result};
+use sessions::{SessionKey, SessionPool};
#[cfg(doc)]
use crate::filters::FilterFactory;
-use crate::utils::net::DualStackLocalSocket;
+
+use crate::{
+ filters::{Filter, ReadContext},
+ net::{xds::ResourceType, DualStackLocalSocket},
+ Config, Result,
+};
define_port!(7777);
@@ -44,7 +51,7 @@ pub struct Proxy {
pub management_server: Vec,
/// The remote URL or local file path to retrieve the Maxmind database.
#[clap(long, env)]
- pub mmdb: Option,
+ pub mmdb: Option,
/// The port to listen on.
#[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)]
pub port: u16,
@@ -104,7 +111,7 @@ impl Proxy {
self.to
.iter()
.cloned()
- .map(crate::endpoint::Endpoint::from)
+ .map(crate::net::endpoint::Endpoint::from)
.collect(),
);
});
@@ -130,7 +137,7 @@ impl Proxy {
*lock = Some(check.clone());
}
- let client = crate::xds::AdsClient::connect(
+ let client = crate::net::xds::AdsClient::connect(
String::clone(&id),
mode.clone(),
self.management_server.clone(),
@@ -151,7 +158,7 @@ impl Proxy {
};
self.run_recv_from(&config, &sessions, shared_socket)?;
- crate::protocol::spawn(self.qcmp_port).await?;
+ crate::codec::qcmp::spawn(self.qcmp_port).await?;
tracing::info!("Quilkin is ready");
shutdown_rx
@@ -186,7 +193,7 @@ impl Proxy {
// Contains config for each worker task.
let mut workers = Vec::with_capacity(num_workers);
- workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
+ workers.push(DownstreamReceiveWorkerConfig {
worker_id: 0,
socket: shared_socket,
config: config.clone(),
@@ -194,7 +201,7 @@ impl Proxy {
});
for worker_id in 1..num_workers {
- workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
+ workers.push(DownstreamReceiveWorkerConfig {
worker_id,
socket: Arc::new(DualStackLocalSocket::new(self.port)?),
config: config.clone(),
@@ -212,6 +219,189 @@ impl Proxy {
}
}
+#[derive(Clone, Debug, Default)]
+pub struct RuntimeConfig {
+ pub idle_request_interval_secs: u64,
+ // RwLock as this check is conditional on the proxy using xDS.
+ pub xds_is_healthy: Arc>>>,
+}
+
+impl RuntimeConfig {
+ pub fn is_ready(&self, config: &Config) -> bool {
+ self.xds_is_healthy
+ .read()
+ .as_ref()
+ .map_or(true, |health| health.load(Ordering::SeqCst))
+ && config.clusters.read().endpoints().count() != 0
+ }
+}
+
+/// Packet received from local port
+#[derive(Debug)]
+struct DownstreamPacket {
+ asn_info: Option,
+ contents: Vec,
+ received_at: i64,
+ source: SocketAddr,
+}
+
+/// Represents the required arguments to run a worker task that
+/// processes packets received downstream.
+pub(crate) struct DownstreamReceiveWorkerConfig {
+ /// ID of the worker.
+ pub worker_id: usize,
+ /// Socket with reused port from which the worker receives packets.
+ pub socket: Arc,
+ pub config: Arc,
+ pub sessions: Arc,
+}
+
+impl DownstreamReceiveWorkerConfig {
+ pub fn spawn(self) {
+ let Self {
+ worker_id,
+ socket,
+ config,
+ sessions,
+ } = self;
+
+ tokio::spawn(async move {
+ // Initialize a buffer for the UDP packet. We use the maximum size of a UDP
+ // packet, which is the maximum value of 16 a bit integer.
+ let mut buf = vec![0; 1 << 16];
+ let mut last_received_at = None;
+ loop {
+ tracing::debug!(
+ id = worker_id,
+ port = ?socket.local_ipv6_addr().map(|addr| addr.port()),
+ "Awaiting packet"
+ );
+
+ tokio::select! {
+ result = socket.recv_from(&mut buf) => {
+ match result {
+ Ok((size, mut source)) => {
+ crate::net::to_canonical(&mut source);
+ let packet = DownstreamPacket {
+ received_at: chrono::Utc::now().timestamp_nanos_opt().unwrap(),
+ asn_info: crate::net::maxmind_db::MaxmindDb::lookup(source.ip()),
+ contents: buf[..size].to_vec(),
+ source,
+ };
+
+ if let Some(last_received_at) = last_received_at {
+ crate::metrics::packet_jitter(
+ crate::metrics::READ,
+ packet.asn_info.as_ref(),
+ )
+ .set(packet.received_at - last_received_at);
+ }
+ last_received_at = Some(packet.received_at);
+
+ Self::spawn_process_task(packet, source, worker_id, &config, &sessions)
+ }
+ Err(error) => {
+ tracing::error!(%error, "error receiving packet");
+ return;
+ }
+ }
+ }
+ }
+ }
+ });
+ }
+
+ #[inline]
+ fn spawn_process_task(
+ packet: DownstreamPacket,
+ source: std::net::SocketAddr,
+ worker_id: usize,
+ config: &Arc,
+ sessions: &Arc,
+ ) {
+ tracing::trace!(
+ id = worker_id,
+ size = packet.contents.len(),
+ source = %source,
+ contents=&*crate::codec::base64::encode(&packet.contents),
+ "received packet from downstream"
+ );
+
+ tokio::spawn({
+ let config = config.clone();
+ let sessions = sessions.clone();
+
+ async move {
+ let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer();
+
+ let asn_info = packet.asn_info.clone();
+ let asn_info = asn_info.as_ref();
+ match Self::process_downstream_received_packet(packet, config, sessions).await {
+ Ok(size) => {
+ crate::metrics::packets_total(crate::metrics::READ, asn_info).inc();
+ crate::metrics::bytes_total(crate::metrics::READ, asn_info)
+ .inc_by(size as u64);
+ }
+ Err(error) => {
+ let source = error.to_string();
+ crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc();
+ crate::metrics::packets_dropped_total(
+ crate::metrics::READ,
+ &source,
+ asn_info,
+ )
+ .inc();
+ }
+ }
+
+ timer.stop_and_record();
+ }
+ });
+ }
+
+ /// Processes a packet by running it through the filter chain.
+ async fn process_downstream_received_packet(
+ packet: DownstreamPacket,
+ config: Arc,
+ sessions: Arc,
+ ) -> Result {
+ let endpoints: Vec<_> = config.clusters.read().endpoints().collect();
+ if endpoints.is_empty() {
+ return Err(PipelineError::NoUpstreamEndpoints);
+ }
+
+ let filters = config.filters.load();
+ let mut context = ReadContext::new(endpoints, packet.source.into(), packet.contents);
+ filters.read(&mut context).await?;
+ let mut bytes_written = 0;
+
+ for endpoint in context.endpoints.iter() {
+ let session_key = SessionKey {
+ source: packet.source,
+ dest: endpoint.address.to_socket_addr().await?,
+ };
+
+ bytes_written += sessions
+ .send(session_key, packet.asn_info.clone(), &context.contents)
+ .await?;
+ }
+
+ Ok(bytes_written)
+ }
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum PipelineError {
+ #[error("No upstream endpoints available")]
+ NoUpstreamEndpoints,
+ #[error("filter {0}")]
+ Filter(#[from] crate::filters::FilterError),
+ #[error("qcmp: {0}")]
+ Qcmp(#[from] crate::codec::qcmp::Error),
+ #[error("OS level error: {0}")]
+ Io(#[from] std::io::Error),
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -220,8 +410,8 @@ mod tests {
use crate::{
config,
- endpoint::Endpoint,
- test_utils::{available_addr, create_socket, load_test_filters, AddressType, TestHelper},
+ net::endpoint::Endpoint,
+ test::{available_addr, create_socket, load_test_filters, AddressType, TestHelper},
};
#[tokio::test]
@@ -279,7 +469,7 @@ mod tests {
let endpoint = t.open_socket_and_recv_single_packet().await;
let mut local_addr = available_addr(&AddressType::Ipv6).await;
- crate::test_utils::map_addr_to_localhost(&mut local_addr);
+ crate::test::map_addr_to_localhost(&mut local_addr);
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
@@ -376,7 +566,7 @@ mod tests {
});
// we'll test a single DownstreamReceiveWorkerConfig
- crate::proxy::DownstreamReceiveWorkerConfig {
+ DownstreamReceiveWorkerConfig {
worker_id: 1,
socket: socket.clone(),
config: config.clone(),
@@ -384,7 +574,7 @@ mod tests {
config,
Arc::new(
DualStackLocalSocket::new(
- crate::test_utils::available_addr(&AddressType::Random)
+ crate::test::available_addr(&AddressType::Random)
.await
.port(),
)
@@ -422,7 +612,7 @@ mod tests {
let config = Arc::new(crate::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(
- [crate::endpoint::Endpoint::from(
+ [crate::net::endpoint::Endpoint::from(
endpoint.socket.local_ipv4_addr().unwrap(),
)]
.into(),
@@ -431,7 +621,7 @@ mod tests {
let shared_socket = Arc::new(
DualStackLocalSocket::new(
- crate::test_utils::available_addr(&AddressType::Random)
+ crate::test::available_addr(&AddressType::Random)
.await
.port(),
)
@@ -458,20 +648,3 @@ mod tests {
);
}
}
-
-#[derive(Clone, Debug, Default)]
-pub struct RuntimeConfig {
- pub idle_request_interval_secs: u64,
- // RwLock as this check is conditional on the proxy using xDS.
- pub xds_is_healthy: Arc>>>,
-}
-
-impl RuntimeConfig {
- pub fn is_ready(&self, config: &Config) -> bool {
- self.xds_is_healthy
- .read()
- .as_ref()
- .map_or(true, |health| health.load(Ordering::SeqCst))
- && config.clusters.read().endpoints().count() != 0
- }
-}
diff --git a/src/proxy/sessions.rs b/src/cli/proxy/sessions.rs
similarity index 96%
rename from src/proxy/sessions.rs
rename to src/cli/proxy/sessions.rs
index b76ea7a630..dddadf8c23 100644
--- a/src/proxy/sessions.rs
+++ b/src/cli/proxy/sessions.rs
@@ -27,15 +27,13 @@ use tokio::{
};
use crate::{
- config::Config,
- filters::Filter,
- maxmind_db::IpNetEntry,
- utils::{net::DualStackLocalSocket, Loggable},
+ config::Config, filters::Filter, net::maxmind_db::IpNetEntry, net::DualStackLocalSocket,
+ Loggable,
};
pub(crate) mod metrics;
-pub type SessionMap = crate::ttl_map::TtlMap;
+pub type SessionMap = crate::collections::ttl::TtlMap;
/// A data structure that is responsible for holding sessions, and pooling
/// sockets between them. This means that we only provide new unique sockets
@@ -116,7 +114,7 @@ impl SessionPool {
},
Ok((size, mut recv_addr)) => {
let received_at = chrono::Utc::now().timestamp_nanos_opt().unwrap();
- crate::utils::net::to_canonical(&mut recv_addr);
+ crate::net::to_canonical(&mut recv_addr);
tracing::trace!(%recv_addr, %size, "received packet");
let (downstream_addr, asn_info): (SocketAddr, Option) = {
let storage = pool.storage.read().await;
@@ -291,7 +289,7 @@ impl SessionPool {
dest: SocketAddr,
packet: &[u8],
) -> Result {
- tracing::trace!(%source, %dest, contents = %crate::utils::base64_encode(packet), "received packet from upstream");
+ tracing::trace!(%source, %dest, contents = %crate::codec::base64::encode(packet), "received packet from upstream");
let mut context =
crate::filters::WriteContext::new(source.into(), dest.into(), packet.to_vec());
@@ -299,7 +297,7 @@ impl SessionPool {
config.filters.load().write(&mut context).await?;
let packet = context.contents.as_ref();
- tracing::trace!(%source, %dest, contents = %crate::utils::base64_encode(packet), "sending packet downstream");
+ tracing::trace!(%source, %dest, contents = %crate::codec::base64::encode(packet), "sending packet downstream");
downstream_socket
.send_to(packet, &dest)
.await
@@ -488,7 +486,7 @@ impl Loggable for Error {
#[cfg(test)]
mod tests {
use super::*;
- use crate::test_utils::{available_addr, AddressType, TestHelper};
+ use crate::test::{available_addr, AddressType, TestHelper};
use std::sync::Arc;
async fn new_pool(config: impl Into