Skip to content

Commit

Permalink
feat: introduce FlowRouteValue (#4263)
Browse files Browse the repository at this point in the history
* feat: introduce `FlowRouteKey` and `FlowRouteValue`

* feat: put `FlowRouteValue` values in flow creation

* feat: use `FlowRouteValue`

* refactor: remove `PeerLookupServiceRef` in `DdlContext`

* chore: remove unused code

* Update src/common/meta/src/key.rs

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* chore: apply suggestions from CR

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
WenyXu and coderabbitai[bot] authored Jul 3, 2024
1 parent 8e306f3 commit ee9a5d7
Show file tree
Hide file tree
Showing 16 changed files with 428 additions and 115 deletions.
2 changes: 0 additions & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::StandalonePeerLookupService;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef};
Expand Down Expand Up @@ -566,7 +565,6 @@ impl StartCommand {
table_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager,
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/cache/flow/table_flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ mod tests {
comment: "comment".to_string(),
options: Default::default(),
},
vec![],
)
.await
.unwrap();
Expand Down
3 changes: 0 additions & 3 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::peer::PeerLookupServiceRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
Expand Down Expand Up @@ -146,8 +145,6 @@ pub struct DdlContext {
pub flow_metadata_manager: FlowMetadataManagerRef,
/// Allocator for flow metadata.
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
/// look up peer by id.
pub peer_lookup_service: PeerLookupServiceRef,
/// controller of region failure detector.
pub region_failure_detector_controller: RegionFailureDetectorControllerRef,
}
Expand Down
41 changes: 26 additions & 15 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::FlowId;
use crate::key::{FlowId, FlowPartitionId};
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::{CreateFlowTask, QueryContext};
Expand Down Expand Up @@ -170,9 +171,10 @@ impl CreateFlowProcedure {
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
// TODO(weny): Support `or_replace`.
let (flow_info, flow_routes) = (&self.data).into();
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, (&self.data).into())
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
self.data.state = CreateFlowState::InvalidateFlowCache;
Expand Down Expand Up @@ -292,7 +294,7 @@ impl From<&CreateFlowData> for CreateRequest {
}
}

