Skip to content

Commit

Permalink
peer-server: connect lazily
Browse files Browse the repository at this point in the history
While running `dev-peerdb.sh`, I saw peer-server fail because the gRPC connection took too long. Replace `connect_with_retries` with tonic's `connect_lazy`.

Also remove the health check: if the command line includes a gRPC endpoint, that endpoint should be used regardless of its health.
  • Loading branch information
serprex committed Nov 15, 2023
1 parent 173142e commit 13f5482
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 60 deletions.
63 changes: 14 additions & 49 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::time::Duration;

use anyhow::Context;
use catalog::WorkflowDetails;
use pt::{
flow_model::{FlowJob, QRepFlowJob},
Expand All @@ -20,36 +18,6 @@ pub struct FlowGrpcClient {
health_client: health_client::HealthClient<tonic::transport::Channel>,
}

/// Opens a gRPC channel to the flow server at `grpc_endpoint`, retrying on connection failure.
///
/// Up to three retries follow the initial attempt, with a five-second pause before each one;
/// every failed attempt that is going to be retried is logged as a warning.
///
/// # Errors
///
/// Returns immediately if `grpc_endpoint` is rejected by `Channel::from_shared`, and returns the
/// last connection error (with the endpoint added as context) once the retries are used up.
async fn connect_with_retries(grpc_endpoint: String) -> anyhow::Result<tonic::transport::Channel> {
    const MAX_RETRIES: u32 = 3;
    let backoff = Duration::from_secs(5);
    let mut failed_attempts = 0;

    loop {
        // The endpoint is rebuilt from the address string on every attempt.
        let endpoint = tonic::transport::Channel::from_shared(grpc_endpoint.clone())?;
        let err = match endpoint.connect().await {
            Ok(channel) => return Ok(channel),
            Err(err) => err,
        };

        // Retry budget exhausted: surface the last error, annotated with the endpoint.
        if failed_attempts >= MAX_RETRIES {
            return Err(err).with_context(|| {
                format!("failed to connect to flow server at {}", grpc_endpoint)
            });
        }

        failed_attempts += 1;
        tracing::warn!(
            "Failed to connect to flow server at {}, error {}, retrying in {} seconds...",
            grpc_endpoint,
            err,
            backoff.as_secs()
        );
        tokio::time::sleep(backoff).await;
    }
}

impl FlowGrpcClient {
// create a new grpc client to the flow server using flow server address
pub async fn new(flow_server_addr: &str) -> anyhow::Result<Self> {
Expand All @@ -60,8 +28,8 @@ impl FlowGrpcClient {
let grpc_endpoint = format!("{}/grpc", flow_server_addr);
tracing::info!("connecting to flow server at {}", grpc_endpoint);

// Create a gRPC channel and connect to the server
let channel = connect_with_retries(grpc_endpoint).await?;
// Create a gRPC channel
let channel = tonic::transport::Channel::from_shared(grpc_endpoint.clone())?.connect_lazy();

// construct a grpc client to the flow server
let client = peerdb_route::flow_service_client::FlowServiceClient::new(channel.clone());
Expand Down Expand Up @@ -349,24 +317,21 @@ impl FlowGrpcClient {
self.start_query_replication_flow(&cfg).await
}

/// Sends a health check request with an empty service name through `health_client` and
/// reports whether the returned status equals `ServingStatus::Serving`.
///
/// A failed check is logged as an error and reported as "not healthy" rather than
/// being propagated to the caller.
// NOTE: this span is a diff view. The first signature below is the one removed by the
// commit (it wraps the result in `anyhow::Result`); the second is its replacement.
pub async fn is_healthy(&mut self) -> anyhow::Result<bool> {
pub async fn is_healthy(&mut self) -> bool {
let health_check_req = tonic_health::pb::HealthCheckRequest {
service: "".to_string(),
};

// Removed version: folds the check result with `map_or_else`; both closures return `Ok(..)`,
// so this form never actually yields `Err`.
self.health_client
.check(health_check_req)
.await
.map_or_else(
|e| {
tracing::error!("failed to check health of flow server: {}", e);
Ok(false)
},
|response| {
let status = response.into_inner().status;
tracing::info!("flow server health status: {:?}", status);
Ok(status == (tonic_health::ServingStatus::Serving as i32))
},
)
// Added version: the same two outcomes written as a `match`, yielding a plain `bool`.
match self.health_client.check(health_check_req).await {
Ok(response) => {
let status = response.into_inner().status;
tracing::info!("flow server health status: {:?}", status);
// `status` is compared as an `i32` against the `Serving` variant's numeric value.
status == (tonic_health::ServingStatus::Serving as i32)
}
Err(e) => {
tracing::error!("failed to check health of flow server: {}", e);
false
}
}
}
}
16 changes: 5 additions & 11 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,20 +1362,14 @@ pub async fn main() -> anyhow::Result<()> {
let listener = TcpListener::bind(&server_addr).await.unwrap();
tracing::info!("Listening on {}", server_addr);

let flow_server_addr = args.flow_api_url.clone();
let mut flow_handler: Option<Arc<Mutex<FlowGrpcClient>>> = None;
// log that we accept mirror commands if we have a flow server
if let Some(addr) = &flow_server_addr {
let mut handler = FlowGrpcClient::new(addr).await?;
if handler.is_healthy().await? {
flow_handler = Some(Arc::new(Mutex::new(handler)));
tracing::info!("MIRROR commands enabled, flow server: {}", addr);
} else {
tracing::info!("MIRROR commands disabled, flow server: {}", addr);
}
let flow_handler = if let Some(ref addr) = args.flow_api_url {
tracing::info!("MIRROR commands enabled");
Some(Arc::new(Mutex::new(FlowGrpcClient::new(addr).await?)))
} else {
tracing::info!("MIRROR commands disabled");
}
None
};

loop {
let (mut socket, _) = listener.accept().await.unwrap();
Expand Down

0 comments on commit 13f5482

Please sign in to comment.