From a160fc7b5c086ba27d205914c70bfd36aa2d6dbe Mon Sep 17 00:00:00 2001 From: Erin Power Date: Tue, 26 Sep 2023 09:49:11 +0200 Subject: [PATCH] Add health checks for each service --- benches/throughput.rs | 3 +- examples/quilkin-filter-example/src/main.rs | 4 +- src/admin.rs | 160 --------------- src/cli.rs | 84 +++++--- src/cli/admin.rs | 211 ++++++++++++++++++++ src/{ => cli}/admin/health.rs | 0 src/cli/agent.rs | 35 +++- src/cli/manage.rs | 24 ++- src/cli/proxy.rs | 48 ++++- src/cli/relay.rs | 105 ++++++---- src/config/providers.rs | 27 ++- src/config/watch/agones.rs | 20 +- src/config/watch/fs.rs | 9 +- src/lib.rs | 1 - src/test_utils.rs | 9 +- src/xds.rs | 10 +- src/xds/client.rs | 42 +++- src/xds/server.rs | 9 +- tests/qcmp.rs | 3 +- 19 files changed, 535 insertions(+), 269 deletions(-) delete mode 100644 src/admin.rs create mode 100644 src/cli/admin.rs rename src/{ => cli}/admin/health.rs (100%) diff --git a/benches/throughput.rs b/benches/throughput.rs index 6480075e78..2cd558d259 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -43,7 +43,8 @@ fn run_quilkin(port: u16, endpoint: SocketAddr) { runtime.block_on(async move { let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel::<()>(()); - proxy.run(config, shutdown_rx).await.unwrap(); + let admin = quilkin::cli::Admin::Proxy(<_>::default()); + proxy.run(config, admin, shutdown_rx).await.unwrap(); }); }); } diff --git a/examples/quilkin-filter-example/src/main.rs b/examples/quilkin-filter-example/src/main.rs index 57e2c5ccad..1175c52d6c 100644 --- a/examples/quilkin-filter-example/src/main.rs +++ b/examples/quilkin-filter-example/src/main.rs @@ -113,6 +113,8 @@ async fn main() -> quilkin::Result<()> { ) }); - proxy.run(config.into(), shutdown_rx).await + let admin = quilkin::cli::Admin::Proxy(<_>::default()); + + proxy.run(config.into(), admin, shutdown_rx).await } // ANCHOR_END: run diff --git a/src/admin.rs b/src/admin.rs deleted file mode 100644 index c18afe4126..0000000000 --- a/src/admin.rs +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -mod health; - -use std::convert::Infallible; -use std::sync::Arc; - -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Method, Request, Response, Server as HyperServer, StatusCode}; - -use self::health::Health; -use crate::config::Config; - -pub const PORT: u16 = 8000; - -/// Define which mode Quilkin is in. -#[derive(Copy, Clone, Debug)] -pub enum Mode { - Proxy, - Xds, -} - -pub fn server( - mode: Mode, - config: Arc, - address: Option, -) -> tokio::task::JoinHandle> { - let address = address.unwrap_or_else(|| (std::net::Ipv6Addr::UNSPECIFIED, PORT).into()); - let health = Health::new(); - tracing::info!(address = %address, "Starting admin endpoint"); - - let make_svc = make_service_fn(move |_conn| { - let config = config.clone(); - let health = health.clone(); - async move { - let config = config.clone(); - let health = health.clone(); - Ok::<_, Infallible>(service_fn(move |req| { - let config = config.clone(); - let health = health.clone(); - async move { Ok::<_, Infallible>(handle_request(req, mode, config, health)) } - })) - } - }); - - tokio::spawn(HyperServer::bind(&address).serve(make_svc)) -} - -fn handle_request( - request: Request, - mode: Mode, - config: Arc, - health: Health, -) -> Response { - match (request.method(), request.uri().path()) { - (&Method::GET, "/metrics") => collect_metrics(), - (&Method::GET, "/live" | "/livez") => health.check_healthy(), - (&Method::GET, "/ready" | "/readyz") => match mode { - Mode::Proxy => check_proxy_readiness(&config), - Mode::Xds => health.check_healthy(), - }, - (&Method::GET, "/config") => match serde_json::to_string(&config) { - Ok(body) => Response::builder() - .status(StatusCode::OK) - .header( - "Content-Type", - hyper::header::HeaderValue::from_static("application/json"), - ) - .body(Body::from(body)) - .unwrap(), - Err(err) => Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(format!("failed to create config dump: {err}"))) - .unwrap(), - }, - (_, _) => { - let mut response = Response::new(Body::empty()); - *response.status_mut() = StatusCode::NOT_FOUND; - response - } - } -} - -fn check_proxy_readiness(config: &Config) -> Response { - if config.clusters.read().endpoints().count() > 0 { - return Response::new("ok".into()); - } - - let mut response = Response::new(Body::empty()); - *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - response -} - -fn collect_metrics() -> Response { - let mut response = Response::new(Body::empty()); - let mut buffer = vec![]; - let encoder = prometheus::TextEncoder::new(); - let body = - prometheus::Encoder::encode(&encoder, &crate::metrics::registry().gather(), &mut buffer) - .map_err(|error| tracing::warn!(%error, "Failed to encode metrics")) - .and_then(|_| { - String::from_utf8(buffer) - .map(Body::from) - .map_err(|error| tracing::warn!(%error, "Failed to convert metrics to utf8")) - }); - - match body { - Ok(body) => { - *response.body_mut() = body; - } - Err(_) => { - *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - } - }; - - response -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::endpoint::Endpoint; - - #[tokio::test] - async fn collect_metrics() { - let response = super::collect_metrics(); - assert_eq!(response.status(), hyper::StatusCode::OK); - } - - #[test] - fn check_proxy_readiness() { - let config = Config::default(); - assert_eq!(config.clusters.read().endpoints().count(), 0); - - let response = super::check_proxy_readiness(&config); - assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); - - config - .clusters - .write() - .insert_default([Endpoint::new((std::net::Ipv4Addr::LOCALHOST, 25999).into())].into()); - - let response = super::check_proxy_readiness(&config); - assert_eq!(response.status(), StatusCode::OK); - } -} diff --git a/src/cli.rs b/src/cli.rs index ebaba32e35..dfd3db33e4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -14,6 +14,8 @@ * limitations under the License. */ +pub(crate) mod admin; + use std::{ path::{Path, PathBuf}, sync::Arc, @@ -23,12 +25,12 @@ use clap::builder::TypedValueParser; use clap::crate_version; use tokio::{signal, sync::watch}; -use crate::{admin::Mode, Config}; +use crate::Config; use strum_macros::{Display, EnumString}; pub use self::{ - agent::Agent, generate_config_schema::GenerateConfigSchema, manage::Manage, proxy::Proxy, - qcmp::Qcmp, relay::Relay, + admin::Admin, agent::Agent, generate_config_schema::GenerateConfigSchema, manage::Manage, + proxy::Proxy, qcmp::Qcmp, relay::Relay, }; macro_rules! define_port { @@ -106,10 +108,21 @@ pub enum Commands { } impl Commands { - pub fn admin_mode(&self) -> Option { + pub fn admin_mode(&self) -> Option { match self { - Self::Proxy(_) | Self::Agent(_) => Some(Mode::Proxy), - Self::Relay(_) | Self::Manage(_) => Some(Mode::Xds), + Self::Proxy(proxy) => Some(Admin::Proxy(proxy::RuntimeConfig { + idle_request_interval_secs: proxy.idle_request_interval_secs, + ..<_>::default() + })), + Self::Agent(agent) => Some(Admin::Agent(agent::RuntimeConfig { + idle_request_interval_secs: agent.idle_request_interval_secs, + ..<_>::default() + })), + Self::Relay(relay) => Some(Admin::Relay(relay::RuntimeConfig { + idle_request_interval_secs: relay.idle_request_interval_secs, + ..<_>::default() + })), + Self::Manage(_) => Some(Admin::Xds(<_>::default())), Self::GenerateConfigSchema(_) | Self::Qcmp(_) => None, } } @@ -148,24 +161,24 @@ impl Cli { "Starting Quilkin" ); - if let Commands::Qcmp(Qcmp::Ping(ping)) = self.command { - return ping.run().await; + // Non-long running commands (e.g. ones with no administration server) + // are executed here. + match self.command { + Commands::Qcmp(Qcmp::Ping(ping)) => return ping.run().await, + Commands::GenerateConfigSchema(generator) => { + return generator.generate_config_schema(); + } + _ => {} } tracing::debug!(cli = ?self, "config parameters"); let config = Arc::new(Self::read_config(self.config)?); - let _admin_task = self - .command - .admin_mode() - .filter(|_| !self.no_admin) - .map(|mode| { - tokio::spawn(crate::admin::server( - mode, - config.clone(), - self.admin_address, - )) - }); + let mode = self.command.admin_mode().unwrap(); + + if !self.no_admin { + mode.server(config.clone(), self.admin_address); + } let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); @@ -191,37 +204,45 @@ impl Cli { let fut = tryhard::retry_fn({ let shutdown_rx = shutdown_rx.clone(); + let mode = mode.clone(); move || match self.command.clone() { Commands::Agent(agent) => { let config = config.clone(); let shutdown_rx = shutdown_rx.clone(); - tokio::spawn( - async move { agent.run(config.clone(), shutdown_rx.clone()).await }, - ) + let mode = mode.clone(); + tokio::spawn(async move { + agent.run(config.clone(), mode, shutdown_rx.clone()).await + }) } Commands::Proxy(runner) => { let config = config.clone(); let shutdown_rx = shutdown_rx.clone(); - tokio::spawn( - async move { runner.run(config.clone(), shutdown_rx.clone()).await }, - ) + let mode = mode.clone(); + tokio::spawn(async move { + runner + .run(config.clone(), mode.clone(), shutdown_rx.clone()) + .await + }) } Commands::Manage(manager) => { let config = config.clone(); let shutdown_rx = shutdown_rx.clone(); + let mode = mode.clone(); tokio::spawn(async move { - manager.manage(config.clone(), shutdown_rx.clone()).await + manager + .manage(config.clone(), mode, shutdown_rx.clone()) + .await }) } - Commands::GenerateConfigSchema(generator) => { - tokio::spawn(std::future::ready(generator.generate_config_schema())) - } Commands::Relay(relay) => { let config = config.clone(); let shutdown_rx = shutdown_rx.clone(); - tokio::spawn(async move { relay.relay(config, shutdown_rx.clone()).await }) + let mode = mode.clone(); + tokio::spawn( + async move { relay.relay(config, mode, shutdown_rx.clone()).await }, + ) } - Commands::Qcmp(_) => unreachable!(), + Commands::GenerateConfigSchema(_) | Commands::Qcmp(_) => unreachable!(), } }) .retries(3) @@ -354,6 +375,7 @@ mod tests { region: None, sub_zone: None, zone: None, + idle_request_interval_secs: admin::IDLE_REQUEST_INTERVAL_SECS, qcmp_port: crate::test_utils::available_addr(&AddressType::Random) .await .port(), diff --git a/src/cli/admin.rs b/src/cli/admin.rs new file mode 100644 index 0000000000..27c8ffa5c1 --- /dev/null +++ b/src/cli/admin.rs @@ -0,0 +1,211 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod health; + +use std::convert::Infallible; +use std::sync::Arc; + +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Request, Response, Server as HyperServer, StatusCode}; + +use self::health::Health; +use crate::config::Config; + +use super::{agent, manage, proxy, relay}; + +pub const PORT: u16 = 8000; + +pub(crate) const IDLE_REQUEST_INTERVAL_SECS: u64 = 30; + +/// The runtime mode of Quilkin, which contains various runtime configurations +/// specific to a mode. +#[derive(Clone, Debug)] +pub enum Admin { + Proxy(proxy::RuntimeConfig), + Relay(relay::RuntimeConfig), + Xds(manage::RuntimeConfig), + Agent(agent::RuntimeConfig), +} + +impl Admin { + #[track_caller] + pub fn unwrap_agent(&self) -> &agent::RuntimeConfig { + match self { + Self::Agent(config) => config, + _ => panic!("attempted to unwrap agent config when not in agent mode"), + } + } + + #[track_caller] + pub fn unwrap_proxy(&self) -> &proxy::RuntimeConfig { + match self { + Self::Proxy(config) => config, + _ => panic!("attempted to unwrap proxy config when not in proxy mode"), + } + } + + #[track_caller] + pub fn unwrap_relay(&self) -> &relay::RuntimeConfig { + match self { + Self::Relay(config) => config, + _ => panic!("attempted to unwrap relay config when not in relay mode"), + } + } + + pub fn idle_request_interval_secs(&self) -> u64 { + match self { + Self::Proxy(config) => config.idle_request_interval_secs, + Self::Agent(config) => config.idle_request_interval_secs, + Self::Relay(config) => config.idle_request_interval_secs, + _ => IDLE_REQUEST_INTERVAL_SECS, + } + } + + pub fn server( + &self, + config: Arc, + address: Option, + ) -> tokio::task::JoinHandle> { + let address = address.unwrap_or_else(|| (std::net::Ipv6Addr::UNSPECIFIED, PORT).into()); + let health = Health::new(); + tracing::info!(address = %address, "Starting admin endpoint"); + + let mode = self.clone(); + let make_svc = make_service_fn(move |_conn| { + let config = config.clone(); + let health = health.clone(); + let mode = mode.clone(); + async move { + let config = config.clone(); + let health = health.clone(); + let mode = mode.clone(); + Ok::<_, Infallible>(service_fn(move |req| { + let config = config.clone(); + let health = health.clone(); + let mode = mode.clone(); + async move { Ok::<_, Infallible>(mode.handle_request(req, config, health)) } + })) + } + }); + + tokio::spawn(HyperServer::bind(&address).serve(make_svc)) + } + + fn is_healthy(&self, config: &Config) -> bool { + match &self { + Self::Proxy(proxy) => proxy.is_healthy(config), + Self::Agent(agent) => agent.is_healthy(), + Self::Xds(manage) => manage.is_healthy(), + Self::Relay(relay) => relay.is_healthy(), + } + } + + fn handle_request( + &self, + request: Request, + config: Arc, + health: Health, + ) -> Response { + match (request.method(), request.uri().path()) { + (&Method::GET, "/metrics") => collect_metrics(), + (&Method::GET, "/live" | "/livez") => health.check_healthy(), + (&Method::GET, "/ready" | "/readyz") => check_readiness(|| self.is_healthy(&config)), + (&Method::GET, "/config") => match serde_json::to_string(&config) { + Ok(body) => Response::builder() + .status(StatusCode::OK) + .header( + "Content-Type", + hyper::header::HeaderValue::from_static("application/json"), + ) + .body(Body::from(body)) + .unwrap(), + Err(err) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("failed to create config dump: {err}"))) + .unwrap(), + }, + (_, _) => { + let mut response = Response::new(Body::empty()); + *response.status_mut() = StatusCode::NOT_FOUND; + response + } + } + } +} + +fn check_readiness(check: impl Fn() -> bool) -> Response { + if (check)() { + return Response::new("ok".into()); + } + + let mut response = Response::new(Body::empty()); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response +} + +fn collect_metrics() -> Response { + let mut response = Response::new(Body::empty()); + let mut buffer = vec![]; + let encoder = prometheus::TextEncoder::new(); + let body = + prometheus::Encoder::encode(&encoder, &crate::metrics::registry().gather(), &mut buffer) + .map_err(|error| tracing::warn!(%error, "Failed to encode metrics")) + .and_then(|_| { + String::from_utf8(buffer) + .map(Body::from) + .map_err(|error| tracing::warn!(%error, "Failed to convert metrics to utf8")) + }); + + match body { + Ok(body) => { + *response.body_mut() = body; + } + Err(_) => { + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + } + }; + + response +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::endpoint::Endpoint; + + #[tokio::test] + async fn collect_metrics() { + let response = super::collect_metrics(); + assert_eq!(response.status(), hyper::StatusCode::OK); + } + + #[test] + fn check_proxy_readiness() { + let config = crate::Config::default(); + assert_eq!(config.clusters.read().endpoints().count(), 0); + + let admin = Admin::Proxy(<_>::default()); + assert!(!admin.is_healthy(&config)); + + config + .clusters + .write() + .insert_default([Endpoint::new((std::net::Ipv4Addr::LOCALHOST, 25999).into())].into()); + + assert!(admin.is_healthy(&config)); + } +} diff --git a/src/admin/health.rs b/src/cli/admin/health.rs similarity index 100% rename from src/admin/health.rs rename to src/cli/admin/health.rs diff --git a/src/cli/agent.rs b/src/cli/agent.rs index 9ef76506e1..ea75b31dc8 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -14,8 +14,12 @@ * limitations under the License. */ -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use super::Admin; use crate::config::Config; define_port!(7600); @@ -47,6 +51,10 @@ pub struct Agent { /// The configuration source for a management server. #[clap(subcommand)] pub provider: Option, + /// The interval in seconds at which the agent will wait for a discovery + /// request from a relay server before restarting the connection. + #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = super::admin::IDLE_REQUEST_INTERVAL_SECS)] + pub idle_request_interval_secs: u64, } impl Default for Agent { @@ -58,6 +66,7 @@ impl Default for Agent { zone: <_>::default(), sub_zone: <_>::default(), provider: <_>::default(), + idle_request_interval_secs: super::admin::IDLE_REQUEST_INTERVAL_SECS, } } } @@ -66,6 +75,7 @@ impl Agent { pub async fn run( &self, config: Arc, + mode: Admin, mut shutdown_rx: tokio::sync::watch::Receiver<()>, ) -> crate::Result<()> { let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some()) @@ -75,14 +85,21 @@ impl Agent { sub_zone: self.sub_zone.clone().unwrap_or_default(), }); + let runtime_config = mode.unwrap_agent(); + let _mds_task = if !self.relay.is_empty() { let _provider_task = match self.provider.as_ref() { - Some(provider) => Some(provider.spawn(config.clone(), locality.clone())), + Some(provider) => Some(provider.spawn( + config.clone(), + runtime_config.provider_is_healthy.clone(), + locality.clone(), + )), None => return Err(eyre::eyre!("no configuration provider given")), }; let task = crate::xds::client::MdsClient::connect( String::clone(&config.id.load()), + mode.clone(), self.relay.clone(), ); @@ -99,3 +116,17 @@ impl Agent { shutdown_rx.changed().await.map_err(From::from) } } + +#[derive(Clone, Debug, Default)] +pub struct RuntimeConfig { + pub idle_request_interval_secs: u64, + pub provider_is_healthy: Arc, + pub relay_is_healthy: Arc, +} + +impl RuntimeConfig { + pub fn is_healthy(&self) -> bool { + self.provider_is_healthy.load(Ordering::SeqCst) + && self.relay_is_healthy.load(Ordering::SeqCst) + } +} diff --git a/src/cli/manage.rs b/src/cli/manage.rs index 22b226bd03..d808000f50 100644 --- a/src/cli/manage.rs +++ b/src/cli/manage.rs @@ -14,6 +14,13 @@ * limitations under the License. */ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use super::Admin; + use futures::TryFutureExt; define_port!(7800); @@ -49,6 +56,7 @@ impl Manage { pub async fn manage( &self, config: std::sync::Arc, + mode: Admin, mut shutdown_rx: tokio::sync::watch::Receiver<()>, ) -> crate::Result<()> { let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some()) @@ -64,12 +72,15 @@ impl Manage { .modify(|map| map.update_unlocated_endpoints(locality.clone())); } - let provider_task = self.provider.spawn(config.clone(), locality.clone()); + let provider_task = self + .provider + .spawn(config.clone(), <_>::default(), locality.clone()); let _relay_stream = if !self.relay.is_empty() { tracing::info!("connecting to relay server"); let client = crate::xds::client::MdsClient::connect( String::clone(&config.id.load()), + mode.clone(), self.relay.clone(), ) .await?; @@ -89,3 +100,14 @@ impl Manage { } } } + +#[derive(Clone, Debug, Default)] +pub struct RuntimeConfig { + pub provider_is_healthy: Arc, +} + +impl RuntimeConfig { + pub fn is_healthy(&self) -> bool { + self.provider_is_healthy.load(Ordering::SeqCst) + } +} diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index 0b1e891a89..c4cdca4498 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -14,10 +14,18 @@ * limitations under the License. */ -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; use tonic::transport::Endpoint; +use super::Admin; use crate::{proxy::SessionMap, xds::ResourceType, Config, Result}; #[cfg(doc)] @@ -48,7 +56,7 @@ pub struct Proxy { pub to: Vec, /// The interval in seconds at which the relay will send a discovery request /// to an management server after receiving no updates. - #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = crate::xds::server::IDLE_REQUEST_INTERVAL_SECS)] + #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = super::admin::IDLE_REQUEST_INTERVAL_SECS)] pub idle_request_interval_secs: u64, } @@ -60,7 +68,7 @@ impl Default for Proxy { port: PORT, qcmp_port: QCMP_PORT, to: <_>::default(), - idle_request_interval_secs: crate::xds::server::IDLE_REQUEST_INTERVAL_SECS, + idle_request_interval_secs: super::admin::IDLE_REQUEST_INTERVAL_SECS, } } } @@ -70,6 +78,7 @@ impl Proxy { pub async fn run( &self, config: std::sync::Arc, + mode: Admin, mut shutdown_rx: tokio::sync::watch::Receiver<()>, ) -> crate::Result<()> { const SESSION_TIMEOUT_SECONDS: Duration = Duration::from_secs(60); @@ -114,11 +123,21 @@ impl Proxy { tracing::info!(port = self.port, proxy_id = &*id, "Starting"); let sessions = SessionMap::new(SESSION_TIMEOUT_SECONDS, SESSION_EXPIRY_POLL_INTERVAL); + let runtime_config = mode.unwrap_proxy(); let _xds_stream = if !self.management_server.is_empty() { - let client = - crate::xds::AdsClient::connect(String::clone(&id), self.management_server.clone()) - .await?; + { + let mut lock = runtime_config.xds_is_healthy.write(); + let check: Arc = <_>::default(); + *lock = Some(check.clone()); + } + + let client = crate::xds::AdsClient::connect( + String::clone(&id), + mode.clone(), + self.management_server.clone(), + ) + .await?; let mut stream = client.xds_client_stream(config.clone(), self.idle_request_interval_secs); @@ -399,3 +418,20 @@ mod tests { ); } } + +#[derive(Clone, Debug, Default)] +pub struct RuntimeConfig { + pub idle_request_interval_secs: u64, + // RwLock as this check is conditional on the proxy using xDS. + pub xds_is_healthy: Arc>>>, +} + +impl RuntimeConfig { + pub fn is_healthy(&self, config: &Config) -> bool { + self.xds_is_healthy + .read() + .as_ref() + .map_or(true, |health| health.load(Ordering::SeqCst)) + && config.clusters.read().endpoints().count() != 0 + } +} diff --git a/src/cli/relay.rs b/src/cli/relay.rs index fc075653a4..72e1f020a0 100644 --- a/src/cli/relay.rs +++ b/src/cli/relay.rs @@ -14,7 +14,10 @@ * limitations under the License. */ -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use futures::StreamExt; @@ -36,7 +39,7 @@ pub struct Relay { pub xds_port: u16, /// The interval in seconds at which the relay will send a discovery request /// to an management server after receiving no updates. - #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = crate::xds::server::IDLE_REQUEST_INTERVAL_SECS)] + #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = super::admin::IDLE_REQUEST_INTERVAL_SECS)] pub idle_request_interval_secs: u64, #[clap(subcommand)] pub providers: Option, @@ -47,7 +50,7 @@ impl Default for Relay { Self { mds_port: PORT, xds_port: super::manage::PORT, - idle_request_interval_secs: crate::xds::server::IDLE_REQUEST_INTERVAL_SECS, + idle_request_interval_secs: super::admin::IDLE_REQUEST_INTERVAL_SECS, providers: None, } } @@ -57,6 +60,7 @@ impl Relay { pub async fn relay( &self, config: Arc, + mode: crate::cli::Admin, mut shutdown_rx: tokio::sync::watch::Receiver<()>, ) -> crate::Result<()> { let xds_server = crate::xds::server::spawn(self.xds_port, config.clone()); @@ -65,6 +69,7 @@ impl Relay { self.idle_request_interval_secs, config.clone(), )); + let runtime_config = mode.unwrap_relay(); let _provider_task = if let Some(Providers::Agones { config_namespace, .. @@ -72,45 +77,65 @@ impl Relay { { let config = config.clone(); let config_namespace = config_namespace.clone(); - Some(tokio::spawn(Providers::task(move || { - let config = config.clone(); - let config_namespace = config_namespace.clone(); - async move { - let client = tokio::time::timeout( - std::time::Duration::from_secs(5), - kube::Client::try_default(), - ) - .await??; + let provider_is_healthy = runtime_config.provider_is_healthy.clone(); + Some(tokio::spawn(Providers::task( + provider_is_healthy.clone(), + move || { + let config = config.clone(); + let config_namespace = config_namespace.clone(); + let provider_is_healthy = provider_is_healthy.clone(); + async move { + let client = tokio::time::timeout( + std::time::Duration::from_secs(5), + kube::Client::try_default(), + ) + .await??; - let configmap_reflector = - crate::config::providers::k8s::update_filters_from_configmap( - client.clone(), - config_namespace, - config.clone(), - ); + let configmap_reflector = + crate::config::providers::k8s::update_filters_from_configmap( + client.clone(), + config_namespace, + config.clone(), + ); - tokio::pin!(configmap_reflector); + tokio::pin!(configmap_reflector); - loop { - match configmap_reflector.next().await { - Some(Ok(_)) => (), - Some(Err(error)) => return Err(error), - None => break, + loop { + match configmap_reflector.next().await { + Some(Ok(_)) => { + provider_is_healthy.store(true, Ordering::SeqCst); + } + Some(Err(error)) => { + provider_is_healthy.store(false, Ordering::SeqCst); + return Err(error); + } + None => { + provider_is_healthy.store(false, Ordering::SeqCst); + break; + } + } } - } - tracing::info!("configmap stream ending"); - Ok(()) - } - }))) + tracing::info!("configmap stream ending"); + Ok(()) + } + }, + ))) } else if let Some(Providers::File { path }) = &self.providers { let config = config.clone(); let path = path.clone(); - Some(tokio::spawn(Providers::task(move || { - let config = config.clone(); - let path = path.clone(); - async move { crate::config::watch::fs(config, path, None).await } - }))) + let provider_is_healthy = runtime_config.provider_is_healthy.clone(); + Some(tokio::spawn(Providers::task( + provider_is_healthy.clone(), + move || { + let config = config.clone(); + let path = path.clone(); + let provider_is_healthy = provider_is_healthy.clone(); + async move { + crate::config::watch::fs(config, provider_is_healthy, path, None).await + } + }, + ))) } else { None }; @@ -126,3 +151,15 @@ impl Relay { } } } + +#[derive(Clone, Debug, Default)] +pub struct RuntimeConfig { + pub idle_request_interval_secs: u64, + pub provider_is_healthy: Arc, +} + +impl RuntimeConfig { + pub fn is_healthy(&self) -> bool { + self.provider_is_healthy.load(Ordering::SeqCst) + } +} diff --git a/src/config/providers.rs b/src/config/providers.rs index 2bd331a3e1..ef543a71a1 100644 --- a/src/config/providers.rs +++ b/src/config/providers.rs @@ -1,3 +1,7 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; pub mod k8s; const RETRIES: u32 = 25; @@ -41,33 +45,47 @@ impl Providers { pub fn spawn( &self, config: std::sync::Arc, + health_check: Arc, locality: Option, ) -> tokio::task::JoinHandle> { match &self { Self::Agones { gameservers_namespace, config_namespace, - } => tokio::spawn(Self::task({ + } => tokio::spawn(Self::task(health_check.clone(), { let gameservers_namespace = gameservers_namespace.clone(); let config_namespace = config_namespace.clone(); + let health_check = health_check.clone(); move || { crate::config::watch::agones( gameservers_namespace.clone(), config_namespace.clone(), + health_check.clone(), locality.clone(), config.clone(), ) } })), - Self::File { path } => tokio::spawn(Self::task({ + Self::File { path } => tokio::spawn(Self::task(health_check.clone(), { let path = path.clone(); - move || crate::config::watch::fs(config.clone(), path.clone(), locality.clone()) + let health_check = health_check.clone(); + move || { + crate::config::watch::fs( + config.clone(), + health_check.clone(), + path.clone(), + locality.clone(), + ) + } })), } } #[tracing::instrument(level = "trace", skip_all)] - pub async fn task(task: impl FnMut() -> F) -> crate::Result<()> + pub async fn task( + health_check: Arc, + task: impl FnMut() -> F, + ) -> crate::Result<()> where F: std::future::Future>, { @@ -76,6 +94,7 @@ impl Providers { .exponential_backoff(BACKOFF_STEP) .max_delay(MAX_DELAY) .on_retry(|attempt, _, error: &eyre::Error| { + health_check.store(false, Ordering::SeqCst); let error = error.to_string(); async move { tracing::warn!(%attempt, %error, "provider task error, retrying"); diff --git a/src/config/watch/agones.rs b/src/config/watch/agones.rs index cf2e0dbfa9..5e7e1d9bfc 100644 --- a/src/config/watch/agones.rs +++ b/src/config/watch/agones.rs @@ -15,13 +15,17 @@ */ use futures::TryStreamExt; -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use crate::{endpoint::Locality, Config}; pub async fn watch( gameservers_namespace: impl AsRef, config_namespace: impl AsRef, + health_check: Arc, locality: Option, config: Arc, ) -> crate::Result<()> { @@ -45,11 +49,15 @@ pub async fn watch( tokio::pin!(gameserver_reflector); loop { - let Some(_) = tokio::select! { - result = configmap_reflector.try_next() => result?, - result = gameserver_reflector.try_next() => result?, - } else { - break Ok(()); + let result = tokio::select! { + result = configmap_reflector.try_next() => result, + result = gameserver_reflector.try_next() => result, }; + + match result { + Ok(Some(_)) => health_check.store(true, Ordering::SeqCst), + Ok(None) => break Err(eyre::eyre!("kubernetes watch stream terminated")), + Err(error) => break Err(error), + } } } diff --git a/src/config/watch/fs.rs b/src/config/watch/fs.rs index 4d699cc1b1..bbf95ec3c0 100644 --- a/src/config/watch/fs.rs +++ b/src/config/watch/fs.rs @@ -14,7 +14,10 @@ * limitations under the License. */ -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use notify::Watcher; use tracing::Instrument; @@ -23,6 +26,7 @@ use crate::Config; pub async fn watch( config: Arc, + health_check: Arc, path: impl Into, locality: Option, ) -> crate::Result<()> { @@ -42,6 +46,7 @@ pub async fn watch( let buf = tokio::fs::read(&path).await?; tracing::info!("applying initial configuration"); config.update_from_json(serde_yaml::from_slice(&buf)?, locality.clone())?; + health_check.store(true, Ordering::SeqCst); watcher.watch(&path, notify::RecursiveMode::Recursive)?; tracing::info!("watching file"); @@ -83,7 +88,7 @@ mod tests { tokio::fs::write(&file_path, serde_yaml::to_string(&source).unwrap()) .await .unwrap(); - let _handle = tokio::spawn(watch(dest.clone(), file_path.clone(), None)); + let _handle = tokio::spawn(watch(dest.clone(), <_>::default(), file_path.clone(), None)); tokio::time::sleep(std::time::Duration::from_millis(100)).await; source.clusters.modify(|clusters| { diff --git a/src/lib.rs b/src/lib.rs index 4eec6241df..475eac7136 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,6 @@ #![deny(unused_must_use)] -mod admin; mod maxmind_db; mod proxy; diff --git a/src/test_utils.rs b/src/test_utils.rs index f70636a03b..a780ce4521 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -273,17 +273,14 @@ impl TestHelper { ) { let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); self.server_shutdown_tx.push(Some(shutdown_tx)); + let mode = crate::cli::Admin::Proxy(<_>::default()); if let Some(address) = with_admin { - tokio::spawn(crate::admin::server( - crate::admin::Mode::Proxy, - config.clone(), - address, - )); + mode.server(config.clone(), address); } tokio::spawn(async move { - server.run(config, shutdown_rx).await.unwrap(); + server.run(config, mode, shutdown_rx).await.unwrap(); }); } diff --git a/src/xds.rs b/src/xds.rs index 7c17261bcb..556dd6e0a9 100644 --- a/src/xds.rs +++ b/src/xds.rs @@ -198,7 +198,12 @@ mod tests { ..<_>::default() }; - tokio::spawn(async move { client_proxy.run(client_config, shutdown_rx).await }); + let proxy_admin = crate::cli::Admin::Proxy(<_>::default()); + tokio::spawn(async move { + client_proxy + .run(client_config, proxy_admin, shutdown_rx) + .await + }); tokio::time::sleep(std::time::Duration::from_millis(50)).await; tokio::time::sleep(std::time::Duration::from_millis(50)).await; @@ -296,13 +301,14 @@ mod tests { tokio::spawn(server::spawn(23456, config.clone())); let client = Client::connect( "test-client".into(), + crate::cli::Admin::Xds(<_>::default()), vec!["http://127.0.0.1:23456".try_into().unwrap()], ) .await .unwrap(); let mut stream = client.xds_client_stream( config.clone(), - crate::xds::server::IDLE_REQUEST_INTERVAL_SECS, + crate::cli::admin::IDLE_REQUEST_INTERVAL_SECS, ); tokio::time::sleep(std::time::Duration::from_millis(500)).await; diff --git a/src/xds/client.rs b/src/xds/client.rs index 7d189a9085..4881dbf0d2 100644 --- a/src/xds/client.rs +++ b/src/xds/client.rs @@ -14,7 +14,7 @@ * limitations under the License. */ -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, sync::atomic::Ordering, sync::Arc, time::Duration}; use futures::StreamExt; use rand::Rng; @@ -27,6 +27,7 @@ use tryhard::{ }; use crate::{ + cli::Admin, config::Config, xds::{ config::core::v3::Node, @@ -106,16 +107,22 @@ pub struct Client { client: C, identifier: Arc, management_servers: Vec, + mode: Admin, } impl Client { #[tracing::instrument(skip_all, level = "trace", fields(servers = ?management_servers))] - pub async fn connect(identifier: String, management_servers: Vec) -> Result { + pub async fn connect( + identifier: String, + mode: Admin, + management_servers: Vec, + ) -> Result { let client = Self::connect_with_backoff(&management_servers).await?; Ok(Self { client, identifier: Arc::from(identifier), management_servers, + mode, }) } @@ -240,6 +247,7 @@ impl AdsStream { client, identifier, management_servers, + .. }: &AdsClient, config: Arc, idle_request_interval_secs: u64, @@ -358,12 +366,14 @@ impl MdsStream { client, identifier, management_servers, + mode, }: &MdsClient, config: Arc, ) -> Self { let mut client = client.clone(); let identifier = identifier.clone(); let management_servers = management_servers.clone(); + let mode = mode.clone(); Self::connect( identifier.clone(), move |(requests, mut rx), _| async move { @@ -390,15 +400,33 @@ impl MdsStream { let control_plane = super::server::ControlPlane::from_arc( config.clone(), - super::server::IDLE_REQUEST_INTERVAL_SECS, + mode.idle_request_interval_secs(), ); let mut stream = control_plane.stream_aggregated_resources(stream).await?; - while let Some(result) = stream.next().await { - let response = result?; - tracing::debug!(config=%serde_json::to_value(&config).unwrap(), "received discovery response"); - requests.send(response)?; + mode.unwrap_agent() + .relay_is_healthy + .store(true, Ordering::SeqCst); + + loop { + let timeout = tokio::time::timeout( + std::time::Duration::from_secs(mode.idle_request_interval_secs()), + stream.next(), + ); + + match timeout.await { + Ok(Some(result)) => { + let response = result?; + tracing::debug!(config=%serde_json::to_value(&config).unwrap(), "received discovery response"); + requests.send(response)?; + } + _ => break, + } } + mode.unwrap_agent() + .relay_is_healthy + .store(false, Ordering::SeqCst); + tracing::warn!("lost connection to relay server, retrying"); client = MdsClient::connect_with_backoff(&management_servers) .await diff --git a/src/xds/server.rs b/src/xds/server.rs index b7bdaaf07e..95d81c4899 100644 --- a/src/xds/server.rs +++ b/src/xds/server.rs @@ -38,8 +38,6 @@ use crate::{ }, }; -pub(crate) const IDLE_REQUEST_INTERVAL_SECS: u64 = 30; - #[tracing::instrument(skip_all)] pub fn spawn( port: u16, @@ -47,7 +45,7 @@ pub fn spawn( ) -> impl std::future::Future> { let server = AggregatedDiscoveryServiceServer::new(ControlPlane::from_arc( config, - IDLE_REQUEST_INTERVAL_SECS, + crate::cli::admin::IDLE_REQUEST_INTERVAL_SECS, )); let server = tonic::transport::Server::builder().add_service(server); tracing::info!("serving management server on port `{port}`"); @@ -424,7 +422,10 @@ mod tests { }; let config = Arc::new(Config::default()); - let client = ControlPlane::from_arc(config.clone(), IDLE_REQUEST_INTERVAL_SECS); + let client = ControlPlane::from_arc( + config.clone(), + crate::cli::admin::IDLE_REQUEST_INTERVAL_SECS, + ); let (tx, rx) = tokio::sync::mpsc::channel(256); let mut request = DiscoveryRequest { diff --git a/tests/qcmp.rs b/tests/qcmp.rs index 045cd044b3..82b2a49caa 100644 --- a/tests/qcmp.rs +++ b/tests/qcmp.rs @@ -50,9 +50,10 @@ async fn agent_ping() { }; let server_config = std::sync::Arc::new(quilkin::Config::default()); let (_tx, rx) = tokio::sync::watch::channel(()); + let admin = quilkin::cli::Admin::Agent(<_>::default()); tokio::spawn(async move { agent - .run(server_config, rx) + .run(server_config, admin, rx) .await .expect("Agent should run") });