From 6e0adc902194155ec867a3fadd5bc984331f053b Mon Sep 17 00:00:00 2001 From: XAMPPRocky <4464295+XAMPPRocky@users.noreply.github.com> Date: Wed, 18 Oct 2023 10:55:09 +0200 Subject: [PATCH] Reorganise modules (#824) While working on adding delta xDS I was finding a lot of our module organisation to be quite messy and there was clear groupings of functionality. This commit doesn't make any functional changes it only changes the layout of the following modules. * proxy -> cli::proxy * utils -> codec::base64 (Loggable trait moved to lib.rs) * prost -> codec::prost * protocol -> codec::qcmp * ttl_map -> collections::ttl * endpoint -> net::endpoint * metadata -> net::endpoint::metadata * xds -> net::xds * test_utils -> test --- agones/src/lib.rs | 2 +- agones/src/provider.rs | 2 +- agones/src/relay.rs | 2 +- agones/src/sidecar.rs | 2 +- benches/throughput.rs | 6 +- docs/src/services/proxy/filters.md | 2 +- examples/quilkin-filter-example/src/main.rs | 2 +- src/cli.rs | 18 +- src/cli/admin.rs | 2 +- src/cli/agent.rs | 6 +- src/cli/manage.rs | 6 +- src/cli/proxy.rs | 237 +++++++++++++++++--- src/{ => cli}/proxy/sessions.rs | 22 +- src/{ => cli}/proxy/sessions/metrics.rs | 2 +- src/cli/qcmp.rs | 2 +- src/cli/relay.rs | 4 +- src/cluster.rs | 2 +- src/{extensions.rs => codec.rs} | 12 +- src/{utils.rs => codec/base64.rs} | 12 +- src/{ => codec}/prost.rs | 0 src/{protocol.rs => codec/qcmp.rs} | 2 +- src/collections.rs | 19 ++ src/{ttl_map.rs => collections/ttl.rs} | 2 +- src/config.rs | 20 +- src/config/providers.rs | 2 +- src/config/providers/k8s.rs | 2 +- src/config/providers/k8s/agones.rs | 8 +- src/config/watch/agones.rs | 2 +- src/config/watch/fs.rs | 6 +- src/filters/capture.rs | 7 +- src/filters/capture/affix.rs | 2 +- src/filters/capture/config.rs | 9 +- src/filters/capture/regex.rs | 2 +- src/filters/chain.rs | 10 +- src/filters/compress.rs | 2 +- src/filters/debug.rs | 2 +- src/filters/factory.rs | 2 +- src/filters/firewall.rs | 4 +- src/filters/load_balancer.rs | 2 +- src/filters/local_rate_limit.rs | 8 +- src/filters/match.rs | 12 +- src/filters/match/config.rs | 6 +- src/filters/read.rs | 5 +- src/filters/registry.rs | 4 +- src/filters/timestamp.rs | 10 +- src/filters/token_router.rs | 17 +- src/filters/write.rs | 2 +- src/lib.rs | 23 +- src/metrics.rs | 2 +- src/{utils => }/net.rs | 10 +- src/{ => net}/endpoint.rs | 66 +++--- src/{ => net}/endpoint/address.rs | 20 +- src/{ => net}/endpoint/locality.rs | 0 src/{ => net/endpoint}/metadata.rs | 12 +- src/{ => net/endpoint}/metadata/symbol.rs | 2 +- src/{ => net}/maxmind_db.rs | 0 src/{ => net}/xds.rs | 17 +- src/{ => net}/xds/client.rs | 6 +- src/{ => net}/xds/metrics.rs | 0 src/{ => net}/xds/resource.rs | 6 +- src/{ => net}/xds/server.rs | 23 +- src/proxy.rs | 193 ---------------- src/{test_utils.rs => test.rs} | 10 +- tests/capture.rs | 7 +- tests/compress.rs | 6 +- tests/concatenate.rs | 4 +- tests/filter_order.rs | 6 +- tests/filters.rs | 4 +- tests/firewall.rs | 4 +- tests/health.rs | 2 +- tests/load_balancer.rs | 4 +- tests/local_rate_limit.rs | 6 +- tests/match.rs | 4 +- tests/metrics.rs | 10 +- tests/no_filter.rs | 4 +- tests/qcmp.rs | 8 +- tests/token_router.rs | 7 +- 77 files changed, 487 insertions(+), 491 deletions(-) rename src/{ => cli}/proxy/sessions.rs (96%) rename src/{ => cli}/proxy/sessions/metrics.rs (96%) rename src/{extensions.rs => codec.rs} (78%) rename src/{utils.rs => codec/base64.rs} (71%) rename src/{ => codec}/prost.rs (100%) rename src/{protocol.rs => codec/qcmp.rs} (99%) create mode 100644 src/collections.rs rename src/{ttl_map.rs => collections/ttl.rs} (99%) rename src/{utils => }/net.rs (97%) rename src/{ => net}/endpoint.rs (82%) rename src/{ => net}/endpoint/address.rs (94%) rename src/{ => net}/endpoint/locality.rs (100%) rename src/{ => net/endpoint}/metadata.rs (94%) rename src/{ => net/endpoint}/metadata/symbol.rs (98%) rename src/{ => net}/maxmind_db.rs (100%) rename src/{ => net}/xds.rs (95%) rename src/{ => net}/xds/client.rs (99%) rename src/{ => net}/xds/metrics.rs (100%) rename src/{ => net}/xds/resource.rs (95%) rename src/{ => net}/xds/server.rs (96%) delete mode 100644 src/proxy.rs rename src/{test_utils.rs => test.rs} (98%) diff --git a/agones/src/lib.rs b/agones/src/lib.rs index 5b96e0a9d5..b67b0e48ca 100644 --- a/agones/src/lib.rs +++ b/agones/src/lib.rs @@ -676,7 +676,7 @@ pub fn gameserver_address(gs: &GameServer) -> String { // Output the events and logs for each pod that matches this label selector. // Useful for determining why something is failing in CI without having to run a cluster. -// Requires quilkin::test_utils::enable_log("agones=debug"); to enable debug logging within +// Requires quilkin::test::enable_log("agones=debug"); to enable debug logging within // the test pub async fn debug_pods(client: &Client, labels: String) { debug!(labels, "🪓 Debug output for Selector"); diff --git a/agones/src/provider.rs b/agones/src/provider.rs index 74c9d77217..58cf54a932 100644 --- a/agones/src/provider.rs +++ b/agones/src/provider.rs @@ -42,7 +42,7 @@ mod tests { use quilkin::{ config::providers::k8s::agones::{Fleet, GameServer}, - test_utils::TestHelper, + test::TestHelper, }; use crate::{ diff --git a/agones/src/relay.rs b/agones/src/relay.rs index 45d6e4876d..dceecc69ca 100644 --- a/agones/src/relay.rs +++ b/agones/src/relay.rs @@ -42,7 +42,7 @@ mod tests { use quilkin::{ config::providers::k8s::agones::{Fleet, GameServer}, - test_utils::TestHelper, + test::TestHelper, }; use crate::{ diff --git a/agones/src/sidecar.rs b/agones/src/sidecar.rs index 2acd6aa9e4..858a70a175 100644 --- a/agones/src/sidecar.rs +++ b/agones/src/sidecar.rs @@ -19,7 +19,7 @@ mod tests { use crate::{game_server, is_gameserver_ready, quilkin_config_map, quilkin_container, Client}; use k8s_openapi::api::core::v1::{ConfigMap, ConfigMapVolumeSource, Volume}; use kube::{api::PostParams, runtime::wait::await_condition, Api, ResourceExt}; - use quilkin::{config::providers::k8s::agones::GameServer, test_utils::TestHelper}; + use quilkin::{config::providers::k8s::agones::GameServer, test::TestHelper}; use std::time::Duration; use tokio::time::timeout; diff --git a/benches/throughput.rs b/benches/throughput.rs index 066dc7ca2a..335338273e 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -5,7 +5,7 @@ use std::time; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use once_cell::sync::Lazy; -use quilkin::test_utils::AddressType; +use quilkin::test::AddressType; const MESSAGE_SIZE: usize = 0xffff; const DEFAULT_MESSAGE: [u8; 0xffff] = [0xff; 0xffff]; @@ -30,13 +30,13 @@ fn run_quilkin(port: u16, endpoint: SocketAddr) { let runtime = tokio::runtime::Runtime::new().unwrap(); let config = Arc::new(quilkin::Config::default()); config.clusters.modify(|clusters| { - clusters.insert_default([quilkin::endpoint::Endpoint::new(endpoint.into())].into()) + clusters.insert_default([quilkin::net::endpoint::Endpoint::new(endpoint.into())].into()) }); let proxy = quilkin::cli::Proxy { port, qcmp_port: runtime - .block_on(quilkin::test_utils::available_addr(&AddressType::Random)) + .block_on(quilkin::test::available_addr(&AddressType::Random)) .port(), ..<_>::default() }; diff --git a/docs/src/services/proxy/filters.md b/docs/src/services/proxy/filters.md index 5319f0e9c0..0e1e10f119 100644 --- a/docs/src/services/proxy/filters.md +++ b/docs/src/services/proxy/filters.md @@ -128,4 +128,4 @@ required: [ 'name' ] [TokenRouter]: ./filters/token_router.md [Debug]: ./filters/debug.md [LocalRateLimit]: ./filters/local_rate_limit.md -[`quilkin::metadata::Value`]: ../../../api/quilkin/metadata/enum.Value.html +[`quilkin::metadata::Value`]: ../../../api/quilkin/net/endpoint/metadata/enum.Value.html diff --git a/examples/quilkin-filter-example/src/main.rs b/examples/quilkin-filter-example/src/main.rs index 1175c52d6c..b05e40e9f8 100644 --- a/examples/quilkin-filter-example/src/main.rs +++ b/examples/quilkin-filter-example/src/main.rs @@ -106,7 +106,7 @@ async fn main() -> quilkin::Result<()> { )); config.clusters.modify(|map| { map.insert_default( - [quilkin::endpoint::Endpoint::new( + [quilkin::net::endpoint::Endpoint::new( (std::net::Ipv4Addr::LOCALHOST, 4321).into(), )] .into(), diff --git a/src/cli.rs b/src/cli.rs index 4c23fb5dce..2ad0a7ccd2 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -157,7 +157,7 @@ impl Cli { tracing::info!( version = crate_version!(), - commit = crate::metadata::build::GIT_COMMIT_HASH, + commit = crate::net::endpoint::metadata::build::GIT_COMMIT_HASH, "Starting Quilkin" ); @@ -289,9 +289,9 @@ mod tests { use crate::{ config::{Filter, Providers}, - endpoint::Endpoint, filters::{Capture, StaticFilter, TokenRouter}, - test_utils::{create_socket, AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{create_socket, AddressType, TestHelper}, }; #[tokio::test] @@ -335,7 +335,7 @@ mod tests { config.clusters.write().insert_default( [Endpoint::with_metadata( (std::net::Ipv4Addr::LOCALHOST, server_port).into(), - crate::endpoint::Metadata { + crate::net::endpoint::Metadata { tokens: vec!["abc".into()].into_iter().collect(), }, )] @@ -345,7 +345,7 @@ mod tests { }) .unwrap(); - let relay_admin_port = crate::test_utils::available_addr(&AddressType::Random) + let relay_admin_port = crate::test::available_addr(&AddressType::Random) .await .port(); let relay = Cli { @@ -362,7 +362,7 @@ mod tests { log_format: LogFormats::default(), }; - let control_plane_admin_port = crate::test_utils::available_addr(&AddressType::Random) + let control_plane_admin_port = crate::test::available_addr(&AddressType::Random) .await .port(); let control_plane = Cli { @@ -376,7 +376,7 @@ mod tests { sub_zone: None, zone: None, idle_request_interval_secs: admin::IDLE_REQUEST_INTERVAL_SECS, - qcmp_port: crate::test_utils::available_addr(&AddressType::Random) + qcmp_port: crate::test::available_addr(&AddressType::Random) .await .port(), provider: Some(Providers::File { @@ -386,7 +386,7 @@ mod tests { log_format: LogFormats::default(), }; - let proxy_admin_port = crate::test_utils::available_addr(&AddressType::Random) + let proxy_admin_port = crate::test::available_addr(&AddressType::Random) .await .port(); let proxy = Cli { @@ -419,7 +419,7 @@ mod tests { config.clusters.write().insert_default( [Endpoint::with_metadata( (std::net::Ipv6Addr::LOCALHOST, server_port).into(), - crate::endpoint::Metadata { + crate::net::endpoint::Metadata { tokens: vec![token.clone()].into_iter().collect(), }, )] diff --git a/src/cli/admin.rs b/src/cli/admin.rs index 1d743b93a4..8d037d7986 100644 --- a/src/cli/admin.rs +++ b/src/cli/admin.rs @@ -193,7 +193,7 @@ fn collect_metrics() -> Response { #[cfg(test)] mod tests { use super::*; - use crate::endpoint::Endpoint; + use crate::net::endpoint::Endpoint; #[tokio::test] async fn collect_metrics() { diff --git a/src/cli/agent.rs b/src/cli/agent.rs index c66f1fd0df..e714790f48 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -79,7 +79,7 @@ impl Agent { mut shutdown_rx: tokio::sync::watch::Receiver<()>, ) -> crate::Result<()> { let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some()) - .then(|| crate::endpoint::Locality { + .then(|| crate::net::endpoint::Locality { region: self.region.clone().unwrap_or_default(), zone: self.zone.clone().unwrap_or_default(), sub_zone: self.sub_zone.clone().unwrap_or_default(), @@ -97,7 +97,7 @@ impl Agent { None => return Err(eyre::eyre!("no configuration provider given")), }; - let task = crate::xds::client::MdsClient::connect( + let task = crate::net::xds::client::MdsClient::connect( String::clone(&config.id.load()), mode.clone(), self.relay.clone(), @@ -112,7 +112,7 @@ impl Agent { None }; - crate::protocol::spawn(self.qcmp_port).await?; + crate::codec::qcmp::spawn(self.qcmp_port).await?; shutdown_rx.changed().await.map_err(From::from) } } diff --git a/src/cli/manage.rs b/src/cli/manage.rs index 34e9f9703b..e17fa9b110 100644 --- a/src/cli/manage.rs +++ b/src/cli/manage.rs @@ -60,7 +60,7 @@ impl Manage { mut shutdown_rx: tokio::sync::watch::Receiver<()>, ) -> crate::Result<()> { let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some()) - .then(|| crate::endpoint::Locality { + .then(|| crate::net::endpoint::Locality { region: self.region.clone().unwrap_or_default(), zone: self.zone.clone().unwrap_or_default(), sub_zone: self.sub_zone.clone().unwrap_or_default(), @@ -81,7 +81,7 @@ impl Manage { let _relay_stream = if !self.relay.is_empty() { tracing::info!("connecting to relay server"); - let client = crate::xds::client::MdsClient::connect( + let client = crate::net::xds::client::MdsClient::connect( String::clone(&config.id.load()), mode.clone(), self.relay.clone(), @@ -92,7 +92,7 @@ impl Manage { None }; - let server_task = tokio::spawn(crate::xds::server::spawn(self.port, config)) + let server_task = tokio::spawn(crate::net::xds::server::spawn(self.port, config)) .map_err(From::from) .and_then(std::future::ready); diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index 7c9072dd93..7b3f621943 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -14,6 +14,8 @@ * limitations under the License. */ +mod sessions; + use std::{ net::SocketAddr, sync::{ @@ -26,11 +28,16 @@ use std::{ use tonic::transport::Endpoint; use super::Admin; -use crate::{proxy::SessionPool, xds::ResourceType, Config, Result}; +use sessions::{SessionKey, SessionPool}; #[cfg(doc)] use crate::filters::FilterFactory; -use crate::utils::net::DualStackLocalSocket; + +use crate::{ + filters::{Filter, ReadContext}, + net::{xds::ResourceType, DualStackLocalSocket}, + Config, Result, +}; define_port!(7777); @@ -44,7 +51,7 @@ pub struct Proxy { pub management_server: Vec, /// The remote URL or local file path to retrieve the Maxmind database. #[clap(long, env)] - pub mmdb: Option, + pub mmdb: Option, /// The port to listen on. #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)] pub port: u16, @@ -104,7 +111,7 @@ impl Proxy { self.to .iter() .cloned() - .map(crate::endpoint::Endpoint::from) + .map(crate::net::endpoint::Endpoint::from) .collect(), ); }); @@ -130,7 +137,7 @@ impl Proxy { *lock = Some(check.clone()); } - let client = crate::xds::AdsClient::connect( + let client = crate::net::xds::AdsClient::connect( String::clone(&id), mode.clone(), self.management_server.clone(), @@ -151,7 +158,7 @@ impl Proxy { }; self.run_recv_from(&config, &sessions, shared_socket)?; - crate::protocol::spawn(self.qcmp_port).await?; + crate::codec::qcmp::spawn(self.qcmp_port).await?; tracing::info!("Quilkin is ready"); shutdown_rx @@ -186,7 +193,7 @@ impl Proxy { // Contains config for each worker task. let mut workers = Vec::with_capacity(num_workers); - workers.push(crate::proxy::DownstreamReceiveWorkerConfig { + workers.push(DownstreamReceiveWorkerConfig { worker_id: 0, socket: shared_socket, config: config.clone(), @@ -194,7 +201,7 @@ impl Proxy { }); for worker_id in 1..num_workers { - workers.push(crate::proxy::DownstreamReceiveWorkerConfig { + workers.push(DownstreamReceiveWorkerConfig { worker_id, socket: Arc::new(DualStackLocalSocket::new(self.port)?), config: config.clone(), @@ -212,6 +219,189 @@ impl Proxy { } } +#[derive(Clone, Debug, Default)] +pub struct RuntimeConfig { + pub idle_request_interval_secs: u64, + // RwLock as this check is conditional on the proxy using xDS. + pub xds_is_healthy: Arc>>>, +} + +impl RuntimeConfig { + pub fn is_ready(&self, config: &Config) -> bool { + self.xds_is_healthy + .read() + .as_ref() + .map_or(true, |health| health.load(Ordering::SeqCst)) + && config.clusters.read().endpoints().count() != 0 + } +} + +/// Packet received from local port +#[derive(Debug)] +struct DownstreamPacket { + asn_info: Option, + contents: Vec, + received_at: i64, + source: SocketAddr, +} + +/// Represents the required arguments to run a worker task that +/// processes packets received downstream. +pub(crate) struct DownstreamReceiveWorkerConfig { + /// ID of the worker. + pub worker_id: usize, + /// Socket with reused port from which the worker receives packets. + pub socket: Arc, + pub config: Arc, + pub sessions: Arc, +} + +impl DownstreamReceiveWorkerConfig { + pub fn spawn(self) { + let Self { + worker_id, + socket, + config, + sessions, + } = self; + + tokio::spawn(async move { + // Initialize a buffer for the UDP packet. We use the maximum size of a UDP + // packet, which is the maximum value of 16 a bit integer. + let mut buf = vec![0; 1 << 16]; + let mut last_received_at = None; + loop { + tracing::debug!( + id = worker_id, + port = ?socket.local_ipv6_addr().map(|addr| addr.port()), + "Awaiting packet" + ); + + tokio::select! { + result = socket.recv_from(&mut buf) => { + match result { + Ok((size, mut source)) => { + crate::net::to_canonical(&mut source); + let packet = DownstreamPacket { + received_at: chrono::Utc::now().timestamp_nanos_opt().unwrap(), + asn_info: crate::net::maxmind_db::MaxmindDb::lookup(source.ip()), + contents: buf[..size].to_vec(), + source, + }; + + if let Some(last_received_at) = last_received_at { + crate::metrics::packet_jitter( + crate::metrics::READ, + packet.asn_info.as_ref(), + ) + .set(packet.received_at - last_received_at); + } + last_received_at = Some(packet.received_at); + + Self::spawn_process_task(packet, source, worker_id, &config, &sessions) + } + Err(error) => { + tracing::error!(%error, "error receiving packet"); + return; + } + } + } + } + } + }); + } + + #[inline] + fn spawn_process_task( + packet: DownstreamPacket, + source: std::net::SocketAddr, + worker_id: usize, + config: &Arc, + sessions: &Arc, + ) { + tracing::trace!( + id = worker_id, + size = packet.contents.len(), + source = %source, + contents=&*crate::codec::base64::encode(&packet.contents), + "received packet from downstream" + ); + + tokio::spawn({ + let config = config.clone(); + let sessions = sessions.clone(); + + async move { + let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer(); + + let asn_info = packet.asn_info.clone(); + let asn_info = asn_info.as_ref(); + match Self::process_downstream_received_packet(packet, config, sessions).await { + Ok(size) => { + crate::metrics::packets_total(crate::metrics::READ, asn_info).inc(); + crate::metrics::bytes_total(crate::metrics::READ, asn_info) + .inc_by(size as u64); + } + Err(error) => { + let source = error.to_string(); + crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc(); + crate::metrics::packets_dropped_total( + crate::metrics::READ, + &source, + asn_info, + ) + .inc(); + } + } + + timer.stop_and_record(); + } + }); + } + + /// Processes a packet by running it through the filter chain. + async fn process_downstream_received_packet( + packet: DownstreamPacket, + config: Arc, + sessions: Arc, + ) -> Result { + let endpoints: Vec<_> = config.clusters.read().endpoints().collect(); + if endpoints.is_empty() { + return Err(PipelineError::NoUpstreamEndpoints); + } + + let filters = config.filters.load(); + let mut context = ReadContext::new(endpoints, packet.source.into(), packet.contents); + filters.read(&mut context).await?; + let mut bytes_written = 0; + + for endpoint in context.endpoints.iter() { + let session_key = SessionKey { + source: packet.source, + dest: endpoint.address.to_socket_addr().await?, + }; + + bytes_written += sessions + .send(session_key, packet.asn_info.clone(), &context.contents) + .await?; + } + + Ok(bytes_written) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum PipelineError { + #[error("No upstream endpoints available")] + NoUpstreamEndpoints, + #[error("filter {0}")] + Filter(#[from] crate::filters::FilterError), + #[error("qcmp: {0}")] + Qcmp(#[from] crate::codec::qcmp::Error), + #[error("OS level error: {0}")] + Io(#[from] std::io::Error), +} + #[cfg(test)] mod tests { use super::*; @@ -220,8 +410,8 @@ mod tests { use crate::{ config, - endpoint::Endpoint, - test_utils::{available_addr, create_socket, load_test_filters, AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{available_addr, create_socket, load_test_filters, AddressType, TestHelper}, }; #[tokio::test] @@ -279,7 +469,7 @@ mod tests { let endpoint = t.open_socket_and_recv_single_packet().await; let mut local_addr = available_addr(&AddressType::Ipv6).await; - crate::test_utils::map_addr_to_localhost(&mut local_addr); + crate::test::map_addr_to_localhost(&mut local_addr); let proxy = crate::cli::Proxy { port: local_addr.port(), ..<_>::default() @@ -376,7 +566,7 @@ mod tests { }); // we'll test a single DownstreamReceiveWorkerConfig - crate::proxy::DownstreamReceiveWorkerConfig { + DownstreamReceiveWorkerConfig { worker_id: 1, socket: socket.clone(), config: config.clone(), @@ -384,7 +574,7 @@ mod tests { config, Arc::new( DualStackLocalSocket::new( - crate::test_utils::available_addr(&AddressType::Random) + crate::test::available_addr(&AddressType::Random) .await .port(), ) @@ -422,7 +612,7 @@ mod tests { let config = Arc::new(crate::Config::default()); config.clusters.modify(|clusters| { clusters.insert_default( - [crate::endpoint::Endpoint::from( + [crate::net::endpoint::Endpoint::from( endpoint.socket.local_ipv4_addr().unwrap(), )] .into(), @@ -431,7 +621,7 @@ mod tests { let shared_socket = Arc::new( DualStackLocalSocket::new( - crate::test_utils::available_addr(&AddressType::Random) + crate::test::available_addr(&AddressType::Random) .await .port(), ) @@ -458,20 +648,3 @@ mod tests { ); } } - -#[derive(Clone, Debug, Default)] -pub struct RuntimeConfig { - pub idle_request_interval_secs: u64, - // RwLock as this check is conditional on the proxy using xDS. - pub xds_is_healthy: Arc>>>, -} - -impl RuntimeConfig { - pub fn is_ready(&self, config: &Config) -> bool { - self.xds_is_healthy - .read() - .as_ref() - .map_or(true, |health| health.load(Ordering::SeqCst)) - && config.clusters.read().endpoints().count() != 0 - } -} diff --git a/src/proxy/sessions.rs b/src/cli/proxy/sessions.rs similarity index 96% rename from src/proxy/sessions.rs rename to src/cli/proxy/sessions.rs index b76ea7a630..dddadf8c23 100644 --- a/src/proxy/sessions.rs +++ b/src/cli/proxy/sessions.rs @@ -27,15 +27,13 @@ use tokio::{ }; use crate::{ - config::Config, - filters::Filter, - maxmind_db::IpNetEntry, - utils::{net::DualStackLocalSocket, Loggable}, + config::Config, filters::Filter, net::maxmind_db::IpNetEntry, net::DualStackLocalSocket, + Loggable, }; pub(crate) mod metrics; -pub type SessionMap = crate::ttl_map::TtlMap; +pub type SessionMap = crate::collections::ttl::TtlMap; /// A data structure that is responsible for holding sessions, and pooling /// sockets between them. This means that we only provide new unique sockets @@ -116,7 +114,7 @@ impl SessionPool { }, Ok((size, mut recv_addr)) => { let received_at = chrono::Utc::now().timestamp_nanos_opt().unwrap(); - crate::utils::net::to_canonical(&mut recv_addr); + crate::net::to_canonical(&mut recv_addr); tracing::trace!(%recv_addr, %size, "received packet"); let (downstream_addr, asn_info): (SocketAddr, Option) = { let storage = pool.storage.read().await; @@ -291,7 +289,7 @@ impl SessionPool { dest: SocketAddr, packet: &[u8], ) -> Result { - tracing::trace!(%source, %dest, contents = %crate::utils::base64_encode(packet), "received packet from upstream"); + tracing::trace!(%source, %dest, contents = %crate::codec::base64::encode(packet), "received packet from upstream"); let mut context = crate::filters::WriteContext::new(source.into(), dest.into(), packet.to_vec()); @@ -299,7 +297,7 @@ impl SessionPool { config.filters.load().write(&mut context).await?; let packet = context.contents.as_ref(); - tracing::trace!(%source, %dest, contents = %crate::utils::base64_encode(packet), "sending packet downstream"); + tracing::trace!(%source, %dest, contents = %crate::codec::base64::encode(packet), "sending packet downstream"); downstream_socket .send_to(packet, &dest) .await @@ -488,7 +486,7 @@ impl Loggable for Error { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{available_addr, AddressType, TestHelper}; + use crate::test::{available_addr, AddressType, TestHelper}; use std::sync::Arc; async fn new_pool(config: impl Into>) -> (Arc, watch::Sender<()>) { @@ -498,7 +496,7 @@ mod tests { Arc::new(config.into().unwrap_or_default()), Arc::new( DualStackLocalSocket::new( - crate::test_utils::available_addr(&AddressType::Random) + crate::test::available_addr(&AddressType::Random) .await .port(), ) @@ -654,11 +652,11 @@ mod tests { let mut t = TestHelper::default(); let dest = t.run_echo_server(&AddressType::Ipv6).await; let mut dest = dest.to_socket_addr().await.unwrap(); - crate::test_utils::map_addr_to_localhost(&mut dest); + crate::test::map_addr_to_localhost(&mut dest); let source = available_addr(&AddressType::Ipv6).await; let socket = tokio::net::UdpSocket::bind(source).await.unwrap(); let mut source = socket.local_addr().unwrap(); - crate::test_utils::map_addr_to_localhost(&mut source); + crate::test::map_addr_to_localhost(&mut source); let (pool, _sender) = new_pool(None).await; let key: SessionKey = (source, dest).into(); diff --git a/src/proxy/sessions/metrics.rs b/src/cli/proxy/sessions/metrics.rs similarity index 96% rename from src/proxy/sessions/metrics.rs rename to src/cli/proxy/sessions/metrics.rs index 3754a7d5ae..4f095ef7ad 100644 --- a/src/proxy/sessions/metrics.rs +++ b/src/cli/proxy/sessions/metrics.rs @@ -26,7 +26,7 @@ const PREFIX_ENTITY_LABEL: &str = "prefix_entity"; const PREFIX_NAME_LABEL: &str = "prefix_name"; use crate::metrics::{ASN_LABEL, PREFIX_LABEL}; -pub(crate) fn active_sessions(asn: Option<&crate::maxmind_db::IpNetEntry>) -> IntGauge { +pub(crate) fn active_sessions(asn: Option<&crate::net::maxmind_db::IpNetEntry>) -> IntGauge { static ACTIVE_SESSIONS: Lazy = Lazy::new(|| { prometheus::register_int_gauge_vec_with_registry! { Opts::new("active", "number of sessions currently active").subsystem(SUBSYSTEM), diff --git a/src/cli/qcmp.rs b/src/cli/qcmp.rs index ce4f72b746..2149702cac 100644 --- a/src/cli/qcmp.rs +++ b/src/cli/qcmp.rs @@ -15,7 +15,7 @@ */ use std::net::SocketAddr; -use crate::protocol::Protocol; +use crate::codec::qcmp::Protocol; #[derive(Clone, Debug, clap::Subcommand)] pub enum Qcmp { diff --git a/src/cli/relay.rs b/src/cli/relay.rs index e150b9bdb6..aa3189367c 100644 --- a/src/cli/relay.rs +++ b/src/cli/relay.rs @@ -63,8 +63,8 @@ impl Relay { mode: crate::cli::Admin, mut shutdown_rx: tokio::sync::watch::Receiver<()>, ) -> crate::Result<()> { - let xds_server = crate::xds::server::spawn(self.xds_port, config.clone()); - let mds_server = tokio::spawn(crate::xds::server::control_plane_discovery_server( + let xds_server = crate::net::xds::server::spawn(self.xds_port, config.clone()); + let mds_server = tokio::spawn(crate::net::xds::server::control_plane_discovery_server( self.mds_port, self.idle_request_interval_secs, config.clone(), diff --git a/src/cluster.rs b/src/cluster.rs index ae595b5eaf..f5d90ec4cb 100644 --- a/src/cluster.rs +++ b/src/cluster.rs @@ -20,7 +20,7 @@ use dashmap::DashMap; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use crate::endpoint::{Endpoint, Locality}; +use crate::net::endpoint::{Endpoint, Locality}; const SUBSYSTEM: &str = "cluster"; diff --git a/src/extensions.rs b/src/codec.rs similarity index 78% rename from src/extensions.rs rename to src/codec.rs index 64fdb847c8..15e5542a82 100644 --- a/src/extensions.rs +++ b/src/codec.rs @@ -14,12 +14,8 @@ * limitations under the License. */ -use slog::Logger; +//! Implementations and utility methods for various codecs used in Quilkin. -pub(crate) use filter_chain::Error as FilterChainError; - -pub(crate) mod filter_manager; -mod filter_registry; -pub mod filters; - -mod filter_chain; +pub mod base64; +pub mod prost; +pub mod qcmp; diff --git a/src/utils.rs b/src/codec/base64.rs similarity index 71% rename from src/utils.rs rename to src/codec/base64.rs index 110a78a9f2..a32ddfe1c7 100644 --- a/src/utils.rs +++ b/src/codec/base64.rs @@ -14,20 +14,12 @@ * limitations under the License. */ -pub(crate) mod net; - use base64::Engine; -pub(crate) fn base64_encode>(bytes: A) -> String { +pub(crate) fn encode>(bytes: A) -> String { base64::engine::general_purpose::STANDARD.encode(bytes.as_ref()) } -pub(crate) fn base64_decode>(input: A) -> Result, base64::DecodeError> { +pub(crate) fn decode>(input: A) -> Result, base64::DecodeError> { base64::engine::general_purpose::STANDARD.decode(input.as_ref()) } - -/// A type which can be logged, usually error types. -pub(crate) trait Loggable { - /// Output a log. - fn log(&self); -} diff --git a/src/prost.rs b/src/codec/prost.rs similarity index 100% rename from src/prost.rs rename to src/codec/prost.rs diff --git a/src/protocol.rs b/src/codec/qcmp.rs similarity index 99% rename from src/protocol.rs rename to src/codec/qcmp.rs index 032a32138a..3eb7cc5573 100644 --- a/src/protocol.rs +++ b/src/codec/qcmp.rs @@ -19,7 +19,7 @@ use nom::bytes::complete; use tracing::Instrument; -use crate::utils::net::DualStackLocalSocket; +use crate::net::DualStackLocalSocket; // Magic number to distinguish control packets from regular traffic. const MAGIC_NUMBER: &[u8] = b"QLKN"; diff --git a/src/collections.rs b/src/collections.rs new file mode 100644 index 0000000000..3011617217 --- /dev/null +++ b/src/collections.rs @@ -0,0 +1,19 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Collection types designed for use with Quilkin. + +pub mod ttl; diff --git a/src/ttl_map.rs b/src/collections/ttl.rs similarity index 99% rename from src/ttl_map.rs rename to src/collections/ttl.rs index e8d703ae3d..85805abadf 100644 --- a/src/ttl_map.rs +++ b/src/collections/ttl.rs @@ -472,7 +472,7 @@ impl Clock { #[cfg(test)] mod tests { use super::*; - use crate::endpoint::EndpointAddress; + use crate::net::endpoint::EndpointAddress; use std::net::Ipv4Addr; use tokio::time; diff --git a/src/config.rs b/src/config.rs index c5e5020c52..f357a9abb7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -26,7 +26,7 @@ use uuid::Uuid; use crate::{ cluster::ClusterMap, filters::prelude::*, - xds::{ + net::xds::{ config::listener::v3::Listener, service::discovery::v3::DiscoveryResponse, Resource, ResourceType, }, @@ -73,7 +73,7 @@ impl Config { fn update_from_json( &self, map: serde_json::Map, - locality: Option, + locality: Option, ) -> Result<(), eyre::Error> { macro_rules! replace_if_present { ($($field:ident),+) => { @@ -176,7 +176,7 @@ impl Config { .endpoints .iter() .cloned() - .map(crate::endpoint::Endpoint::try_from) + .map(crate::net::endpoint::Endpoint::try_from) .collect::>()?, ); } @@ -236,11 +236,13 @@ pub struct Filter { pub config: Option, } -impl TryFrom for Filter { +impl TryFrom for Filter { type Error = CreationError; - fn try_from(filter: crate::xds::config::listener::v3::Filter) -> Result { - use crate::xds::config::listener::v3::filter::ConfigType; + fn try_from( + filter: crate::net::xds::config::listener::v3::Filter, + ) -> Result { + use crate::net::xds::config::listener::v3::filter::ConfigType; let config = if let Some(config_type) = filter.config_type { let config = match config_type { @@ -270,11 +272,11 @@ impl TryFrom for Filter { } } -impl TryFrom for crate::xds::config::listener::v3::Filter { +impl TryFrom for crate::net::xds::config::listener::v3::Filter { type Error = CreationError; fn try_from(filter: Filter) -> Result { - use crate::xds::config::listener::v3::filter::ConfigType; + use crate::net::xds::config::listener::v3::filter::ConfigType; let config = if let Some(config) = filter.config { Some( @@ -309,7 +311,7 @@ mod tests { use serde_json::json; - use crate::endpoint::{Endpoint, Metadata}; + use crate::net::endpoint::{Endpoint, Metadata}; use super::*; diff --git a/src/config/providers.rs b/src/config/providers.rs index ef543a71a1..ff67de7e8a 100644 --- a/src/config/providers.rs +++ b/src/config/providers.rs @@ -46,7 +46,7 @@ impl Providers { &self, config: std::sync::Arc, health_check: Arc, - locality: Option, + locality: Option, ) -> tokio::task::JoinHandle> { match &self { Self::Agones { diff --git a/src/config/providers/k8s.rs b/src/config/providers/k8s.rs index d2c9758ab5..08789ed128 100644 --- a/src/config/providers/k8s.rs +++ b/src/config/providers/k8s.rs @@ -8,7 +8,7 @@ use kube::runtime::watcher::Event; use agones::GameServer; -use crate::endpoint::{Endpoint, Locality}; +use crate::net::endpoint::{Endpoint, Locality}; pub fn update_filters_from_configmap( client: kube::Client, diff --git a/src/config/providers/k8s/agones.rs b/src/config/providers/k8s/agones.rs index 59e1ebd778..dac950940e 100644 --- a/src/config/providers/k8s/agones.rs +++ b/src/config/providers/k8s/agones.rs @@ -25,7 +25,7 @@ use kube::{core::Resource, CustomResource}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::endpoint::Endpoint; +use crate::net::endpoint::Endpoint; const QUILKIN_TOKEN_LABEL: &str = "quilkin.dev/tokens"; @@ -60,8 +60,8 @@ impl GameServer { Endpoint::with_metadata( (status.address.clone(), port).into(), - crate::metadata::MetadataView::with_unknown( - crate::endpoint::Metadata { tokens }, + crate::net::endpoint::metadata::MetadataView::with_unknown( + crate::net::endpoint::Metadata { tokens }, extra_metadata, ), ) @@ -76,7 +76,7 @@ impl GameServer { value .split(',') .map(String::from) - .map(crate::utils::base64_decode) + .map(crate::codec::base64::decode) .filter_map(Result::ok) .collect() }) diff --git a/src/config/watch/agones.rs b/src/config/watch/agones.rs index 5e7e1d9bfc..239655b4c0 100644 --- a/src/config/watch/agones.rs +++ b/src/config/watch/agones.rs @@ -20,7 +20,7 @@ use std::sync::{ Arc, }; -use crate::{endpoint::Locality, Config}; +use crate::{net::endpoint::Locality, Config}; pub async fn watch( gameservers_namespace: impl AsRef, diff --git a/src/config/watch/fs.rs b/src/config/watch/fs.rs index bbf95ec3c0..ec8d479cd7 100644 --- a/src/config/watch/fs.rs +++ b/src/config/watch/fs.rs @@ -28,7 +28,7 @@ pub async fn watch( config: Arc, health_check: Arc, path: impl Into, - locality: Option, + locality: Option, ) -> crate::Result<()> { let path = path.into(); let span = tracing::info_span!("config_provider", path = %path.display()); @@ -93,9 +93,9 @@ mod tests { source.clusters.modify(|clusters| { clusters.insert_default( - [crate::endpoint::Endpoint::with_metadata( + [crate::net::endpoint::Endpoint::with_metadata( (std::net::Ipv4Addr::LOCALHOST, 4321).into(), - crate::endpoint::Metadata { + crate::net::endpoint::Metadata { tokens: <_>::from([Vec::from(*b"1x7ijy6")]), }, )] diff --git a/src/filters/capture.rs b/src/filters/capture.rs index 3c024a92a4..cd7e30211f 100644 --- a/src/filters/capture.rs +++ b/src/filters/capture.rs @@ -20,7 +20,7 @@ mod regex; crate::include_proto!("quilkin.filters.capture.v1alpha1"); -use crate::{filters::prelude::*, metadata}; +use crate::{filters::prelude::*, net::endpoint::metadata}; use self::quilkin::filters::capture::v1alpha1 as proto; @@ -91,8 +91,9 @@ struct NoValueCaptured; #[cfg(test)] mod tests { use crate::{ - endpoint::Endpoint, filters::metadata::CAPTURED_BYTES, metadata::Value, - test_utils::assert_write_no_change, + filters::metadata::CAPTURED_BYTES, + net::endpoint::{metadata::Value, Endpoint}, + test::assert_write_no_change, }; use super::*; diff --git a/src/filters/capture/affix.rs b/src/filters/capture/affix.rs index db6e7a4348..aba5fcba34 100644 --- a/src/filters/capture/affix.rs +++ b/src/filters/capture/affix.rs @@ -1,4 +1,4 @@ -use crate::metadata::Value; +use crate::net::endpoint::metadata::Value; /// Returns whether the capture size is bigger than the packet size. fn is_valid_size(contents: &[u8], size: u32) -> bool { diff --git a/src/filters/capture/config.rs b/src/filters/capture/config.rs index 7f700efb01..1f14c422ee 100644 --- a/src/filters/capture/config.rs +++ b/src/filters/capture/config.rs @@ -69,7 +69,7 @@ pub struct Config { /// The key to use when storing the captured value in the filter context. /// If a match was found it is available /// under `{{metadata_key}}/is_present`. - pub metadata_key: crate::metadata::Key, + pub metadata_key: crate::net::endpoint::metadata::Key, /// The capture strategy. pub strategy: Strategy, } @@ -165,8 +165,9 @@ impl<'de> serde::Deserialize<'de> for Config { } } - let metadata_key = metadata_key - .unwrap_or_else(|| crate::metadata::Key::from_static(CAPTURED_BYTES)); + let metadata_key = metadata_key.unwrap_or_else(|| { + crate::net::endpoint::metadata::Key::from_static(CAPTURED_BYTES) + }); let strategy = strategy.ok_or_else(|| { serde::de::Error::custom( "Capture strategy of `regex`, `suffix`, or `prefix` is required", @@ -204,7 +205,7 @@ impl TryFrom for Config { Ok(Self { metadata_key: p .metadata_key - .map(crate::metadata::Key::from) + .map(crate::net::endpoint::metadata::Key::from) .ok_or_else(|| { ConvertProtoConfigError::new("Missing", Some("metadata_key".into())) })?, diff --git a/src/filters/capture/regex.rs b/src/filters/capture/regex.rs index 64fc5ee35f..a12361f1d1 100644 --- a/src/filters/capture/regex.rs +++ b/src/filters/capture/regex.rs @@ -1,4 +1,4 @@ -use crate::metadata::Value; +use crate::net::endpoint::metadata::Value; /// Capture from the start of the packet. #[derive(serde::Serialize, serde::Deserialize, Debug, schemars::JsonSchema)] diff --git a/src/filters/chain.rs b/src/filters/chain.rs index 552cdc09a9..865e587396 100644 --- a/src/filters/chain.rs +++ b/src/filters/chain.rs @@ -190,7 +190,7 @@ impl TryFrom<&[FilterConfig]> for FilterChain { } } -impl TryFrom for crate::xds::config::listener::v3::FilterChain { +impl TryFrom for crate::net::xds::config::listener::v3::FilterChain { type Error = CreationError; fn try_from(chain: FilterChain) -> Result { @@ -198,7 +198,7 @@ impl TryFrom for crate::xds::config::listener::v3::FilterChain { } } -impl TryFrom<&'_ FilterChain> for crate::xds::config::listener::v3::FilterChain { +impl TryFrom<&'_ FilterChain> for crate::net::xds::config::listener::v3::FilterChain { type Error = CreationError; fn try_from(chain: &FilterChain) -> Result { @@ -309,9 +309,9 @@ impl Filter for FilterChain { mod tests { use crate::{ config, - endpoint::Endpoint, filters::Debug, - test_utils::{new_test_config, TestFilter}, + net::endpoint::Endpoint, + test::{new_test_config, TestFilter}, }; use super::*; @@ -349,7 +349,7 @@ mod tests { #[tokio::test] async fn chain_single_test_filter() { - crate::test_utils::load_test_filters(); + crate::test::load_test_filters(); let config = new_test_config(); let endpoints_fixture = endpoints(); let mut context = ReadContext::new( diff --git a/src/filters/compress.rs b/src/filters/compress.rs index a5950c4b9a..d9fc863877 100644 --- a/src/filters/compress.rs +++ b/src/filters/compress.rs @@ -136,7 +136,7 @@ impl StaticFilter for Compress { #[cfg(test)] mod tests { - use crate::{endpoint::Endpoint, filters::compress::compressor::Snappy}; + use crate::{filters::compress::compressor::Snappy, net::endpoint::Endpoint}; use super::*; diff --git a/src/filters/debug.rs b/src/filters/debug.rs index a9dc324f6b..321ea94e0c 100644 --- a/src/filters/debug.rs +++ b/src/filters/debug.rs @@ -93,7 +93,7 @@ impl TryFrom for Config { #[cfg(test)] mod tests { - use crate::test_utils::{assert_filter_read_no_change, assert_write_no_change}; + use crate::test::{assert_filter_read_no_change, assert_write_no_change}; use tracing_test::traced_test; use super::*; diff --git a/src/filters/factory.rs b/src/filters/factory.rs index 94f5de3b96..0f5a3fbca0 100644 --- a/src/filters/factory.rs +++ b/src/filters/factory.rs @@ -140,7 +140,7 @@ where Ok(prost_types::Any { type_url: self.name().into(), - value: crate::prost::encode::(&config.try_into()?)?, + value: crate::codec::prost::encode::(&config.try_into()?)?, }) } diff --git a/src/filters/firewall.rs b/src/filters/firewall.rs index 7698e49792..f8d9aa5bae 100644 --- a/src/filters/firewall.rs +++ b/src/filters/firewall.rs @@ -118,8 +118,8 @@ pub struct PacketDenied; mod tests { use std::net::Ipv4Addr; - use crate::endpoint::Endpoint; use crate::filters::firewall::config::PortRange; + use crate::net::endpoint::Endpoint; use tracing_test::traced_test; use super::*; @@ -166,7 +166,7 @@ mod tests { }], }; - let local_addr: crate::endpoint::EndpointAddress = (Ipv4Addr::LOCALHOST, 8081).into(); + let local_addr: crate::net::endpoint::EndpointAddress = (Ipv4Addr::LOCALHOST, 8081).into(); let mut ctx = WriteContext::new(([192, 168, 75, 20], 80).into(), local_addr.clone(), vec![]); diff --git a/src/filters/load_balancer.rs b/src/filters/load_balancer.rs index 24acf97782..728fe4cd1f 100644 --- a/src/filters/load_balancer.rs +++ b/src/filters/load_balancer.rs @@ -61,7 +61,7 @@ mod tests { use std::{collections::HashSet, net::Ipv4Addr}; use super::*; - use crate::endpoint::{Endpoint, EndpointAddress}; + use crate::net::endpoint::{Endpoint, EndpointAddress}; async fn get_response_addresses( filter: &dyn Filter, diff --git a/src/filters/local_rate_limit.rs b/src/filters/local_rate_limit.rs index 582737ae06..2ec14b4d5c 100644 --- a/src/filters/local_rate_limit.rs +++ b/src/filters/local_rate_limit.rs @@ -22,9 +22,9 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use crate::{ - endpoint::EndpointAddress, + collections::ttl::{Entry, TtlMap}, filters::prelude::*, - ttl_map::{Entry, TtlMap}, + net::endpoint::EndpointAddress, }; crate::include_proto!("quilkin.filters.local_rate_limit.v1alpha1"); @@ -207,7 +207,7 @@ mod tests { use tokio::time; use super::*; - use crate::{config::ConfigType, test_utils::assert_write_no_change}; + use crate::{config::ConfigType, test::assert_write_no_change}; fn rate_limiter(config: Config) -> LocalRateLimit { LocalRateLimit::new(config).unwrap() @@ -222,7 +222,7 @@ mod tests { /// Send a packet to the filter and assert whether or not it was processed. async fn read(r: &LocalRateLimit, address: &EndpointAddress, should_succeed: bool) { - let endpoints = vec![crate::endpoint::Endpoint::new( + let endpoints = vec![crate::net::endpoint::Endpoint::new( (Ipv4Addr::LOCALHOST, 8089).into(), )]; diff --git a/src/filters/match.rs b/src/filters/match.rs index af4810c56b..2a87961b67 100644 --- a/src/filters/match.rs +++ b/src/filters/match.rs @@ -17,7 +17,10 @@ mod config; mod metrics; -use crate::{filters::prelude::*, metadata, xds as envoy}; +use crate::{ + filters::prelude::*, + net::{endpoint::metadata, xds as envoy}, +}; use self::{metrics::Metrics, quilkin::filters::matches::v1alpha1 as proto}; @@ -164,12 +167,15 @@ impl StaticFilter for Match { mod tests { use super::*; - use crate::{endpoint::Endpoint, filters::*}; + use crate::{ + filters::*, + net::endpoint::{metadata, Endpoint}, + }; #[tokio::test] async fn metrics() { let metrics = Metrics::new(); - let key = crate::metadata::Key::from_static("myapp.com/token"); + let key = metadata::Key::from_static("myapp.com/token"); let config = Config { on_read: Some(DirectionalConfig { metadata_key: key, diff --git a/src/filters/match/config.rs b/src/filters/match/config.rs index a9a39855e2..245d6ef9a4 100644 --- a/src/filters/match/config.rs +++ b/src/filters/match/config.rs @@ -71,7 +71,7 @@ impl TryFrom for Config { pub struct DirectionalConfig { /// The key for the metadata to compare against. #[serde(rename = "metadataKey")] - pub metadata_key: crate::metadata::Key, + pub metadata_key: crate::net::endpoint::metadata::Key, /// List of filters to compare and potentially run if any match. pub branches: Vec, /// The behaviour for when none of the `branches` match. @@ -122,7 +122,7 @@ impl TryFrom for DirectionalConfig { #[derive(Debug, Deserialize, Serialize, Eq, PartialEq, schemars::JsonSchema)] pub struct Branch { /// The value to compare against the dynamic metadata. - pub value: crate::metadata::Value, + pub value: crate::net::endpoint::metadata::Value, /// The filter to run on successful matches. #[serde(flatten)] pub filter: Filter, @@ -168,7 +168,7 @@ impl Default for Fallthrough { } } -impl TryFrom for crate::xds::config::listener::v3::Filter { +impl TryFrom for crate::net::xds::config::listener::v3::Filter { type Error = crate::filters::CreationError; fn try_from(fallthrough: Fallthrough) -> Result { fallthrough.0.try_into() diff --git a/src/filters/read.rs b/src/filters/read.rs index 0f607329ab..e2f025a185 100644 --- a/src/filters/read.rs +++ b/src/filters/read.rs @@ -16,10 +16,7 @@ #[cfg(doc)] use crate::filters::Filter; -use crate::{ - endpoint::{Endpoint, EndpointAddress}, - metadata::DynamicMetadata, -}; +use crate::net::endpoint::{metadata::DynamicMetadata, Endpoint, EndpointAddress}; /// The input arguments to [`Filter::read`]. #[non_exhaustive] diff --git a/src/filters/registry.rs b/src/filters/registry.rs index f2fb7cf98a..0228838ccf 100644 --- a/src/filters/registry.rs +++ b/src/filters/registry.rs @@ -64,11 +64,11 @@ impl FilterRegistry { mod tests { use std::net::Ipv4Addr; - use crate::test_utils::load_test_filters; + use crate::test::load_test_filters; use super::*; - use crate::endpoint::{Endpoint, EndpointAddress}; use crate::filters::{Filter, FilterError, FilterRegistry, ReadContext, WriteContext}; + use crate::net::endpoint::{Endpoint, EndpointAddress}; struct TestFilter {} diff --git a/src/filters/timestamp.rs b/src/filters/timestamp.rs index e90dc70528..b1253c09c7 100644 --- a/src/filters/timestamp.rs +++ b/src/filters/timestamp.rs @@ -18,7 +18,11 @@ use std::sync::Arc; use chrono::prelude::*; -use crate::{filters::prelude::*, metadata::Value, metrics::Direction}; +use crate::{ + filters::prelude::*, + metrics::Direction, + net::endpoint::metadata::{self, Value}, +}; crate::include_proto!("quilkin.filters.timestamp.v1alpha1"); use self::quilkin::filters::timestamp::v1alpha1 as proto; @@ -33,7 +37,7 @@ pub struct Timestamp { impl Timestamp { /// Observes the duration since a timestamp stored in `metadata` and now, /// if present. - pub fn observe(&self, metadata: &crate::metadata::DynamicMetadata, direction: Direction) { + pub fn observe(&self, metadata: &metadata::DynamicMetadata, direction: Direction) { let value = metadata .get(&self.config.metadata_key) .and_then(|item| match item { @@ -125,7 +129,7 @@ impl StaticFilter for Timestamp { pub struct Config { /// The metadata key to read the UTC UNIX Timestamp from. #[serde(rename = "metadataKey")] - pub metadata_key: crate::metadata::Key, + pub metadata_key: metadata::Key, } impl Config { diff --git a/src/filters/token_router.rs b/src/filters/token_router.rs index 771b266e8a..a4a1959f3f 100644 --- a/src/filters/token_router.rs +++ b/src/filters/token_router.rs @@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize}; use crate::{ filters::{metadata::CAPTURED_BYTES, prelude::*}, - metadata, + net::endpoint::metadata, }; use self::quilkin::filters::token_router::v1alpha1 as proto; @@ -56,7 +56,7 @@ impl Filter for TokenRouter { Some(metadata::Value::Bytes(token)) => { ctx.endpoints.retain(|endpoint| { if endpoint.metadata.known.tokens.contains(&**token) { - tracing::trace!(%endpoint.address, token = &*crate::utils::base64_encode(token), "Endpoint matched"); + tracing::trace!(%endpoint.address, token = &*crate::codec::base64::encode(token), "Endpoint matched"); true } else { false @@ -66,7 +66,7 @@ impl Filter for TokenRouter { if ctx.endpoints.is_empty() { Err(FilterError::new(Error::NoEndpointMatch( self.config.metadata_key, - crate::utils::base64_encode(token), + crate::codec::base64::encode(token), ))) } else { Ok(()) @@ -86,11 +86,11 @@ impl Filter for TokenRouter { #[derive(Debug, thiserror::Error)] pub enum Error { #[error("no routing token found for `{0}`")] - NoTokenFound(crate::metadata::Key), + NoTokenFound(metadata::Key), #[error("key `{0}` was found but wasn't bytes, found {1:?}")] - InvalidType(crate::metadata::Key, crate::metadata::Value), + InvalidType(metadata::Key, metadata::Value), #[error("no endpoint matched token `{1}` from `{0}`")] - NoEndpointMatch(crate::metadata::Key, String), + NoEndpointMatch(metadata::Key, String), } #[derive(Serialize, Deserialize, Debug, Eq, PartialEq, schemars::JsonSchema)] @@ -138,9 +138,8 @@ impl TryFrom for Config { #[cfg(test)] mod tests { use crate::{ - endpoint::{Endpoint, Metadata}, - metadata::Value, - test_utils::assert_write_no_change, + net::endpoint::{metadata::Value, Endpoint, Metadata}, + test::assert_write_no_change, }; use super::*; diff --git a/src/filters/write.rs b/src/filters/write.rs index 48d06800a7..d89fcd3592 100644 --- a/src/filters/write.rs +++ b/src/filters/write.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; -use crate::{endpoint::EndpointAddress, metadata::DynamicMetadata}; +use crate::net::endpoint::{DynamicMetadata, EndpointAddress}; #[cfg(doc)] use crate::filters::Filter; diff --git a/src/lib.rs b/src/lib.rs index 43774c49a1..a82aa3b0bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,25 +16,18 @@ #![deny(unused_must_use)] -mod maxmind_db; -mod proxy; - +pub(crate) mod collections; pub(crate) mod metrics; -pub(crate) mod prost; -pub(crate) mod ttl_map; -pub(crate) mod utils; pub mod cli; pub mod cluster; +pub mod codec; pub mod config; -pub mod endpoint; pub mod filters; -pub mod metadata; -pub mod protocol; -pub mod xds; +pub mod net; #[doc(hidden)] -pub mod test_utils; +pub mod test; pub type Result = std::result::Result; @@ -46,7 +39,13 @@ pub use self::{ pub use quilkin_macros::include_proto; -pub(crate) use self::maxmind_db::MaxmindDb; +pub(crate) use self::net::maxmind_db::MaxmindDb; + +/// A type which can be logged, usually error types. +pub(crate) trait Loggable { + /// Output a log. + fn log(&self); +} #[cfg(doctest)] mod external_doc_tests { diff --git a/src/metrics.rs b/src/metrics.rs index 909c4e2669..5bb8b59bdb 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -20,7 +20,7 @@ use prometheus::{ IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS, }; -use crate::maxmind_db::IpNetEntry; +use crate::net::maxmind_db::IpNetEntry; pub use prometheus::Result; diff --git a/src/utils/net.rs b/src/net.rs similarity index 97% rename from src/utils/net.rs rename to src/net.rs index 8d41afe8da..e034a9dcdb 100644 --- a/src/utils/net.rs +++ b/src/net.rs @@ -14,6 +14,10 @@ * limitations under the License. */ +pub mod endpoint; +pub(crate) mod maxmind_db; +pub(crate) mod xds; + use std::{ io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, @@ -22,6 +26,8 @@ use std::{ use socket2::{Protocol, Socket, Type}; use tokio::{net::ToSocketAddrs, net::UdpSocket}; +pub use endpoint::{Endpoint, EndpointAddress}; + /// returns a UdpSocket with address and port reuse, on Ipv6Addr::UNSPECIFIED fn socket_with_reuse(port: u16) -> std::io::Result { socket_with_reuse_and_address((Ipv6Addr::UNSPECIFIED, port).into()) @@ -120,8 +126,8 @@ mod tests { use tokio::time::timeout; - use crate::endpoint::address::AddressKind; - use crate::test_utils::{available_addr, AddressType, TestHelper}; + use crate::net::endpoint::address::AddressKind; + use crate::test::{available_addr, AddressType, TestHelper}; #[tokio::test] async fn dual_stack_socket_reusable() { diff --git a/src/endpoint.rs b/src/net/endpoint.rs similarity index 82% rename from src/endpoint.rs rename to src/net/endpoint.rs index e47e0b2298..ae7616ec3c 100644 --- a/src/endpoint.rs +++ b/src/net/endpoint.rs @@ -18,12 +18,13 @@ pub(crate) mod address; mod locality; +pub mod metadata; use serde::{Deserialize, Serialize}; -pub use self::{address::EndpointAddress, locality::Locality}; +pub use self::{address::EndpointAddress, locality::Locality, metadata::DynamicMetadata}; -pub type EndpointMetadata = crate::metadata::MetadataView; +pub type EndpointMetadata = metadata::MetadataView; /// A destination endpoint with any associated metadata. #[derive(Debug, Deserialize, Serialize, PartialEq, Clone, Eq, schemars::JsonSchema)] @@ -148,7 +149,7 @@ pub struct Metadata { pub tokens: base64_set::Set, } -impl From for crate::metadata::MetadataView { +impl From for crate::net::endpoint::metadata::MetadataView { fn from(metadata: Metadata) -> Self { Self { known: metadata, @@ -165,7 +166,7 @@ impl From for prost_types::Struct { values: metadata .tokens .into_iter() - .map(crate::utils::base64_encode) + .map(crate::codec::base64::encode) .map(prost_types::value::Kind::StringValue) .map(|k| prost_types::Value { kind: Some(k) }) .collect(), @@ -186,33 +187,32 @@ impl std::convert::TryFrom for Metadata { use prost_types::value::Kind; const TOKENS: &str = "tokens"; - let tokens = if let Some(kind) = value.fields.remove(TOKENS).and_then(|v| v.kind) { - match kind { - Kind::ListValue(list) => list - .values - .into_iter() - .filter_map(|v| v.kind) - .map(|kind| { - if let Kind::StringValue(string) = kind { - crate::utils::base64_decode(string) - .map_err(MetadataError::InvalidBase64) - } else { - Err(MetadataError::InvalidType { - key: "quilkin.dev.tokens", - expected: "base64 string", - }) - } - }) - .collect::>()?, - Kind::StringValue(string) => { - <_>::from([crate::utils::base64_decode(string) - .map_err(MetadataError::InvalidBase64)?]) + let tokens = + if let Some(kind) = value.fields.remove(TOKENS).and_then(|v| v.kind) { + match kind { + Kind::ListValue(list) => list + .values + .into_iter() + .filter_map(|v| v.kind) + .map(|kind| { + if let Kind::StringValue(string) = kind { + crate::codec::base64::decode(string) + .map_err(MetadataError::InvalidBase64) + } else { + Err(MetadataError::InvalidType { + key: "quilkin.dev.tokens", + expected: "base64 string", + }) + } + }) + .collect::>()?, + Kind::StringValue(string) => <_>::from([crate::codec::base64::decode(string) + .map_err(MetadataError::InvalidBase64)?]), + _ => return Err(MetadataError::MissingKey(TOKENS)), } - _ => return Err(MetadataError::MissingKey(TOKENS)), - } - } else { - <_>::default() - }; + } else { + <_>::default() + }; Ok(Self { tokens }) } @@ -246,7 +246,7 @@ mod base64_set { { serde::Serialize::serialize( &set.iter() - .map(crate::utils::base64_encode) + .map(crate::codec::base64::encode) .collect::>(), ser, ) @@ -265,7 +265,7 @@ mod base64_set { )) } else { set.into_iter() - .map(|string| crate::utils::base64_decode(string).map_err(D::Error::custom)) + .map(|string| crate::codec::base64::decode(string).map_err(D::Error::custom)) .collect() } } @@ -284,7 +284,7 @@ mod tests { assert_eq!( serde_json::to_value(EndpointMetadata::from(metadata)).unwrap(), serde_json::json!({ - crate::metadata::KEY: { + crate::net::endpoint::metadata::KEY: { "tokens": ["TWFu"], } }) diff --git a/src/endpoint/address.rs b/src/net/endpoint/address.rs similarity index 94% rename from src/endpoint/address.rs rename to src/net/endpoint/address.rs index 59f4d46bcd..b4fc29b424 100644 --- a/src/endpoint/address.rs +++ b/src/net/endpoint/address.rs @@ -25,7 +25,7 @@ use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use trust_dns_resolver::{AsyncResolver, TokioAsyncResolver}; -use crate::xds::config::core::v3::{ +use crate::net::xds::config::core::v3::{ address::Address as EnvoyAddress, SocketAddress as EnvoySocketAddress, }; @@ -67,7 +67,7 @@ impl EndpointAddress { let ip = match &self.host { AddressKind::Ip(ip) => *ip, AddressKind::Name(name) => { - static CACHE: Lazy> = + static CACHE: Lazy> = Lazy::new(<_>::default); match CACHE.get(name) { @@ -214,7 +214,7 @@ impl From<(AddressKind, u16)> for EndpointAddress { impl From for EnvoySocketAddress { fn from(address: EndpointAddress) -> Self { - use crate::xds::config::core::v3::socket_address::{PortSpecifier, Protocol}; + use crate::net::xds::config::core::v3::socket_address::{PortSpecifier, Protocol}; Self { protocol: Protocol::Udp as i32, @@ -229,7 +229,7 @@ impl TryFrom for EndpointAddress { type Error = eyre::Error; fn try_from(value: EnvoySocketAddress) -> Result { - use crate::xds::config::core::v3::socket_address::PortSpecifier; + use crate::net::xds::config::core::v3::socket_address::PortSpecifier; let address = Self { host: value.address.parse()?, @@ -246,7 +246,7 @@ impl TryFrom for EndpointAddress { } } -impl From for crate::xds::config::core::v3::Address { +impl From for crate::net::xds::config::core::v3::Address { fn from(address: EndpointAddress) -> Self { Self { address: Some(address.into()), @@ -271,10 +271,10 @@ impl TryFrom for EndpointAddress { } } -impl TryFrom for EndpointAddress { +impl TryFrom for EndpointAddress { type Error = eyre::Error; - fn try_from(value: crate::xds::config::core::v3::Address) -> Result { + fn try_from(value: crate::net::xds::config::core::v3::Address) -> Result { match value.address { Some(address) => Self::try_from(address), _ => Err(eyre::eyre!("No address found")), @@ -282,10 +282,12 @@ impl TryFrom for EndpointAddress { } } -impl TryFrom for EndpointAddress { +impl TryFrom for EndpointAddress { type Error = eyre::Error; - fn try_from(value: crate::xds::config::endpoint::v3::Endpoint) -> Result { + fn try_from( + value: crate::net::xds::config::endpoint::v3::Endpoint, + ) -> Result { match value.address { Some(address) => Self::try_from(address), _ => Err(eyre::eyre!("Missing address in endpoint")), diff --git a/src/endpoint/locality.rs b/src/net/endpoint/locality.rs similarity index 100% rename from src/endpoint/locality.rs rename to src/net/endpoint/locality.rs diff --git a/src/metadata.rs b/src/net/endpoint/metadata.rs similarity index 94% rename from src/metadata.rs rename to src/net/endpoint/metadata.rs index d89795fef0..3c0329b257 100644 --- a/src/metadata.rs +++ b/src/net/endpoint/metadata.rs @@ -77,7 +77,7 @@ impl std::fmt::Display for Value { Self::Bool(value) => value.fmt(f), Self::Number(value) => value.fmt(f), Self::String(value) => value.fmt(f), - Self::Bytes(value) => crate::utils::base64_encode(value).fmt(f), + Self::Bytes(value) => crate::codec::base64::encode(value).fmt(f), Self::List(values) => { write!(f, "[")?; let mut first = true; @@ -246,14 +246,14 @@ impl + Default> From> for prost_typ let mut prost_struct = prost_types::Struct::default(); prost_struct.fields.insert( String::from("quilkin.dev"), - crate::prost::value_from_struct(metadata.known.into()), + crate::codec::prost::value_from_struct(metadata.known.into()), ); prost_struct.fields.extend( metadata .unknown .into_iter() - .map(|(k, v)| (k, crate::prost::from_json(v))), + .map(|(k, v)| (k, crate::codec::prost::from_json(v))), ); prost_struct @@ -267,14 +267,14 @@ impl + Default + Clone> From<&'_ MetadataView> let mut prost_struct = prost_types::Struct::default(); prost_struct.fields.insert( String::from("quilkin.dev"), - crate::prost::value_from_struct(metadata.known.clone().into()), + crate::codec::prost::value_from_struct(metadata.known.clone().into()), ); prost_struct.fields.extend( metadata .unknown .iter() - .map(|(k, v)| (k.clone(), crate::prost::from_json(v.clone()))), + .map(|(k, v)| (k.clone(), crate::codec::prost::from_json(v.clone()))), ); prost_struct @@ -305,7 +305,7 @@ where Ok(Self { known, - unknown: crate::prost::mapping_from_kind(value).unwrap_or_default(), + unknown: crate::codec::prost::mapping_from_kind(value).unwrap_or_default(), }) } } diff --git a/src/metadata/symbol.rs b/src/net/endpoint/metadata/symbol.rs similarity index 98% rename from src/metadata/symbol.rs rename to src/net/endpoint/metadata/symbol.rs index f11c97a4e5..d610896f86 100644 --- a/src/metadata/symbol.rs +++ b/src/net/endpoint/metadata/symbol.rs @@ -151,7 +151,7 @@ impl Symbol { match self.resolve(metadata) { Some(Value::Number(value)) => Some(Vec::from(value.to_be_bytes()).into()), Some(Value::Bytes(bytes)) => Some(bytes.clone()), - Some(Value::String(string)) => Some(crate::utils::base64_decode(string).ok()?.into()), + Some(Value::String(string)) => Some(crate::codec::base64::decode(string).ok()?.into()), _ => None, } } diff --git a/src/maxmind_db.rs b/src/net/maxmind_db.rs similarity index 100% rename from src/maxmind_db.rs rename to src/net/maxmind_db.rs diff --git a/src/xds.rs b/src/net/xds.rs similarity index 95% rename from src/xds.rs rename to src/net/xds.rs index 7d86d7c6ab..4070795ed4 100644 --- a/src/xds.rs +++ b/src/net/xds.rs @@ -99,7 +99,7 @@ mod xds { resource_names: response .resources .into_iter() - .map(crate::xds::Resource::try_from) + .map(crate::net::xds::Resource::try_from) .map(|result| result.map(|resource| resource.name().to_owned())) .collect::, _>>()?, type_url: response.type_url, @@ -149,17 +149,17 @@ mod tests { use std::sync::Arc; - use crate::test_utils::AddressType; - use crate::{config::Config, endpoint::Endpoint, filters::*}; + use crate::test::AddressType; + use crate::{config::Config, filters::*, net::endpoint::Endpoint}; #[tokio::test] async fn token_routing() { - let mut helper = crate::test_utils::TestHelper::default(); + let mut helper = crate::test::TestHelper::default(); let token = "mytoken"; let address = { let mut addr = Endpoint::new(helper.run_echo_server(&AddressType::Ipv6).await); addr.metadata.known.tokens.insert(token.into()); - crate::test_utils::map_to_localhost(&mut addr.address).await; + crate::test::map_to_localhost(&mut addr.address).await; addr }; let clusters = crate::cluster::ClusterMap::default(); @@ -168,7 +168,7 @@ mod tests { clusters.insert_default([address].into()); tracing::debug!(?clusters); - let xds_port = crate::test_utils::available_addr(&AddressType::Random) + let xds_port = crate::test::available_addr(&AddressType::Random) .await .port(); let xds_config: Arc = serde_json::from_value(serde_json::json!({ @@ -179,7 +179,7 @@ mod tests { .map(Arc::new) .unwrap(); - let client_addr = crate::test_utils::available_addr(&AddressType::Random).await; + let client_addr = crate::test::available_addr(&AddressType::Random).await; let client_config = serde_json::from_value(serde_json::json!({ "version": "v1alpha1", "id": "test-proxy", @@ -314,7 +314,8 @@ mod tests { let concat_bytes = vec![("b", "c,"), ("d", "e")]; for (b1, b2) in concat_bytes.into_iter() { let socket = std::net::UdpSocket::bind((std::net::Ipv6Addr::LOCALHOST, 0)).unwrap(); - let local_addr: crate::endpoint::EndpointAddress = socket.local_addr().unwrap().into(); + let local_addr: crate::net::endpoint::EndpointAddress = + socket.local_addr().unwrap().into(); config.clusters.modify(|clusters| { let mut cluster = clusters.default_entry(); diff --git a/src/xds/client.rs b/src/net/xds/client.rs similarity index 99% rename from src/xds/client.rs rename to src/net/xds/client.rs index dac9b745e8..8ece02d62e 100644 --- a/src/xds/client.rs +++ b/src/net/xds/client.rs @@ -29,7 +29,7 @@ use tryhard::{ use crate::{ cli::Admin, config::Config, - xds::{ + net::xds::{ config::core::v3::Node, relay::aggregated_control_plane_discovery_service_client::AggregatedControlPlaneDiscoveryServiceClient, service::discovery::v3::{ @@ -396,7 +396,7 @@ impl MdsStream { tracing::trace!("starting relay client stream task"); loop { let initial_response = DiscoveryResponse { - control_plane: Some(crate::xds::config::core::v3::ControlPlane { + control_plane: Some(crate::net::xds::config::core::v3::ControlPlane { identifier: (&*identifier).into(), }), ..<_>::default() @@ -585,7 +585,7 @@ pub fn handle_discovery_responses( let mut request = DiscoveryRequest::try_from(response)?; if let Err(error) = result { super::metrics::nacks(&control_plane_identifier, &request.type_url).inc(); - request.error_detail = Some(crate::xds::google::rpc::Status { + request.error_detail = Some(crate::net::xds::google::rpc::Status { code: 3, message: error.to_string(), ..<_>::default() diff --git a/src/xds/metrics.rs b/src/net/xds/metrics.rs similarity index 100% rename from src/xds/metrics.rs rename to src/net/xds/metrics.rs diff --git a/src/xds/resource.rs b/src/net/xds/resource.rs similarity index 95% rename from src/xds/resource.rs rename to src/net/xds/resource.rs index 235dd4bc46..4f07bd07fd 100644 --- a/src/xds/resource.rs +++ b/src/net/xds/resource.rs @@ -16,7 +16,7 @@ use prost::Message; -use crate::xds::config::listener::v3::Listener; +use crate::net::xds::config::listener::v3::Listener; pub type ResourceMap = enum_map::EnumMap; @@ -49,7 +49,7 @@ impl Resource { Self::Cluster(cluster) => cluster .locality .clone() - .map(|locality| crate::endpoint::Locality::from(locality).to_string()) + .map(|locality| crate::net::endpoint::Locality::from(locality).to_string()) .unwrap_or_default(), Self::Listener(listener) => listener.name.to_string(), } @@ -102,7 +102,7 @@ impl ResourceType { ) -> Result { Ok(prost_types::Any { type_url: self.type_url().into(), - value: crate::prost::encode(message)?, + value: crate::codec::prost::encode(message)?, }) } } diff --git a/src/xds/server.rs b/src/net/xds/server.rs similarity index 96% rename from src/xds/server.rs rename to src/net/xds/server.rs index 0e3e595f94..2ab77b3bf3 100644 --- a/src/xds/server.rs +++ b/src/net/xds/server.rs @@ -23,7 +23,7 @@ use tracing_futures::Instrument; use crate::{ config::Config, - xds::{ + net::xds::{ metrics, relay::aggregated_control_plane_discovery_service_server::{ AggregatedControlPlaneDiscoveryService, AggregatedControlPlaneDiscoveryServiceServer, @@ -74,7 +74,7 @@ pub(crate) fn control_plane_discovery_server( pub struct ControlPlane { config: Arc, idle_request_interval_secs: u64, - watchers: Arc>, + watchers: Arc>, } struct Watchers { @@ -95,11 +95,6 @@ impl Default for Watchers { } impl ControlPlane { - /// Creates a new server for managing [`Config`]. - pub fn new(config: Config, idle_request_interval_secs: u64) -> Self { - Self::from_arc(Arc::new(config), idle_request_interval_secs) - } - pub fn from_arc(config: Arc, idle_request_interval_secs: u64) -> Self { let this = Self { config, @@ -159,7 +154,7 @@ impl ControlPlane { .version .load(std::sync::atomic::Ordering::Relaxed) .to_string(); - response.control_plane = Some(crate::xds::config::core::v3::ControlPlane { + response.control_plane = Some(crate::net::xds::config::core::v3::ControlPlane { identifier: (*self.config.id.load()).clone(), }); response.nonce = nonce.to_string(); @@ -328,10 +323,10 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { Arc::from(&*identifier), move |(mut requests, _rx), _subscribed_resources| async move { tracing::info!(%identifier, "sending initial discovery request"); - crate::xds::client::MdsStream::discovery_request_without_cache( + crate::net::xds::client::MdsStream::discovery_request_without_cache( &identifier, &mut requests, - crate::xds::ResourceType::Cluster, + crate::net::xds::ResourceType::Cluster, &[], ) .map_err(|error| tonic::Status::internal(error.to_string()))?; @@ -353,10 +348,10 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { requests.send(ack?)?; } else { tracing::debug!("exceeded idle interval, sending request"); - crate::xds::client::MdsStream::discovery_request_without_cache( + crate::net::xds::client::MdsStream::discovery_request_without_cache( &identifier, &mut requests, - crate::xds::ResourceType::Cluster, + crate::net::xds::ResourceType::Cluster, &[], ) .map_err(|error| tonic::Status::internal(error.to_string()))?; @@ -381,7 +376,7 @@ mod tests { use tokio::time::timeout; use super::*; - use crate::xds::{ + use crate::net::xds::{ config::{ core::v3::Node, listener::v3::{FilterChain, Listener}, @@ -408,7 +403,7 @@ mod tests { version_info: String::new(), resources: vec![prost_types::Any { type_url: LISTENER_TYPE.type_url().into(), - value: crate::prost::encode(&Listener { + value: crate::codec::prost::encode(&Listener { filter_chains: vec![FilterChain { filters: vec![], ..<_>::default() diff --git a/src/proxy.rs b/src/proxy.rs deleted file mode 100644 index 73edd51362..0000000000 --- a/src/proxy.rs +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::{net::SocketAddr, sync::Arc}; - -pub use sessions::{Session, SessionKey, SessionPool}; - -use crate::{ - filters::{Filter, ReadContext}, - utils::net::DualStackLocalSocket, - Config, -}; - -mod sessions; - -/// Packet received from local port -#[derive(Debug)] -struct DownstreamPacket { - asn_info: Option, - contents: Vec, - received_at: i64, - source: SocketAddr, -} - -/// Represents the required arguments to run a worker task that -/// processes packets received downstream. -pub(crate) struct DownstreamReceiveWorkerConfig { - /// ID of the worker. - pub worker_id: usize, - /// Socket with reused port from which the worker receives packets. - pub socket: Arc, - pub config: Arc, - pub sessions: Arc, -} - -impl DownstreamReceiveWorkerConfig { - pub fn spawn(self) { - let Self { - worker_id, - socket, - config, - sessions, - } = self; - - tokio::spawn(async move { - // Initialize a buffer for the UDP packet. We use the maximum size of a UDP - // packet, which is the maximum value of 16 a bit integer. - let mut buf = vec![0; 1 << 16]; - let mut last_received_at = None; - loop { - tracing::debug!( - id = worker_id, - port = ?socket.local_ipv6_addr().map(|addr| addr.port()), - "Awaiting packet" - ); - - tokio::select! { - result = socket.recv_from(&mut buf) => { - match result { - Ok((size, mut source)) => { - crate::utils::net::to_canonical(&mut source); - let packet = DownstreamPacket { - received_at: chrono::Utc::now().timestamp_nanos_opt().unwrap(), - asn_info: crate::maxmind_db::MaxmindDb::lookup(source.ip()), - contents: buf[..size].to_vec(), - source, - }; - - if let Some(last_received_at) = last_received_at { - crate::metrics::packet_jitter( - crate::metrics::READ, - packet.asn_info.as_ref(), - ) - .set(packet.received_at - last_received_at); - } - last_received_at = Some(packet.received_at); - - Self::spawn_process_task(packet, source, worker_id, &config, &sessions) - } - Err(error) => { - tracing::error!(%error, "error receiving packet"); - return; - } - } - } - } - } - }); - } - - #[inline] - fn spawn_process_task( - packet: DownstreamPacket, - source: std::net::SocketAddr, - worker_id: usize, - config: &Arc, - sessions: &Arc, - ) { - tracing::trace!( - id = worker_id, - size = packet.contents.len(), - source = %source, - contents=&*crate::utils::base64_encode(&packet.contents), - "received packet from downstream" - ); - - tokio::spawn({ - let config = config.clone(); - let sessions = sessions.clone(); - - async move { - let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer(); - - let asn_info = packet.asn_info.clone(); - let asn_info = asn_info.as_ref(); - match Self::process_downstream_received_packet(packet, config, sessions).await { - Ok(size) => { - crate::metrics::packets_total(crate::metrics::READ, asn_info).inc(); - crate::metrics::bytes_total(crate::metrics::READ, asn_info) - .inc_by(size as u64); - } - Err(error) => { - let source = error.to_string(); - crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc(); - crate::metrics::packets_dropped_total( - crate::metrics::READ, - &source, - asn_info, - ) - .inc(); - } - } - - timer.stop_and_record(); - } - }); - } - - /// Processes a packet by running it through the filter chain. - async fn process_downstream_received_packet( - packet: DownstreamPacket, - config: Arc, - sessions: Arc, - ) -> Result { - let endpoints: Vec<_> = config.clusters.read().endpoints().collect(); - if endpoints.is_empty() { - return Err(PipelineError::NoUpstreamEndpoints); - } - - let filters = config.filters.load(); - let mut context = ReadContext::new(endpoints, packet.source.into(), packet.contents); - filters.read(&mut context).await?; - let mut bytes_written = 0; - - for endpoint in context.endpoints.iter() { - let session_key = SessionKey { - source: packet.source, - dest: endpoint.address.to_socket_addr().await?, - }; - - bytes_written += sessions - .send(session_key, packet.asn_info.clone(), &context.contents) - .await?; - } - - Ok(bytes_written) - } -} - -#[derive(thiserror::Error, Debug)] -pub enum PipelineError { - #[error("No upstream endpoints available")] - NoUpstreamEndpoints, - #[error("filter {0}")] - Filter(#[from] crate::filters::FilterError), - #[error("qcmp: {0}")] - Qcmp(#[from] crate::protocol::Error), - #[error("OS level error: {0}")] - Io(#[from] std::io::Error), -} diff --git a/src/test_utils.rs b/src/test.rs similarity index 98% rename from src/test_utils.rs rename to src/test.rs index 633e5c2360..a15eb66311 100644 --- a/src/test_utils.rs +++ b/src/test.rs @@ -23,10 +23,10 @@ use tracing_subscriber::EnvFilter; use crate::{ config::Config, - endpoint::{Endpoint, EndpointAddress}, filters::{prelude::*, FilterRegistry}, - metadata::Value, - utils::net::DualStackLocalSocket, + net::endpoint::metadata::Value, + net::endpoint::{Endpoint, EndpointAddress}, + net::DualStackLocalSocket, }; static LOG_ONCE: Once = Once::new(); @@ -240,7 +240,7 @@ impl TestHelper { let socket = create_socket().await; // sometimes give ipv6, sometimes ipv4. let mut addr = get_address(address_type, &socket); - crate::test_utils::map_addr_to_localhost(&mut addr); + crate::test::map_addr_to_localhost(&mut addr); let mut shutdown = self.get_shutdown_subscriber().await; let local_addr = addr; tokio::spawn(async move { @@ -398,7 +398,7 @@ mod tests { use tokio::time::timeout; - use crate::test_utils::{AddressType, TestHelper}; + use crate::test::{AddressType, TestHelper}; #[tokio::test] async fn test_echo_server() { diff --git a/tests/capture.rs b/tests/capture.rs index 626166df7d..05cb655e64 100644 --- a/tests/capture.rs +++ b/tests/capture.rs @@ -20,10 +20,9 @@ use tokio::time::{timeout, Duration}; use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{Capture, StaticFilter, TokenRouter}, - metadata::MetadataView, - test_utils::{AddressType, TestHelper}, + net::endpoint::{metadata::MetadataView, Endpoint}, + test::{AddressType, TestHelper}, }; /// This test covers both token_router and capture filters, @@ -32,7 +31,7 @@ use quilkin::{ async fn token_router() { let mut t = TestHelper::default(); let mut echo = t.run_echo_server(&AddressType::Random).await; - quilkin::test_utils::map_to_localhost(&mut echo).await; + quilkin::test::map_to_localhost(&mut echo).await; let server_port = 12348; let server_proxy = quilkin::cli::Proxy { port: server_port, diff --git a/tests/compress.rs b/tests/compress.rs index 6c9d05da62..abc3c7f441 100644 --- a/tests/compress.rs +++ b/tests/compress.rs @@ -18,9 +18,9 @@ use tokio::time::{timeout, Duration}; use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{Compress, StaticFilter}, - test_utils::{available_addr, AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{available_addr, AddressType, TestHelper}, }; #[tokio::test] @@ -30,7 +30,7 @@ async fn client_and_server() { // create server configuration as let mut server_addr = available_addr(&AddressType::Random).await; - quilkin::test_utils::map_addr_to_localhost(&mut server_addr); + quilkin::test::map_addr_to_localhost(&mut server_addr); let yaml = " on_read: DECOMPRESS on_write: COMPRESS diff --git a/tests/concatenate.rs b/tests/concatenate.rs index a74aca866b..9e4e4e0769 100644 --- a/tests/concatenate.rs +++ b/tests/concatenate.rs @@ -20,9 +20,9 @@ use tokio::time::{timeout, Duration}; use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{Concatenate, StaticFilter}, - test_utils::{AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{AddressType, TestHelper}, }; #[tokio::test] diff --git a/tests/filter_order.rs b/tests/filter_order.rs index 58401267b1..cbd9a2a2b0 100644 --- a/tests/filter_order.rs +++ b/tests/filter_order.rs @@ -20,9 +20,9 @@ use tokio::time::{timeout, Duration}; use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{Compress, Concatenate, StaticFilter}, - test_utils::{AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{AddressType, TestHelper}, }; #[tokio::test] @@ -53,7 +53,7 @@ on_write: DECOMPRESS }) .await; - quilkin::test_utils::map_to_localhost(&mut echo).await; + quilkin::test::map_to_localhost(&mut echo).await; let server_port = 12346; let server_proxy = quilkin::cli::Proxy { port: server_port, diff --git a/tests/filters.rs b/tests/filters.rs index a57f317a92..ff95efa63b 100644 --- a/tests/filters.rs +++ b/tests/filters.rs @@ -22,9 +22,9 @@ use tokio::time::timeout; use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{Debug, StaticFilter}, - test_utils::{load_test_filters, AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{load_test_filters, AddressType, TestHelper}, }; #[tokio::test] diff --git a/tests/firewall.rs b/tests/firewall.rs index 10ac11f882..a8a6a79d8b 100644 --- a/tests/firewall.rs +++ b/tests/firewall.rs @@ -23,9 +23,9 @@ use tokio::{ use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{Firewall, StaticFilter}, - test_utils::{available_addr, AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{available_addr, AddressType, TestHelper}, }; #[tokio::test] diff --git a/tests/health.rs b/tests/health.rs index 83ceb1af52..62f57a1262 100644 --- a/tests/health.rs +++ b/tests/health.rs @@ -17,7 +17,7 @@ use std::panic; use hyper::{Client, Uri}; -use quilkin::{endpoint::Endpoint, test_utils::TestHelper}; +use quilkin::{net::endpoint::Endpoint, test::TestHelper}; const LIVE_ADDRESS: &str = "http://localhost:9093/live"; diff --git a/tests/load_balancer.rs b/tests/load_balancer.rs index ca2b4aa6c1..1c0724a865 100644 --- a/tests/load_balancer.rs +++ b/tests/load_balancer.rs @@ -21,9 +21,9 @@ use tokio::time::timeout; use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{LoadBalancer, StaticFilter}, - test_utils::{AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{AddressType, TestHelper}, }; #[tokio::test] diff --git a/tests/local_rate_limit.rs b/tests/local_rate_limit.rs index 55068c5ba3..60beaeed62 100644 --- a/tests/local_rate_limit.rs +++ b/tests/local_rate_limit.rs @@ -20,9 +20,9 @@ use tokio::time::timeout; use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{LocalRateLimit, StaticFilter}, - test_utils::{available_addr, AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{available_addr, AddressType, TestHelper}, }; #[tokio::test] @@ -36,7 +36,7 @@ period: 1 let echo = t.run_echo_server(&AddressType::Random).await; let mut server_addr = available_addr(&AddressType::Random).await; - quilkin::test_utils::map_addr_to_localhost(&mut server_addr); + quilkin::test::map_addr_to_localhost(&mut server_addr); let server_proxy = quilkin::cli::Proxy { port: server_addr.port(), ..<_>::default() diff --git a/tests/match.rs b/tests/match.rs index 265920c654..477e062683 100644 --- a/tests/match.rs +++ b/tests/match.rs @@ -20,9 +20,9 @@ use tokio::time::{timeout, Duration}; use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{Capture, Match, StaticFilter}, - test_utils::{AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{AddressType, TestHelper}, }; #[tokio::test] diff --git a/tests/metrics.rs b/tests/metrics.rs index 708b44f921..aa2d7436b7 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -17,8 +17,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use quilkin::{ - endpoint::Endpoint, - test_utils::{AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{AddressType, TestHelper}, }; #[tokio::test] @@ -27,13 +27,13 @@ async fn metrics_server() { // create an echo server as an endpoint. let echo = t.run_echo_server(&AddressType::Random).await; - let metrics_port = quilkin::test_utils::available_addr(&AddressType::Random) + let metrics_port = quilkin::test::available_addr(&AddressType::Random) .await .port(); // create server configuration - let mut server_addr = quilkin::test_utils::available_addr(&AddressType::Random).await; - quilkin::test_utils::map_addr_to_localhost(&mut server_addr); + let mut server_addr = quilkin::test::available_addr(&AddressType::Random).await; + quilkin::test::map_addr_to_localhost(&mut server_addr); let server_proxy = quilkin::cli::Proxy { port: server_addr.port(), ..<_>::default() diff --git a/tests/no_filter.rs b/tests/no_filter.rs index a034d7a5bf..54c5e3f8bf 100644 --- a/tests/no_filter.rs +++ b/tests/no_filter.rs @@ -18,8 +18,8 @@ use tokio::time::timeout; use tokio::time::Duration; use quilkin::{ - endpoint::Endpoint, - test_utils::{available_addr, AddressType, TestHelper}, + net::endpoint::Endpoint, + test::{available_addr, AddressType, TestHelper}, }; #[tokio::test] diff --git a/tests/qcmp.rs b/tests/qcmp.rs index 82b2a49caa..596a0e7ab8 100644 --- a/tests/qcmp.rs +++ b/tests/qcmp.rs @@ -19,14 +19,14 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::time::Duration; use quilkin::{ - protocol::Protocol, - test_utils::{AddressType, TestHelper}, + codec::qcmp::Protocol, + test::{AddressType, TestHelper}, }; #[tokio::test] async fn proxy_ping() { let mut t = TestHelper::default(); - let server_port = quilkin::test_utils::available_addr(&AddressType::Random) + let server_port = quilkin::test::available_addr(&AddressType::Random) .await .port(); let server_proxy = quilkin::cli::Proxy { @@ -41,7 +41,7 @@ async fn proxy_ping() { #[tokio::test] async fn agent_ping() { - let qcmp_port = quilkin::test_utils::available_addr(&AddressType::Random) + let qcmp_port = quilkin::test::available_addr(&AddressType::Random) .await .port(); let agent = quilkin::cli::Agent { diff --git a/tests/token_router.rs b/tests/token_router.rs index 92d03a2502..2d5dac8626 100644 --- a/tests/token_router.rs +++ b/tests/token_router.rs @@ -20,10 +20,9 @@ use tokio::time::{timeout, Duration}; use quilkin::{ config::Filter, - endpoint::Endpoint, filters::{Capture, StaticFilter, TokenRouter}, - metadata::MetadataView, - test_utils::{AddressType, TestHelper}, + net::endpoint::{metadata::MetadataView, Endpoint}, + test::{AddressType, TestHelper}, }; /// This test covers both token_router and capture filters, @@ -32,7 +31,7 @@ use quilkin::{ async fn token_router() { let mut t = TestHelper::default(); let mut echo = t.run_echo_server(&AddressType::Ipv6).await; - quilkin::test_utils::map_to_localhost(&mut echo).await; + quilkin::test::map_to_localhost(&mut echo).await; let capture_yaml = " suffix: