Skip to content

Commit

Permalink
refactor: more resilient flow recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Dec 26, 2024
1 parent 294c4c5 commit ebe053d
Showing 1 changed file with 18 additions and 29 deletions.
47 changes: 18 additions & 29 deletions src/flow/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::request::QueryRequest;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing::info;
use common_telemetry::{info, warn};
use futures::{FutureExt, TryStreamExt};
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
use itertools::Itertools;
Expand All @@ -45,7 +45,7 @@ use query::region_query::RegionQueryHandler;
use query::{QueryEngine, QueryEngineFactory, QueryEngineRef};
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::server::Server;
use session::context::{QueryContextBuilder, QueryContextRef};
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, oneshot, Mutex};
Expand Down Expand Up @@ -361,12 +361,6 @@ impl FlownodeBuilder {
.context(ExternalSnafu)?
.context(FlowNotFoundSnafu { id: flow_id })?;

let sink_table_name = [
info.sink_table_name().catalog_name.clone(),
info.sink_table_name().schema_name.clone(),
info.sink_table_name().table_name.clone(),
];

let source_table_ids = info.source_table_ids().to_vec();

for src in &source_table_ids {
Expand All @@ -380,27 +374,22 @@ impl FlownodeBuilder {
}
}

let args = CreateFlowArgs {
flow_id: flow_id as _,
sink_table_name,
source_table_ids: info.source_table_ids().to_vec(),
// Recovery should only happen on restart, when the flow does not exist yet, so `create_if_not_exists` and `or_replace` could take arbitrary values.
// For the sake of consistency, and to make sure the flow is actually recovered, we set both to true
// (this is fine because the check that forbids both being true lives on metasrv, and we have already passed it).
create_if_not_exists: true,
or_replace: true,
expire_after: info.expire_after(),
comment: Some(info.comment().clone()),
sql: info.raw_sql().clone(),
flow_options: info.options().clone(),
query_ctx: Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().clone())
.build(),
),
};
manager.create_flow(args).await?;
did_recover.push(flow_id);
let args = CreateFlowArgs::from_flow_info(flow_id as _, info, true, true);

match manager.create_flow(args).await {
Ok(Some(res)) => {
did_recover.push(res as FlowId);
}
Ok(None) => {
warn!(
"Failed to recover flow_id={:?}, flow already exists",
flow_id
);
}
Err(err) => {
common_telemetry::error!(err; "Failed to recover flow_id={:?}", flow_id);
}
}
}

Ok(did_recover)
Expand Down

0 comments on commit ebe053d

Please sign in to comment.