diff --git a/src/catalog/src/system_schema/information_schema/flows.rs b/src/catalog/src/system_schema/information_schema/flows.rs index 36721f70e827..5d35cfbbe431 100644 --- a/src/catalog/src/system_schema/information_schema/flows.rs +++ b/src/catalog/src/system_schema/information_schema/flows.rs @@ -213,9 +213,8 @@ impl InformationSchemaFlowsBuilder { .await; let flow_stat = { - let information_extension = - utils::information_extension(&self.catalog_manager).unwrap(); - information_extension.flow_stats().await?.clone() + let information_extension = utils::information_extension(&self.catalog_manager)?; + information_extension.flow_stats().await? }; while let Some((flow_name, flow_id)) = stream diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index bd3e151e88bc..8490e14147b2 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -769,15 +769,15 @@ impl InformationExtension for StandaloneInformationExtension { } async fn flow_stats(&self) -> std::result::Result, Self::Error> { - let state = self - .flow_worker_manager - .read() - .await - .as_ref() - .unwrap() - .gen_state_report() - .await; - Ok(Some(state)) + Ok(Some( + self.flow_worker_manager + .read() + .await + .as_ref() + .unwrap() + .gen_state_report() + .await, + )) } } diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 60d1fddd9224..69159d1d2a01 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -38,9 +38,12 @@ use crate::error::ExternalSnafu; use crate::utils::SizeReportSender; use crate::{Error, FlownodeOptions}; -async fn query_flow_state(query_stat_size: &Option) -> Option { +async fn query_flow_state( + query_stat_size: &Option, + timeout: Duration, +) -> Option { if let Some(report_requester) = query_stat_size.as_ref() { - let ret = report_requester.query().await; + let ret = report_requester.query(timeout).await; match ret { Ok(latest) => Some(latest), Err(err) => { @@ -220,7 +223,8 @@ impl HeartbeatTask { } // after sending heartbeat, try to get the latest report // TODO(discord9): consider a better place to update the size report - latest_report = query_flow_state(&query_stat_size).await; + // set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong + latest_report = query_flow_state(&query_stat_size, report_interval / 2).await; } }); } diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index b050b29335c1..5e01d0bfa423 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -56,7 +56,7 @@ impl SizeReportSender { } /// Query the size report, will timeout after one second if no response - pub async fn query(&self) -> crate::Result { + pub async fn query(&self, timeout: std::time::Duration) -> crate::Result { let (tx, rx) = oneshot::channel(); self.inner.send(tx).await.map_err(|_| { InternalSnafu { @@ -64,7 +64,7 @@ impl SizeReportSender { } .build() })?; - let timeout = tokio::time::timeout(std::time::Duration::from_secs(1), rx); + let timeout = tokio::time::timeout(timeout, rx); timeout .await .map_err(|_elapsed| {