Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer-server: connect lazily #663

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 14 additions & 49 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::time::Duration;

use anyhow::Context;
use catalog::WorkflowDetails;
use pt::{
flow_model::{FlowJob, QRepFlowJob},
Expand All @@ -20,36 +18,6 @@ pub struct FlowGrpcClient {
health_client: health_client::HealthClient<tonic::transport::Channel>,
}

/// Opens a gRPC channel to the flow server at `grpc_endpoint`, retrying on failure.
///
/// The endpoint URI is parsed once up front; an invalid URI is reported immediately and
/// is never retried. A failed connection attempt is retried up to `max_retries` (3) more
/// times, with a fixed 5-second pause and a warning log before each retry.
///
/// # Errors
///
/// Returns the URI parse error if `grpc_endpoint` is not a valid URI, or the last
/// connection error (with the endpoint added as context) once the retries are exhausted.
async fn connect_with_retries(grpc_endpoint: String) -> anyhow::Result<tonic::transport::Channel> {
    let max_retries = 3;
    let delay = Duration::from_secs(5);

    // Parsing the URI is loop-invariant: build the endpoint once and reuse it for every
    // attempt instead of cloning and re-parsing the string on each retry.
    let endpoint = tonic::transport::Channel::from_shared(grpc_endpoint.clone())?;

    let mut retries = 0;
    loop {
        match endpoint.connect().await {
            Ok(channel) => return Ok(channel),
            // Retries remain: log, wait, and go around again.
            Err(e) if retries < max_retries => {
                retries += 1;
                tracing::warn!(
                    "Failed to connect to flow server at {}, error {}, retrying in {} seconds...",
                    grpc_endpoint,
                    e,
                    delay.as_secs()
                );
                tokio::time::sleep(delay).await;
            }
            // Out of retries: surface the last error with the endpoint for diagnosis.
            Err(e) => {
                return Err(e).with_context(|| {
                    format!("failed to connect to flow server at {}", grpc_endpoint)
                });
            }
        }
    }
}

impl FlowGrpcClient {
// create a new grpc client to the flow server using flow server address
pub async fn new(flow_server_addr: &str) -> anyhow::Result<Self> {
Expand All @@ -60,8 +28,8 @@ impl FlowGrpcClient {
let grpc_endpoint = format!("{}/grpc", flow_server_addr);
tracing::info!("connecting to flow server at {}", grpc_endpoint);

// Create a gRPC channel and connect to the server
let channel = connect_with_retries(grpc_endpoint).await?;
// Create a gRPC channel
let channel = tonic::transport::Channel::from_shared(grpc_endpoint.clone())?.connect_lazy();

// construct a grpc client to the flow server
let client = peerdb_route::flow_service_client::FlowServiceClient::new(channel.clone());
Expand Down Expand Up @@ -349,24 +317,21 @@ impl FlowGrpcClient {
self.start_query_replication_flow(&cfg).await
}

pub async fn is_healthy(&mut self) -> anyhow::Result<bool> {
pub async fn is_healthy(&mut self) -> bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the return type to `bool`, since this function always returned `Ok(bool)`. The function is no longer used.

let health_check_req = tonic_health::pb::HealthCheckRequest {
service: "".to_string(),
};

self.health_client
.check(health_check_req)
.await
.map_or_else(
|e| {
tracing::error!("failed to check health of flow server: {}", e);
Ok(false)
},
|response| {
let status = response.into_inner().status;
tracing::info!("flow server health status: {:?}", status);
Ok(status == (tonic_health::ServingStatus::Serving as i32))
},
)
match self.health_client.check(health_check_req).await {
Ok(response) => {
let status = response.into_inner().status;
tracing::info!("flow server health status: {:?}", status);
status == (tonic_health::ServingStatus::Serving as i32)
}
Err(e) => {
tracing::error!("failed to check health of flow server: {}", e);
false
}
}
}
}
16 changes: 5 additions & 11 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,20 +1362,14 @@ pub async fn main() -> anyhow::Result<()> {
let listener = TcpListener::bind(&server_addr).await.unwrap();
tracing::info!("Listening on {}", server_addr);

let flow_server_addr = args.flow_api_url.clone();
let mut flow_handler: Option<Arc<Mutex<FlowGrpcClient>>> = None;
// log that we accept mirror commands if we have a flow server
if let Some(addr) = &flow_server_addr {
let mut handler = FlowGrpcClient::new(addr).await?;
if handler.is_healthy().await? {
flow_handler = Some(Arc::new(Mutex::new(handler)));
tracing::info!("MIRROR commands enabled, flow server: {}", addr);
} else {
tracing::info!("MIRROR commands disabled, flow server: {}", addr);
}
let flow_handler = if let Some(ref addr) = args.flow_api_url {
tracing::info!("MIRROR commands enabled");
Some(Arc::new(Mutex::new(FlowGrpcClient::new(addr).await?)))
} else {
tracing::info!("MIRROR commands disabled");
}
None
};

loop {
let (mut socket, _) = listener.accept().await.unwrap();
Expand Down