diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 9cef02eac13c..07bc7e5ed7e1 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -103,6 +103,11 @@ impl HeartbeatTask { warn!("Heartbeat task started multiple times"); return Ok(()); } + + self.create_streams().await + } + + pub async fn create_streams(&self) -> Result<(), Error> { info!("Start to establish the heartbeat connection to metasrv."); let (req_sender, resp_stream) = self .meta_client @@ -231,6 +236,8 @@ impl HeartbeatTask { // set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong latest_report = query_flow_state(&query_stat_size, report_interval / 2).await; } + + info!("flownode heartbeat task stopped."); }); } @@ -274,7 +281,7 @@ impl HeartbeatTask { info!("Try to re-establish the heartbeat connection to metasrv."); - if self.start().await.is_ok() { + if self.create_streams().await.is_ok() { break; } }