diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 2770f192e4..1f44573392 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -1,6 +1,4 @@ -use std::time::Duration; -use anyhow::Context; use catalog::WorkflowDetails; use pt::{ flow_model::{FlowJob, QRepFlowJob}, @@ -20,36 +18,6 @@ pub struct FlowGrpcClient { health_client: health_client::HealthClient, } -async fn connect_with_retries(grpc_endpoint: String) -> anyhow::Result { - let mut retries = 0; - let max_retries = 3; - let delay = Duration::from_secs(5); - - loop { - match tonic::transport::Channel::from_shared(grpc_endpoint.clone())? - .connect() - .await - { - Ok(channel) => return Ok(channel), - Err(e) if retries < max_retries => { - retries += 1; - tracing::warn!( - "Failed to connect to flow server at {}, error {}, retrying in {} seconds...", - grpc_endpoint, - e, - delay.as_secs() - ); - tokio::time::sleep(delay).await; - } - Err(e) => { - return Err(e).with_context(|| { - format!("failed to connect to flow server at {}", grpc_endpoint) - }) - } - } - } -} - impl FlowGrpcClient { // create a new grpc client to the flow server using flow server address pub async fn new(flow_server_addr: &str) -> anyhow::Result { @@ -60,8 +28,8 @@ impl FlowGrpcClient { let grpc_endpoint = format!("{}/grpc", flow_server_addr); tracing::info!("connecting to flow server at {}", grpc_endpoint); - // Create a gRPC channel and connect to the server - let channel = connect_with_retries(grpc_endpoint).await?; + // Create a gRPC channel + let channel = tonic::transport::Channel::from_shared(grpc_endpoint.clone())?.connect_lazy(); // construct a grpc client to the flow server let client = peerdb_route::flow_service_client::FlowServiceClient::new(channel.clone()); @@ -349,24 +317,21 @@ impl FlowGrpcClient { self.start_query_replication_flow(&cfg).await } - pub async fn is_healthy(&mut self) -> anyhow::Result { + pub async fn is_healthy(&mut self) -> bool { let health_check_req = tonic_health::pb::HealthCheckRequest { service: "".to_string(), }; - self.health_client - .check(health_check_req) - .await - .map_or_else( - |e| { - tracing::error!("failed to check health of flow server: {}", e); - Ok(false) - }, - |response| { - let status = response.into_inner().status; - tracing::info!("flow server health status: {:?}", status); - Ok(status == (tonic_health::ServingStatus::Serving as i32)) - }, - ) + match self.health_client.check(health_check_req).await { + Ok(response) => { + let status = response.into_inner().status; + tracing::info!("flow server health status: {:?}", status); + status == (tonic_health::ServingStatus::Serving as i32) + } + Err(e) => { + tracing::error!("failed to check health of flow server: {}", e); + false + } + } } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index cd8454c977..1e38b2e979 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1362,20 +1362,14 @@ pub async fn main() -> anyhow::Result<()> { let listener = TcpListener::bind(&server_addr).await.unwrap(); tracing::info!("Listening on {}", server_addr); - let flow_server_addr = args.flow_api_url.clone(); - let mut flow_handler: Option>> = None; // log that we accept mirror commands if we have a flow server - if let Some(addr) = &flow_server_addr { - let mut handler = FlowGrpcClient::new(addr).await?; - if handler.is_healthy().await? { - flow_handler = Some(Arc::new(Mutex::new(handler))); - tracing::info!("MIRROR commands enabled, flow server: {}", addr); - } else { - tracing::info!("MIRROR commands disabled, flow server: {}", addr); - } + let flow_handler = if let Some(ref addr) = args.flow_api_url { + tracing::info!("MIRROR commands enabled"); + Some(Arc::new(Mutex::new(FlowGrpcClient::new(addr).await?))) } else { tracing::info!("MIRROR commands disabled"); - } + None + }; loop { let (mut socket, _) = listener.accept().await.unwrap();