From 053f5bd3373992e7a91cb106450e60f0fc5423db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 15 Nov 2023 17:44:06 +0000 Subject: [PATCH] peer-server: connect lazily (#663) While running `dev-peerdb.sh` I saw peer-server failing due to grpc connection taking too long. Replace connect_with_retries with tonic's connect_lazy Also remove health check, if commandline includes grpc endpoint that endpoint should be used, regardless of health The underlying principle of this PR is that grpc server should be able to come up & down as it pleases & peer-server will use it when it's up (& ideally fail loudly but non-terminally while it's down). I'm not sure this PR achieves that principle tho --- nexus/flow-rs/src/grpc.rs | 63 +++++++++------------------------------ nexus/server/src/main.rs | 16 ++++------ 2 files changed, 19 insertions(+), 60 deletions(-) diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 2770f192e4..1f44573392 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -1,6 +1,4 @@ -use std::time::Duration; -use anyhow::Context; use catalog::WorkflowDetails; use pt::{ flow_model::{FlowJob, QRepFlowJob}, @@ -20,36 +18,6 @@ pub struct FlowGrpcClient { health_client: health_client::HealthClient, } -async fn connect_with_retries(grpc_endpoint: String) -> anyhow::Result { - let mut retries = 0; - let max_retries = 3; - let delay = Duration::from_secs(5); - - loop { - match tonic::transport::Channel::from_shared(grpc_endpoint.clone())? - .connect() - .await - { - Ok(channel) => return Ok(channel), - Err(e) if retries < max_retries => { - retries += 1; - tracing::warn!( - "Failed to connect to flow server at {}, error {}, retrying in {} seconds...", - grpc_endpoint, - e, - delay.as_secs() - ); - tokio::time::sleep(delay).await; - } - Err(e) => { - return Err(e).with_context(|| { - format!("failed to connect to flow server at {}", grpc_endpoint) - }) - } - } - } -} - impl FlowGrpcClient { // create a new grpc client to the flow server using flow server address pub async fn new(flow_server_addr: &str) -> anyhow::Result { @@ -60,8 +28,8 @@ impl FlowGrpcClient { let grpc_endpoint = format!("{}/grpc", flow_server_addr); tracing::info!("connecting to flow server at {}", grpc_endpoint); - // Create a gRPC channel and connect to the server - let channel = connect_with_retries(grpc_endpoint).await?; + // Create a gRPC channel + let channel = tonic::transport::Channel::from_shared(grpc_endpoint.clone())?.connect_lazy(); // construct a grpc client to the flow server let client = peerdb_route::flow_service_client::FlowServiceClient::new(channel.clone()); @@ -349,24 +317,21 @@ impl FlowGrpcClient { self.start_query_replication_flow(&cfg).await } - pub async fn is_healthy(&mut self) -> anyhow::Result { + pub async fn is_healthy(&mut self) -> bool { let health_check_req = tonic_health::pb::HealthCheckRequest { service: "".to_string(), }; - self.health_client - .check(health_check_req) - .await - .map_or_else( - |e| { - tracing::error!("failed to check health of flow server: {}", e); - Ok(false) - }, - |response| { - let status = response.into_inner().status; - tracing::info!("flow server health status: {:?}", status); - Ok(status == (tonic_health::ServingStatus::Serving as i32)) - }, - ) + match self.health_client.check(health_check_req).await { + Ok(response) => { + let status = response.into_inner().status; + tracing::info!("flow server health status: {:?}", status); + status == (tonic_health::ServingStatus::Serving as i32) + } + Err(e) => { + tracing::error!("failed to check health of flow server: {}", e); + false + } + } } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index cd8454c977..1e38b2e979 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1362,20 +1362,14 @@ pub async fn main() -> anyhow::Result<()> { let listener = TcpListener::bind(&server_addr).await.unwrap(); tracing::info!("Listening on {}", server_addr); - let flow_server_addr = args.flow_api_url.clone(); - let mut flow_handler: Option>> = None; // log that we accept mirror commands if we have a flow server - if let Some(addr) = &flow_server_addr { - let mut handler = FlowGrpcClient::new(addr).await?; - if handler.is_healthy().await? { - flow_handler = Some(Arc::new(Mutex::new(handler))); - tracing::info!("MIRROR commands enabled, flow server: {}", addr); - } else { - tracing::info!("MIRROR commands disabled, flow server: {}", addr); - } + let flow_handler = if let Some(ref addr) = args.flow_api_url { + tracing::info!("MIRROR commands enabled"); + Some(Arc::new(Mutex::new(FlowGrpcClient::new(addr).await?))) } else { tracing::info!("MIRROR commands disabled"); - } + None + }; loop { let (mut socket, _) = listener.accept().await.unwrap();