diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 4e01958b1c4f..88195481a5cb 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -31,7 +31,7 @@ use common_meta::node_manager::{Flownode, NodeManagerRef}; use common_query::request::QueryRequest; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::tracing::info; +use common_telemetry::{info, warn}; use futures::{FutureExt, TryStreamExt}; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; @@ -45,7 +45,7 @@ use query::region_query::RegionQueryHandler; use query::{QueryEngine, QueryEngineFactory, QueryEngineRef}; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; use servers::server::Server; -use session::context::{QueryContextBuilder, QueryContextRef}; +use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::{broadcast, oneshot, Mutex}; @@ -361,12 +361,6 @@ impl FlownodeBuilder { .context(ExternalSnafu)? .context(FlowNotFoundSnafu { id: flow_id })?; - let sink_table_name = [ - info.sink_table_name().catalog_name.clone(), - info.sink_table_name().schema_name.clone(), - info.sink_table_name().table_name.clone(), - ]; - let source_table_ids = info.source_table_ids().to_vec(); for src in &source_table_ids { @@ -380,27 +374,22 @@ impl FlownodeBuilder { } } - let args = CreateFlowArgs { - flow_id: flow_id as _, - sink_table_name, - source_table_ids: info.source_table_ids().to_vec(), - // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist) - // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true - // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that) - create_if_not_exists: true, - or_replace: true, - expire_after: info.expire_after(), - comment: Some(info.comment().clone()), - sql: info.raw_sql().clone(), - flow_options: info.options().clone(), - query_ctx: Some( - QueryContextBuilder::default() - .current_catalog(info.catalog_name().clone()) - .build(), - ), - }; - manager.create_flow(args).await?; - did_recover.push(flow_id); + let args = CreateFlowArgs::from_flow_info(flow_id as _, info, true, true); + + match manager.create_flow(args).await { + Ok(Some(res)) => { + did_recover.push(res as FlowId); + } + Ok(None) => { + warn!( + "Failed to recover flow_id={:?}, flow already exists", + flow_id + ); + } + Err(err) => { + common_telemetry::error!(err; "Failed to recover flow_id={:?}", flow_id); + } + } } Ok(did_recover)