impl From<&CreateFlowData> for FlowInfoValue {
impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
fn from(value: &CreateFlowData) -> Self {
let CreateFlowTask {
catalog_name,
Expand All @@ -311,17 +313,26 @@ impl From<&CreateFlowData> for FlowInfoValue {
.enumerate()
.map(|(idx, peer)| (idx as u32, peer.id))
.collect::<BTreeMap<_, _>>();

FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
sink_table_name,
flownode_ids,
catalog_name,
flow_name,
raw_sql: sql,
expire_after,
comment,
options,
}
let flow_routes = value
.peers
.iter()
.enumerate()
.map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
.collect::<Vec<_>>();

(
FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
sink_table_name,
flownode_ids,
catalog_name,
flow_name,
raw_sql: sql,
expire_after,
comment,
options,
},
flow_routes,
)
}
}
25 changes: 10 additions & 15 deletions src/common/meta/src/ddl/drop_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ use common_procedure::{
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;

use super::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{self, Result, UnexpectedSnafu};
use crate::error::{self, Result};
use crate::flow_name::FlowName;
use crate::instruction::{CacheIdent, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::lock_key::{CatalogLock, FlowLock};
use crate::rpc::ddl::DropFlowTask;
use crate::{metrics, ClusterId};
Expand All @@ -58,6 +59,7 @@ impl DropFlowProcedure {
cluster_id,
task,
flow_info_value: None,
flow_route_values: vec![],
},
}
}
Expand Down Expand Up @@ -102,18 +104,9 @@ impl DropFlowProcedure {
let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
let flow_id = self.data.task.flow_id;
let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
let cluster_id = self.data.cluster_id;

for flownode in flownode_ids.values() {
let peer = self
.context
.peer_lookup_service
.flownode(cluster_id, *flownode)
.await?
.with_context(|| UnexpectedSnafu {
err_msg: "Attempted to drop flow on a node that could not be found. Consider verifying node availability.",
})?;
let requester = self.context.node_manager.flownode(&peer).await;

for FlowRouteValue { peer } in &self.data.flow_route_values {
let requester = self.context.node_manager.flownode(peer).await;
let request = FlowRequest {
body: Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(api::v1::FlowId { id: flow_id }),
Expand All @@ -124,12 +117,13 @@ impl DropFlowProcedure {
drop_flow_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::FlowNotFound {
return Err(add_peer_context_if_needed(peer)(err));
return Err(add_peer_context_if_needed(peer.clone())(err));
}
}
Ok(())
});
}

join_all(drop_flow_tasks)
.await
.into_iter()
Expand Down Expand Up @@ -227,6 +221,7 @@ pub(crate) struct DropFlowData {
cluster_id: ClusterId,
task: DropFlowTask,
pub(crate) flow_info_value: Option<FlowInfoValue>,
pub(crate) flow_route_values: Vec<FlowRouteValue>,
}

/// The state of drop flow
Expand Down
19 changes: 18 additions & 1 deletion src/common/meta/src/ddl/drop_flow/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
// limitations under the License.

use common_catalog::format_full_flow_name;
use snafu::OptionExt;
use futures::TryStreamExt;
use snafu::{ensure, OptionExt};

use crate::ddl::drop_flow::DropFlowProcedure;
use crate::error::{self, Result};
Expand All @@ -32,7 +33,23 @@ impl DropFlowProcedure {
.with_context(|| error::FlowNotFoundSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
})?;

let flow_route_values = self
.context
.flow_metadata_manager
.flow_route_manager()
.routes(self.data.task.flow_id)
.map_ok(|(_, value)| value)
.try_collect::<Vec<_>>()
.await?;
ensure!(
!flow_route_values.is_empty(),
error::FlowRouteNotFoundSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);
self.data.flow_info_value = Some(flow_info_value);
self.data.flow_route_values = flow_route_values;

Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ mod tests {
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::peer::{Peer, StandalonePeerLookupService};
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
Expand Down Expand Up @@ -855,7 +855,6 @@ mod tests {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),
Expand Down
8 changes: 8 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Flow route not found: '{}'", flow_name))]
FlowRouteNotFound {
flow_name: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Schema nod found, schema: {}", table_schema))]
SchemaNotFound {
table_schema: String,
Expand Down Expand Up @@ -708,6 +715,7 @@ impl ErrorExt for Error {
| DelimiterNotFound { .. } => StatusCode::InvalidArguments,

FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,
FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,

ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound,
Expand Down
18 changes: 13 additions & 5 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@
//! 6. Flow info key: `__flow/info/{flow_id}`
//! - Stores metadata of the flow.
//!
//! 7. Flow name key: `__flow/name/{catalog}/{flow_name}`
//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
//! - Stores route of the flow.
//!
//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
//! - Mapping {catalog}/{flow_name} to {flow_id}
//!
//! 8. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping {flownode_id} to {flow_id}
//!
//! 9. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping source table's {table_id} to {flownode_id}
//! - Used in `Flownode` booting.
//! 10. View info key: `__view_info/{view_id}`
//! 11. View info key: `__view_info/{view_id}`
//! - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
//! - This key is mainly used in constructing the view in Datanode and Frontend.
//!
Expand All @@ -65,6 +68,9 @@
//! __flow/
//! info/
//! {flow_id}
//! route/
//! {flow_id}/
//! {partition_id}
//!
//! name/
//! {catalog_name}
Expand Down Expand Up @@ -105,6 +111,7 @@ use common_catalog::consts::{
};
use common_telemetry::warn;
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use flow::flow_route::FlowRouteValue;
use lazy_static::lazy_static;
use regex::Regex;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -1185,7 +1192,8 @@ impl_table_meta_value! {
ViewInfoValue,
DatanodeTableValue,
FlowInfoValue,
FlowNameValue
FlowNameValue,
FlowRouteValue
}

impl_optional_meta_value! {
Expand Down
Loading

0 comments on commit ee9a5d7

Please sign in to comment.