diff --git a/build/Makefile b/build/Makefile index 566031faea..cfa805c860 100644 --- a/build/Makefile +++ b/build/Makefile @@ -38,7 +38,7 @@ endif # Set this value if you want to use an external registry REPOSITORY ?= "" IMAGE_TAG ?= ${REPOSITORY}quilkin:$(package_version) -PREV_IMAGE_TAG ?= us-docker.pkg.dev/quilkin/release/quilkin:0.9.0 +PREV_IMAGE_TAG ?= us-docker.pkg.dev/quilkin/release/quilkin:0.8.0 MINIKUBE_PROFILE ?= quilkin CARGO_TARGET_DIR ?= /workspace/target/build-image common_rust_args := -v $(project_path):/workspace -w /workspace \ diff --git a/crates/agones/src/provider.rs b/crates/agones/src/provider.rs index 42fa4f1a9e..151f1e24b5 100644 --- a/crates/agones/src/provider.rs +++ b/crates/agones/src/provider.rs @@ -52,6 +52,7 @@ mod tests { }; const PROXY_DEPLOYMENT: &str = "quilkin-xds-proxies"; + const SLOW: Duration = Duration::from_secs(60); #[tokio::test] #[serial] @@ -94,7 +95,7 @@ mod tests { .await .unwrap(); - let response = timeout(Duration::from_secs(30), rx.recv()) + let response = timeout(SLOW, rx.recv()) .await .expect("should receive packet from GameServer") .unwrap(); @@ -102,7 +103,7 @@ mod tests { // Proxy Deployment should be ready, since there is now an endpoint if timeout( - Duration::from_secs(30), + SLOW, await_condition(deployments.clone(), PROXY_DEPLOYMENT, is_deployment_ready()), ) .await diff --git a/crates/agones/src/relay.rs b/crates/agones/src/relay.rs index 47a839d668..b8c3626d0f 100644 --- a/crates/agones/src/relay.rs +++ b/crates/agones/src/relay.rs @@ -122,6 +122,8 @@ mod tests { .is_err() { debug_pods(&client, format!("role={relay_proxy_name}")).await; + debug_pods(&client, "role=relay".into()).await; + debug_pods(&client, "role=agent".into()).await; panic!("Quilkin proxy deployment should be ready"); } @@ -170,7 +172,8 @@ mod tests { } if !failed { debug_pods(&client, format!("role={relay_proxy_name}")).await; - debug_pods(&client, "role=xds".into()).await; + debug_pods(&client, "role=relay".into()).await; + debug_pods(&client, "role=agent".into()).await; } assert!(failed, "Packet should have failed"); diff --git a/crates/xds/src/client.rs b/crates/xds/src/client.rs index 6a5295de4e..ef7172dfb6 100644 --- a/crates/xds/src/client.rs +++ b/crates/xds/src/client.rs @@ -409,18 +409,17 @@ impl Drop for DeltaSubscription { impl AdsClient { /// Attempts to start a new delta stream to the xDS management server, if the /// management server does not support delta xDS we return the client as an error + #[allow(clippy::type_complexity)] pub async fn delta_subscribe( self, config: Arc, is_healthy: Arc, notifier: Option>, - resources: impl IntoIterator)>, + resources: &'static [(&'static str, &'static [(&'static str, Vec)])], ) -> Result { - let resource_subscriptions: Vec<_> = resources.into_iter().collect(); - let identifier = String::from(&*self.identifier); - let (mut ds, stream) = match DeltaClientStream::connect( + let (mut ds, mut stream) = match DeltaClientStream::connect( self.client.clone(), identifier.clone(), ) @@ -433,6 +432,49 @@ impl AdsClient { } }; + async fn handle_first_response( + stream: &mut tonic::Streaming, + resources: &'static [(&'static str, &'static [(&'static str, Vec)])], + ) -> eyre::Result<&'static [(&'static str, Vec)]> { + let resource_subscriptions = if let Some(first) = stream.message().await? { + let mut rsubs = None; + if first.type_url == "ignore-me" { + if !first.system_version_info.is_empty() { + rsubs = resources.iter().find_map(|(vers, subs)| { + (*vers == first.system_version_info).then_some(subs) + }); + } + } else { + tracing::warn!("expected `ignore-me` response from management server"); + } + + if let Some(subs) = rsubs { + subs + } else { + let Some(subs) = resources + .iter() + .find_map(|(vers, subs)| vers.is_empty().then_some(subs)) + else { + eyre::bail!("failed to find fallback resource subscription set"); + }; + + subs + } + } else { + eyre::bail!("expected at least one response from the management server"); + }; + + Ok(resource_subscriptions) + } + + let resource_subscriptions = match handle_first_response(&mut stream, resources).await { + Ok(rs) => rs, + Err(error) => { + tracing::error!(%error, "failed to acquire matching resource subscriptions based on response from management sever"); + return Err(self); + } + }; + // Send requests for our resource subscriptions, in this first request we // won't have any resources, but if we reconnect to management servers in // the future we'll send the resources we already have locally to hopefully @@ -454,6 +496,7 @@ impl AdsClient { async move { tracing::trace!("starting xDS delta stream task"); let mut stream = stream; + let mut resource_subscriptions = resource_subscriptions; loop { tracing::trace!("creating discovery response handler"); @@ -504,6 +547,9 @@ impl AdsClient { (ds, stream) = DeltaClientStream::connect(new_client, identifier.clone()).await?; + + resource_subscriptions = handle_first_response(&mut stream, resources).await?; + ds.refresh(&identifier, resource_subscriptions.to_vec(), &local) .await?; } diff --git a/crates/xds/src/config.rs b/crates/xds/src/config.rs index fb67e8d474..51c30bff21 100644 --- a/crates/xds/src/config.rs +++ b/crates/xds/src/config.rs @@ -195,7 +195,10 @@ pub trait Configuration: Send + Sync + Sized + 'static { subscribed: crate::server::ControlPlane, ) -> impl std::future::Future + Send + 'static; - fn interested_resources(&self) -> impl Iterator)>; + fn interested_resources( + &self, + server_version: &str, + ) -> impl Iterator)>; } pub struct DeltaDiscoveryRes { diff --git a/crates/xds/src/server.rs b/crates/xds/src/server.rs index eefc74ad20..8a4b125655 100644 --- a/crates/xds/src/server.rs +++ b/crates/xds/src/server.rs @@ -34,6 +34,8 @@ use crate::{ net::TcpListener, }; +const VERSION_INFO: &str = "9"; + pub struct ControlPlane { pub config: Arc, pub idle_request_interval: Duration, @@ -218,8 +220,7 @@ impl ControlPlane { control_plane: Some(control_plane_id.clone()), type_url: type_url.into(), removed_resources, - // Only used for debugging, not really useful - system_version_info: String::new(), + system_version_info: VERSION_INFO.into(), }; tracing::trace!( @@ -248,14 +249,22 @@ impl ControlPlane { control_plane: None, type_url: message.type_url, removed_resources: Vec::new(), - // Only used for debugging, not really useful - system_version_info: String::new(), + system_version_info: VERSION_INFO.into(), } } else { tracing::debug!(client = %node_id, resource_type = %message.type_url, "initial delta response"); let type_url = message.type_url.clone(); - responder(Some(message), &type_url, &mut client_tracker)?.unwrap() + responder(Some(message), &type_url, &mut client_tracker)?.unwrap_or( + DeltaDiscoveryResponse { + resources: Vec::new(), + nonce: String::new(), + control_plane: None, + type_url, + removed_resources: Vec::new(), + system_version_info: VERSION_INFO.into(), + }, + ) } }; @@ -406,17 +415,25 @@ impl AggregatedControlPlaneDiscoveryService for tracing::info!("control plane discovery delta stream attempt"); let mut responses = responses.into_inner(); - let Some(identifier) = responses + + fn handle_first_response( + res: DeltaDiscoveryResponse, + ) -> Result<(String, String), tonic::Status> { + let Some(identifier) = res.control_plane.map(|cp| cp.identifier) else { + return Err(tonic::Status::invalid_argument( + "DeltaDiscoveryResponse.control_plane.identifier is required in the first message", + )); + }; + + Ok((identifier, res.system_version_info)) + } + + let first_response = responses .next() .await - .ok_or_else(|| tonic::Status::cancelled("received empty first response"))?? - .control_plane - .map(|cp| cp.identifier) - else { - return Err(tonic::Status::invalid_argument( - "DeltaDiscoveryResponse.control_plane.identifier is required in the first message", - )); - }; + .ok_or_else(|| tonic::Status::cancelled("received empty first response"))??; + + let (identifier, server_version) = handle_first_response(first_response)?; tracing::info!(identifier, "new control plane delta discovery stream"); let config = self.config.clone(); @@ -429,12 +446,16 @@ impl AggregatedControlPlaneDiscoveryService for tracing::info!(identifier, "sending initial delta discovery request"); let local = Arc::new(crate::config::LocalVersions::new( - config.interested_resources().map(|(n, _)| n), + config.interested_resources(&server_version).map(|(n, _)| n), )); - ds.refresh(&identifier, config.interested_resources().collect(), &local) - .await - .map_err(|error| tonic::Status::internal(error.to_string()))?; + ds.refresh( + &identifier, + config.interested_resources(&server_version).collect(), + &local, + ) + .await + .map_err(|error| tonic::Status::internal(error.to_string()))?; let mut response_stream = crate::config::handle_delta_discovery_responses( identifier.clone(), @@ -456,9 +477,13 @@ impl AggregatedControlPlaneDiscoveryService for .map_err(|_| tonic::Status::internal("this should not be reachable"))?; } else { tracing::trace!("exceeded idle interval, sending request"); - ds.refresh(&identifier, config.interested_resources().collect(), &local) - .await - .map_err(|error| tonic::Status::internal(error.to_string()))?; + ds.refresh( + &identifier, + config.interested_resources(&server_version).collect(), + &local, + ) + .await + .map_err(|error| tonic::Status::internal(error.to_string()))?; } } } diff --git a/examples/xds/src/main.rs b/examples/xds/src/main.rs index 6d6180aa52..73a1db23b5 100644 --- a/examples/xds/src/main.rs +++ b/examples/xds/src/main.rs @@ -59,7 +59,10 @@ impl xds::config::Configuration for ClientConfig { unreachable!(); } - fn interested_resources(&self) -> impl Iterator)> { + fn interested_resources( + &self, + _server_version: &str, + ) -> impl Iterator)> { [].into_iter() } @@ -145,13 +148,13 @@ impl xds::config::Configuration for ServerConfig { } } - Ok(DeltaDiscoveryRes { - resources, - removed, - }) + Ok(DeltaDiscoveryRes { resources, removed }) } - fn interested_resources(&self) -> impl Iterator)> { + fn interested_resources( + &self, + _server_version: &str, + ) -> impl Iterator)> { [(TYPE, Vec::new())].into_iter() } @@ -164,7 +167,9 @@ impl xds::config::Configuration for ServerConfig { async move { loop { - if item_watcher.recv().await.is_err() { break; }; + if item_watcher.recv().await.is_err() { + break; + }; control_plane.push_update(TYPE); } } @@ -189,17 +194,19 @@ async fn main() { let relay_listener = xds::net::TcpListener::bind(None).unwrap(); let addr = relay_listener.local_addr(); - let server = xds::server::ControlPlane::from_arc(sc.clone(), std::time::Duration::from_secs(60)).management_server(relay_listener).unwrap(); + let server = + xds::server::ControlPlane::from_arc(sc.clone(), std::time::Duration::from_secs(60)) + .management_server(relay_listener) + .unwrap(); - tokio::task::spawn(async move { - server.await - }); + tokio::task::spawn(async move { server.await }); let client = xds::client::AdsClient::connect( "client".into(), vec![format!("http://{addr}").try_into().unwrap()], ) - .await.unwrap(); + .await + .unwrap(); let (stx, srx) = tokio::sync::oneshot::channel(); @@ -207,15 +214,15 @@ async fn main() { let cc = cc.clone(); async move { let _stream = client - .delta_subscribe( - cc, - Arc::new(std::sync::atomic::AtomicBool::new(true)), - None, - [ - (TYPE, Vec::new()), - ], - ) - .await.map_err(|_| "failed to subscribe").unwrap(); + .delta_subscribe( + cc, + Arc::new(std::sync::atomic::AtomicBool::new(true)), + None, + &[("", &[(TYPE, Vec::new())])], + ) + .await + .map_err(|_| "failed to subscribe") + .unwrap(); srx.await.unwrap(); } diff --git a/src/cli.rs b/src/cli.rs index 89282ec973..b4ce2a78e5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -123,12 +123,12 @@ impl Cli { LogFormats::Auto => { use std::io::IsTerminal; if !std::io::stdout().is_terminal() { - subscriber.json().init(); + subscriber.with_ansi(false).json().init(); } else { subscriber.init(); } } - LogFormats::Json => subscriber.json().init(), + LogFormats::Json => subscriber.with_ansi(false).json().init(), LogFormats::Plain => subscriber.init(), LogFormats::Pretty => subscriber.pretty().init(), } @@ -181,7 +181,31 @@ impl Cli { tracing::debug!(cli = ?self, "config parameters"); let config = Arc::new(match Self::read_config(self.config)? { - Some(config) => config, + Some(mut config) => { + // Workaround deficiency in serde flatten + untagged + if matches!(self.command, Commands::Agent(..)) { + config.datacenter = match config.datacenter { + crate::config::DatacenterConfig::Agent { + icao_code, + qcmp_port, + } => crate::config::DatacenterConfig::Agent { + icao_code, + qcmp_port, + }, + crate::config::DatacenterConfig::NonAgent { datacenters } => { + eyre::ensure!(datacenters.read().is_empty(), "starting an agent, but the configuration file has `datacenters` set"); + crate::config::DatacenterConfig::Agent { + icao_code: crate::config::Slot::new( + crate::config::IcaoCode::default(), + ), + qcmp_port: crate::config::Slot::new(0), + } + } + }; + } + + config + } None if matches!(self.command, Commands::Agent(..)) => Config::default_agent(), None => Config::default_non_agent(), }); diff --git a/src/components/proxy.rs b/src/components/proxy.rs index e04f3145f6..a4d60413e8 100644 --- a/src/components/proxy.rs +++ b/src/components/proxy.rs @@ -220,6 +220,26 @@ impl Proxy { shutdown_rx.clone(), ); + #[allow(clippy::type_complexity)] + const SUBS: &[(&str, &[(&str, Vec)])] = &[ + ( + "9", + &[ + (crate::xds::CLUSTER_TYPE, Vec::new()), + (crate::xds::DATACENTER_TYPE, Vec::new()), + (crate::xds::FILTER_CHAIN_TYPE, Vec::new()), + ], + ), + ( + "", + &[ + (crate::xds::CLUSTER_TYPE, Vec::new()), + (crate::xds::DATACENTER_TYPE, Vec::new()), + (crate::xds::LISTENER_TYPE, Vec::new()), + ], + ), + ]; + if !self.management_servers.is_empty() { { let mut lock = ready.xds_is_healthy.write(); @@ -259,16 +279,7 @@ impl Proxy { ready.xds_is_healthy.read().as_ref().unwrap().clone(); let _stream = client - .delta_subscribe( - config.clone(), - xds_is_healthy.clone(), - tx, - [ - (crate::xds::CLUSTER_TYPE, Vec::new()), - (crate::xds::DATACENTER_TYPE, Vec::new()), - (crate::xds::FILTER_CHAIN_TYPE, Vec::new()), - ], - ) + .delta_subscribe(config.clone(), xds_is_healthy.clone(), tx, SUBS) .await .map_err(|_| eyre::eyre!("failed to acquire delta stream"))?; diff --git a/src/config.rs b/src/config.rs index c28fdd7d8b..38785f966d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -110,7 +110,10 @@ impl quilkin_xds::config::Configuration for Config { self.delta_discovery_request(client_state) } - fn interested_resources(&self) -> impl Iterator)> { + fn interested_resources( + &self, + _server_version: &str, + ) -> impl Iterator)> { [ (crate::xds::CLUSTER_TYPE, Vec::new()), (crate::xds::DATACENTER_TYPE, Vec::new()), @@ -248,6 +251,21 @@ impl Config { cache_control: None, }); } + crate::xds::ResourceType::Listener => { + let resource = crate::xds::Resource::Listener( + crate::net::cluster::proto::FilterChain::try_from(&*self.filters.load())?, + ); + let any = resource.try_encode()?; + + resources.push(XdsResource { + name: "listener".into(), + version: "0".into(), + resource: Some(any), + aliases: Vec::new(), + ttl: None, + cache_control: None, + }); + } crate::xds::ResourceType::Datacenter => match &self.datacenter { DatacenterConfig::Agent { qcmp_port, @@ -391,7 +409,7 @@ impl Config { let resource_type: crate::xds::ResourceType = type_url.parse()?; match resource_type { - crate::xds::ResourceType::FilterChain => { + crate::xds::ResourceType::FilterChain | crate::xds::ResourceType::Listener => { // Server should only ever send exactly one filter chain, more or less indicates a bug let Some(res) = resources.pop() else { eyre::bail!("no resources in delta response"); @@ -406,12 +424,13 @@ impl Config { eyre::bail!("filter chain response did not contain a resource payload"); }; - let crate::xds::Resource::FilterChain(resource) = - crate::xds::Resource::try_decode(resource)? - else { - eyre::bail!( - "filter chain response contained a non-FilterChain resource payload" - ); + let resource = match crate::xds::Resource::try_decode(resource)? { + crate::xds::Resource::FilterChain(r) | crate::xds::Resource::Listener(r) => r, + res => { + eyre::bail!( + "filter chain response contained a {} resource payload", res.type_url() + ); + } }; let fc = diff --git a/src/test.rs b/src/test.rs index a11f789c5f..984194c945 100644 --- a/src/test.rs +++ b/src/test.rs @@ -39,6 +39,7 @@ pub fn enable_log(filter: impl Into) { LOG_ONCE.call_once(|| { tracing_subscriber::fmt() .pretty() + .with_ansi(false) .with_env_filter(filter) .init() }); diff --git a/src/xds.rs b/src/xds.rs index 1289c0113f..05c93a3366 100644 --- a/src/xds.rs +++ b/src/xds.rs @@ -21,19 +21,25 @@ use prost_types::Any; pub const CLUSTER_TYPE: &str = "type.googleapis.com/quilkin.config.v1alpha1.Cluster"; pub const DATACENTER_TYPE: &str = "type.googleapis.com/quilkin.config.v1alpha1.Datacenter"; pub const FILTER_CHAIN_TYPE: &str = "type.googleapis.com/quilkin.config.v1alpha1.FilterChain"; +pub const LISTENER_TYPE: &str = "type.googleapis.com/envoy.config.listener.v3.Listener"; const PREFIX: &str = "type.googleapis.com/quilkin.config.v1alpha1."; pub enum Resource { Cluster(proto::Cluster), Datacenter(proto::Datacenter), FilterChain(proto::FilterChain), + Listener(proto::FilterChain), } impl Resource { #[inline] pub fn try_decode(any: Any) -> Result { let Some(suffix) = any.type_url.strip_prefix(PREFIX) else { - eyre::bail!("unknown resource type '{}'", any.type_url); + if any.type_url == LISTENER_TYPE { + return Self::decode_listener(&any.value); + } else { + eyre::bail!("unknown resource type '{}'", any.type_url); + } }; Ok(match suffix { @@ -44,6 +50,65 @@ impl Resource { }) } + #[inline] + fn decode_listener(buf: &[u8]) -> eyre::Result { + let mut listener = + quilkin_xds::generated::envoy::config::listener::v3::Listener::decode(buf)?; + eyre::ensure!( + !listener.filter_chains.is_empty(), + "{LISTENER_TYPE} resource had no filter chains" + ); + eyre::ensure!( + listener.filter_chains.len() == 1, + "{LISTENER_TYPE} resource had more than one filter chain" + ); + let filter_chain = listener.filter_chains.swap_remove(0); + + let filters = filter_chain + .filters + .into_iter() + .map(|filter| { + use quilkin_xds::generated::envoy::config::listener::v3::filter::ConfigType; + + let config = if let Some(config_type) = filter.config_type { + let config = match config_type { + ConfigType::TypedConfig(any) => any, + ConfigType::ConfigDiscovery(_) => { + eyre::bail!("ConfigDiscovery is not supported") + } + }; + + let len = config.value.len(); + + let json_value = match crate::filters::FilterRegistry::get_factory(&filter.name) + .ok_or_else(|| { + crate::filters::CreationError::NotFound(filter.name.clone()) + })? + .encode_config_to_json(config) + { + Ok(jv) => jv, + Err(err) => { + tracing::error!("wtf {} {len} {err:#}", filter.name); + return Err(err.into()); + } + }; + + Some(serde_json::to_string(&json_value)?) + } else { + None + }; + + Ok(proto::Filter { + name: filter.name, + label: None, + config, + }) + }) + .collect::>>()?; + + Ok(Self::Listener(proto::FilterChain { filters })) + } + #[inline] pub fn try_encode(&self) -> Result { let (value, type_url) = match self { @@ -62,6 +127,28 @@ impl Resource { f.encode(&mut value)?; (value, FILTER_CHAIN_TYPE) } + Self::Listener(f) => { + let l = quilkin_xds::generated::envoy::config::listener::v3::Listener { + filter_chains: vec![quilkin_xds::generated::envoy::config::listener::v3::FilterChain { + filters: f.filters.iter().map(|f| { + quilkin_xds::generated::envoy::config::listener::v3::Filter { + name: f.name.clone(), + config_type: if let Some(cfg) = &f.config { + let jval: serde_json::Value = serde_json::from_str(cfg).unwrap(); + Some(quilkin_xds::generated::envoy::config::listener::v3::filter::ConfigType::TypedConfig(crate::filters::FilterRegistry::get_factory(&f.name).unwrap().encode_config_to_protobuf(jval).unwrap())) + } else { + None + }, + } + }).collect(), + ..Default::default() + }], + ..Default::default() + }; + let mut value = Vec::with_capacity(l.encoded_len()); + l.encode(&mut value)?; + (value, LISTENER_TYPE) + } }; Ok(Any { @@ -76,6 +163,7 @@ impl Resource { Self::Cluster(_) => CLUSTER_TYPE, Self::Datacenter(_) => DATACENTER_TYPE, Self::FilterChain(_) => FILTER_CHAIN_TYPE, + Self::Listener(_) => LISTENER_TYPE, } } } @@ -85,13 +173,18 @@ pub enum ResourceType { Cluster, Datacenter, FilterChain, + Listener, } impl std::str::FromStr for ResourceType { type Err = eyre::Error; fn from_str(s: &str) -> Result { let Some(suffix) = s.strip_prefix(PREFIX) else { - eyre::bail!("unknown resource type '{s}'"); + if s == LISTENER_TYPE { + return Ok(Self::Listener); + } else { + eyre::bail!("unknown resource type '{s}'"); + } }; Ok(match suffix {