Skip to content

Commit

Permalink
refactor: per review
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Dec 18, 2024
1 parent 139967e commit 577a5b2
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 17 deletions.
5 changes: 2 additions & 3 deletions src/catalog/src/system_schema/information_schema/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,8 @@ impl InformationSchemaFlowsBuilder {
.await;

let flow_stat = {
let information_extension =
utils::information_extension(&self.catalog_manager).unwrap();
information_extension.flow_stats().await?.clone()
let information_extension = utils::information_extension(&self.catalog_manager)?;
information_extension.flow_stats().await?
};

while let Some((flow_name, flow_id)) = stream
Expand Down
18 changes: 9 additions & 9 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,15 +769,15 @@ impl InformationExtension for StandaloneInformationExtension {
}

async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
let state = self
.flow_worker_manager
.read()
.await
.as_ref()
.unwrap()
.gen_state_report()
.await;
Ok(Some(state))
Ok(Some(
self.flow_worker_manager
.read()
.await
.as_ref()
.unwrap()
.gen_state_report()
.await,
))
}
}

Expand Down
10 changes: 7 additions & 3 deletions src/flow/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ use crate::error::ExternalSnafu;
use crate::utils::SizeReportSender;
use crate::{Error, FlownodeOptions};

async fn query_flow_state(query_stat_size: &Option<SizeReportSender>) -> Option<FlowStat> {
async fn query_flow_state(
query_stat_size: &Option<SizeReportSender>,
timeout: Duration,
) -> Option<FlowStat> {
if let Some(report_requester) = query_stat_size.as_ref() {
let ret = report_requester.query().await;
let ret = report_requester.query(timeout).await;
match ret {
Ok(latest) => Some(latest),
Err(err) => {
Expand Down Expand Up @@ -220,7 +223,8 @@ impl HeartbeatTask {
}
// after sending heartbeat, try to get the latest report
// TODO(discord9): consider a better place to update the size report
latest_report = query_flow_state(&query_stat_size).await;
// set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong
latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions src/flow/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ impl SizeReportSender {
}

/// Query the size report, will timeout after one second if no response
pub async fn query(&self) -> crate::Result<FlowStat> {
pub async fn query(&self, timeout: std::time::Duration) -> crate::Result<FlowStat> {
let (tx, rx) = oneshot::channel();
self.inner.send(tx).await.map_err(|_| {
InternalSnafu {
reason: "failed to send size report request due to receiver dropped",
}
.build()
})?;
let timeout = tokio::time::timeout(std::time::Duration::from_secs(1), rx);
let timeout = tokio::time::timeout(timeout, rx);
timeout
.await
.map_err(|_elapsed| {
Expand Down

0 comments on commit 577a5b2

Please sign in to comment.