diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 5544abc83a20..f97643c914a0 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -32,7 +32,6 @@ use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; -use common_meta::peer::StandalonePeerLookupService; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef}; @@ -566,7 +565,6 @@ impl StartCommand { table_metadata_allocator, flow_metadata_manager, flow_metadata_allocator, - peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager, diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index faf62b8c36f6..b952d056ab70 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -184,6 +184,7 @@ mod tests { comment: "comment".to_string(), options: Default::default(), }, + vec![], ) .await .unwrap(); diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 008153a94284..e49648726872 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -26,7 +26,6 @@ use crate::key::flow::FlowMetadataManagerRef; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; use crate::node_manager::NodeManagerRef; -use crate::peer::PeerLookupServiceRef; use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; @@ -146,8 +145,6 @@ pub struct DdlContext { pub flow_metadata_manager: FlowMetadataManagerRef, /// Allocator for flow metadata. pub flow_metadata_allocator: FlowMetadataAllocatorRef, - /// look up peer by id. - pub peer_lookup_service: PeerLookupServiceRef, /// controller of region failure detector. pub region_failure_detector_controller: RegionFailureDetectorControllerRef, } diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index afa437ed6ca4..2217ccf8d5dd 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -41,8 +41,9 @@ use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::instruction::{CacheIdent, CreateFlow}; use crate::key::flow::flow_info::FlowInfoValue; +use crate::key::flow::flow_route::FlowRouteValue; use crate::key::table_name::TableNameKey; -use crate::key::FlowId; +use crate::key::{FlowId, FlowPartitionId}; use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::{CreateFlowTask, QueryContext}; @@ -170,9 +171,10 @@ impl CreateFlowProcedure { // Safety: The flow id must be allocated. let flow_id = self.data.flow_id.unwrap(); // TODO(weny): Support `or_replace`. + let (flow_info, flow_routes) = (&self.data).into(); self.context .flow_metadata_manager - .create_flow_metadata(flow_id, (&self.data).into()) + .create_flow_metadata(flow_id, flow_info, flow_routes) .await?; info!("Created flow metadata for flow {flow_id}"); self.data.state = CreateFlowState::InvalidateFlowCache; @@ -292,7 +294,7 @@ impl From<&CreateFlowData> for CreateRequest { } } -impl From<&CreateFlowData> for FlowInfoValue { +impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) { fn from(value: &CreateFlowData) -> Self { let CreateFlowTask { catalog_name, @@ -311,17 +313,26 @@ impl From<&CreateFlowData> for FlowInfoValue { .enumerate() .map(|(idx, peer)| (idx as u32, peer.id)) .collect::>(); - - FlowInfoValue { - source_table_ids: value.source_table_ids.clone(), - sink_table_name, - flownode_ids, - catalog_name, - flow_name, - raw_sql: sql, - expire_after, - comment, - options, - } + let flow_routes = value + .peers + .iter() + .enumerate() + .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() })) + .collect::>(); + + ( + FlowInfoValue { + source_table_ids: value.source_table_ids.clone(), + sink_table_name, + flownode_ids, + catalog_name, + flow_name, + raw_sql: sql, + expire_after, + comment, + options, + }, + flow_routes, + ) } } diff --git a/src/common/meta/src/ddl/drop_flow.rs b/src/common/meta/src/ddl/drop_flow.rs index 51b10451bcd8..eed57d446fbe 100644 --- a/src/common/meta/src/ddl/drop_flow.rs +++ b/src/common/meta/src/ddl/drop_flow.rs @@ -25,16 +25,17 @@ use common_procedure::{ use common_telemetry::info; use futures::future::join_all; use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use strum::AsRefStr; use super::utils::{add_peer_context_if_needed, handle_retry_error}; use crate::cache_invalidator::Context; use crate::ddl::DdlContext; -use crate::error::{self, Result, UnexpectedSnafu}; +use crate::error::{self, Result}; use crate::flow_name::FlowName; use crate::instruction::{CacheIdent, DropFlow}; use crate::key::flow::flow_info::FlowInfoValue; +use crate::key::flow::flow_route::FlowRouteValue; use crate::lock_key::{CatalogLock, FlowLock}; use crate::rpc::ddl::DropFlowTask; use crate::{metrics, ClusterId}; @@ -58,6 +59,7 @@ impl DropFlowProcedure { cluster_id, task, flow_info_value: None, + flow_route_values: vec![], }, } } @@ -102,18 +104,9 @@ impl DropFlowProcedure { let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids; let flow_id = self.data.task.flow_id; let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len()); - let cluster_id = self.data.cluster_id; - - for flownode in flownode_ids.values() { - let peer = self - .context - .peer_lookup_service - .flownode(cluster_id, *flownode) - .await? - .with_context(|| UnexpectedSnafu { - err_msg: "Attempted to drop flow on a node that could not be found. Consider verifying node availability.", - })?; - let requester = self.context.node_manager.flownode(&peer).await; + + for FlowRouteValue { peer } in &self.data.flow_route_values { + let requester = self.context.node_manager.flownode(peer).await; let request = FlowRequest { body: Some(flow_request::Body::Drop(DropRequest { flow_id: Some(api::v1::FlowId { id: flow_id }), @@ -124,12 +117,13 @@ impl DropFlowProcedure { drop_flow_tasks.push(async move { if let Err(err) = requester.handle(request).await { if err.status_code() != StatusCode::FlowNotFound { - return Err(add_peer_context_if_needed(peer)(err)); + return Err(add_peer_context_if_needed(peer.clone())(err)); } } Ok(()) }); } + join_all(drop_flow_tasks) .await .into_iter() @@ -227,6 +221,7 @@ pub(crate) struct DropFlowData { cluster_id: ClusterId, task: DropFlowTask, pub(crate) flow_info_value: Option, + pub(crate) flow_route_values: Vec, } /// The state of drop flow diff --git a/src/common/meta/src/ddl/drop_flow/metadata.rs b/src/common/meta/src/ddl/drop_flow/metadata.rs index b20a259d9103..68f99dd4b420 100644 --- a/src/common/meta/src/ddl/drop_flow/metadata.rs +++ b/src/common/meta/src/ddl/drop_flow/metadata.rs @@ -13,7 +13,8 @@ // limitations under the License. use common_catalog::format_full_flow_name; -use snafu::OptionExt; +use futures::TryStreamExt; +use snafu::{ensure, OptionExt}; use crate::ddl::drop_flow::DropFlowProcedure; use crate::error::{self, Result}; @@ -32,7 +33,23 @@ impl DropFlowProcedure { .with_context(|| error::FlowNotFoundSnafu { flow_name: format_full_flow_name(catalog_name, flow_name), })?; + + let flow_route_values = self + .context + .flow_metadata_manager + .flow_route_manager() + .routes(self.data.task.flow_id) + .map_ok(|(_, value)| value) + .try_collect::>() + .await?; + ensure!( + !flow_route_values.is_empty(), + error::FlowRouteNotFoundSnafu { + flow_name: format_full_flow_name(catalog_name, flow_name), + } + ); self.data.flow_info_value = Some(flow_info_value); + self.data.flow_route_values = flow_route_values; Ok(()) } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 567498a38dbc..b9adcc9fb8e8 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -810,7 +810,7 @@ mod tests { use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; - use crate::peer::{Peer, StandalonePeerLookupService}; + use crate::peer::Peer; use crate::region_keeper::MemoryRegionKeeper; use crate::sequence::SequenceBuilder; use crate::state_store::KvStateStore; @@ -855,7 +855,6 @@ mod tests { flow_metadata_manager, flow_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), - peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager.clone(), diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index fdd130f8c8d0..53a8eb0aacc5 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -371,6 +371,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Flow route not found: '{}'", flow_name))] + FlowRouteNotFound { + flow_name: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Schema nod found, schema: {}", table_schema))] SchemaNotFound { table_schema: String, @@ -708,6 +715,7 @@ impl ErrorExt for Error { | DelimiterNotFound { .. } => StatusCode::InvalidArguments, FlowNotFound { .. } => StatusCode::FlowNotFound, + FlowRouteNotFound { .. } => StatusCode::Unexpected, FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists, ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 130f776dd539..d6ea96808d22 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -39,16 +39,19 @@ //! 6. Flow info key: `__flow/info/{flow_id}` //! - Stores metadata of the flow. //! -//! 7. Flow name key: `__flow/name/{catalog}/{flow_name}` +//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}` +//! - Stores route of the flow. +//! +//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}` //! - Mapping {catalog}/{flow_name} to {flow_id} //! -//! 8. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}` +//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}` //! - Mapping {flownode_id} to {flow_id} //! -//! 9. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}` +//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}` //! - Mapping source table's {table_id} to {flownode_id} //! - Used in `Flownode` booting. -//! 10. View info key: `__view_info/{view_id}` +//! 11. View info key: `__view_info/{view_id}` //! - The value is a [ViewInfoValue] struct; it contains the encoded logical plan. //! - This key is mainly used in constructing the view in Datanode and Frontend. //! @@ -65,6 +68,9 @@ //! __flow/ //! info/ //! {flow_id} +//! route/ +//! {flow_id}/ +//! {partition_id} //! //! name/ //! {catalog_name} @@ -105,6 +111,7 @@ use common_catalog::consts::{ }; use common_telemetry::warn; use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue}; +use flow::flow_route::FlowRouteValue; use lazy_static::lazy_static; use regex::Regex; use serde::de::DeserializeOwned; @@ -1185,7 +1192,8 @@ impl_table_meta_value! { ViewInfoValue, DatanodeTableValue, FlowInfoValue, - FlowNameValue + FlowNameValue, + FlowRouteValue } impl_optional_meta_value! { diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 07334b46b863..1bc6894664bc 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -14,6 +14,7 @@ pub mod flow_info; pub(crate) mod flow_name; +pub(crate) mod flow_route; pub(crate) mod flownode_flow; pub(crate) mod table_flow; @@ -21,12 +22,14 @@ use std::ops::Deref; use std::sync::Arc; use common_telemetry::info; +use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue}; use snafu::{ensure, OptionExt}; use self::flow_info::{FlowInfoKey, FlowInfoValue}; use self::flow_name::FlowNameKey; use self::flownode_flow::FlownodeFlowKey; use self::table_flow::TableFlowKey; +use super::FlowPartitionId; use crate::ensure_values; use crate::error::{self, Result}; use crate::key::flow::flow_info::FlowInfoManager; @@ -94,6 +97,7 @@ pub type FlowMetadataManagerRef = Arc; /// - Delete metadata of the flow. pub struct FlowMetadataManager { flow_info_manager: FlowInfoManager, + flow_route_manager: FlowRouteManager, flownode_flow_manager: FlownodeFlowManager, table_flow_manager: TableFlowManager, flow_name_manager: FlowNameManager, @@ -101,10 +105,11 @@ pub struct FlowMetadataManager { } impl FlowMetadataManager { - /// Returns a new [FlowMetadataManager]. + /// Returns a new [`FlowMetadataManager`]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { flow_info_manager: FlowInfoManager::new(kv_backend.clone()), + flow_route_manager: FlowRouteManager::new(kv_backend.clone()), flow_name_manager: FlowNameManager::new(kv_backend.clone()), flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()), table_flow_manager: TableFlowManager::new(kv_backend.clone()), @@ -112,22 +117,27 @@ impl FlowMetadataManager { } } - /// Returns the [FlowNameManager]. + /// Returns the [`FlowNameManager`]. pub fn flow_name_manager(&self) -> &FlowNameManager { &self.flow_name_manager } - /// Returns the [FlowManager]. + /// Returns the [`FlowInfoManager`]. pub fn flow_info_manager(&self) -> &FlowInfoManager { &self.flow_info_manager } - /// Returns the [FlownodeFlowManager]. + /// Returns the [`FlowRouteManager`]. + pub fn flow_route_manager(&self) -> &FlowRouteManager { + &self.flow_route_manager + } + + /// Returns the [`FlownodeFlowManager`]. pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager { &self.flownode_flow_manager } - /// Returns the [TableFlowManager]. + /// Returns the [`TableFlowManager`]. pub fn table_flow_manager(&self) -> &TableFlowManager { &self.table_flow_manager } @@ -136,36 +146,42 @@ impl FlowMetadataManager { pub async fn create_flow_metadata( &self, flow_id: FlowId, - flow_value: FlowInfoValue, + flow_info: FlowInfoValue, + flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>, ) -> Result<()> { let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self .flow_name_manager - .build_create_txn(&flow_value.catalog_name, &flow_value.flow_name, flow_id)?; + .build_create_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?; let (create_flow_txn, on_create_flow_failure) = self .flow_info_manager - .build_create_txn(flow_id, &flow_value)?; + .build_create_txn(flow_id, &flow_info)?; + + let create_flow_routes_txn = self + .flow_route_manager + .build_create_txn(flow_id, flow_routes)?; let create_flownode_flow_txn = self .flownode_flow_manager - .build_create_txn(flow_id, flow_value.flownode_ids().clone()); + .build_create_txn(flow_id, flow_info.flownode_ids().clone()); let create_table_flow_txn = self.table_flow_manager.build_create_txn( flow_id, - flow_value.flownode_ids().clone(), - flow_value.source_table_ids(), + flow_info.flownode_ids().clone(), + flow_info.source_table_ids(), ); let txn = Txn::merge_all(vec![ create_flow_flow_name_txn, create_flow_txn, + create_flow_routes_txn, create_flownode_flow_txn, create_table_flow_txn, ]); info!( "Creating flow {}.{}({}), with {} txn operations", - flow_value.catalog_name, - flow_value.flow_name, + flow_info.catalog_name, + flow_info.flow_name, flow_id, txn.max_operations() ); @@ -185,14 +201,14 @@ impl FlowMetadataManager { if remote_flow_flow_name.flow_id() != flow_id { info!( "Trying to create flow {}.{}({}), but flow({}) already exists", - flow_value.catalog_name, - flow_value.flow_name, + flow_info.catalog_name, + flow_info.flow_name, flow_id, remote_flow_flow_name.flow_id() ); return error::FlowAlreadyExistsSnafu { - flow_name: format!("{}.{}", flow_value.catalog_name, flow_value.flow_name), + flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name), } .fail(); } @@ -204,7 +220,7 @@ impl FlowMetadataManager { ), })?; let op_name = "creating flow"; - ensure_values!(*remote_flow, flow_value, op_name); + ensure_values!(*remote_flow, flow_info, op_name); } Ok(()) @@ -213,7 +229,7 @@ impl FlowMetadataManager { fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec> { let source_table_ids = flow_value.source_table_ids(); let mut keys = - Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 1)); + Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 2)); // Builds flow name key let flow_name = FlowNameKey::new(&flow_value.catalog_name, &flow_value.flow_name); keys.push(flow_name.to_bytes()); @@ -228,14 +244,13 @@ impl FlowMetadataManager { .iter() .for_each(|(&partition_id, &flownode_id)| { keys.push(FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes()); - + keys.push(FlowRouteKey::new(flow_id, partition_id).to_bytes()); source_table_ids.iter().for_each(|&table_id| { keys.push( TableFlowKey::new(table_id, flownode_id, flow_id, partition_id).to_bytes(), ); }) }); - keys } @@ -268,6 +283,7 @@ mod tests { use crate::key::flow::table_flow::TableFlowKey; use crate::key::FlowPartitionId; use crate::kv_backend::memory::MemoryKvBackend; + use crate::peer::Peer; use crate::FlownodeId; #[derive(Debug)] @@ -339,13 +355,27 @@ mod tests { let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone()); let flow_id = 10; let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]); + let flow_routes = vec![ + ( + 1u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + 2, + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ]; flow_metadata_manager - .create_flow_metadata(flow_id, flow_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) .await .unwrap(); // Creates again. flow_metadata_manager - .create_flow_metadata(flow_id, flow_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) .await .unwrap(); let got = flow_metadata_manager @@ -354,6 +384,29 @@ mod tests { .await .unwrap() .unwrap(); + let routes = flow_metadata_manager + .flow_route_manager() + .routes(flow_id) + .try_collect::>() + .await + .unwrap(); + assert_eq!( + routes, + vec![ + ( + FlowRouteKey::new(flow_id, 1), + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + FlowRouteKey::new(flow_id, 2), + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ] + ); assert_eq!(got, flow_value); let flows = flow_metadata_manager .flownode_flow_manager() @@ -379,13 +432,27 @@ mod tests { let flow_metadata_manager = FlowMetadataManager::new(mem_kv); let flow_id = 10; let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]); + let flow_routes = vec![ + ( + 1u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + 2, + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ]; flow_metadata_manager - .create_flow_metadata(flow_id, flow_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) .await .unwrap(); // Creates again let err = flow_metadata_manager - .create_flow_metadata(flow_id + 1, flow_value) + .create_flow_metadata(flow_id + 1, flow_value, flow_routes.clone()) .await .unwrap_err(); assert_matches!(err, error::Error::FlowAlreadyExists { .. }); @@ -398,8 +465,22 @@ mod tests { let flow_id = 10; let catalog_name = "greptime"; let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]); + let flow_routes = vec![ + ( + 1u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + ), + ( + 2, + FlowRouteValue { + peer: Peer::empty(2), + }, + ), + ]; flow_metadata_manager - .create_flow_metadata(flow_id, flow_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) .await .unwrap(); // Creates again. @@ -420,7 +501,7 @@ mod tests { options: Default::default(), }; let err = flow_metadata_manager - .create_flow_metadata(flow_id, flow_value) + .create_flow_metadata(flow_id, flow_value, flow_routes.clone()) .await .unwrap_err(); assert!(err.to_string().contains("Reads the different value")); @@ -432,8 +513,14 @@ mod tests { let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone()); let flow_id = 10; let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]); + let flow_routes = vec![( + 0u32, + FlowRouteValue { + peer: Peer::empty(1), + }, + )]; flow_metadata_manager - .create_flow_metadata(flow_id, flow_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone()) .await .unwrap(); diff --git a/src/common/meta/src/key/flow/flow_route.rs b/src/common/meta/src/key/flow/flow_route.rs new file mode 100644 index 000000000000..e7d179ab3740 --- /dev/null +++ b/src/common/meta/src/key/flow/flow_route.rs @@ -0,0 +1,235 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::stream::BoxStream; +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; + +use crate::error::{self, Result}; +use crate::key::flow::FlowScoped; +use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey, TableMetaValue}; +use crate::kv_backend::txn::{Txn, TxnOp}; +use crate::kv_backend::KvBackendRef; +use crate::peer::Peer; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use crate::rpc::store::RangeRequest; +use crate::rpc::KeyValue; + +const FLOW_ROUTE_KEY_PREFIX: &str = "route"; + +lazy_static! { + static ref FLOW_ROUTE_KEY_PATTERN: Regex = + Regex::new(&format!("^{FLOW_ROUTE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap(); +} + +/// The key stores the route info of the flow. +/// +/// The layout: `__flow/route/{flow_id}/{partition_id}`. +#[derive(Debug, PartialEq)] +pub struct FlowRouteKey(FlowScoped); + +impl FlowRouteKey { + /// Returns a new [FlowRouteKey]. + pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKey { + let inner = FlowRouteKeyInner::new(flow_id, partition_id); + FlowRouteKey(FlowScoped::new(inner)) + } + + /// The prefix used to retrieve all [FlowRouteKey]s with the specified `flow_id`. + pub fn range_start_key(flow_id: FlowId) -> Vec { + let inner = BytesAdapter::from(FlowRouteKeyInner::prefix(flow_id).into_bytes()); + + FlowScoped::new(inner).to_bytes() + } + + /// Returns the [`FlowId`] + pub fn flow_id(&self) -> FlowId { + self.0.flow_id + } + + /// Returns the [`FlowPartitionId`] + pub fn partition_id(&self) -> FlowPartitionId { + self.0.partition_id + } +} + +impl<'a> MetaKey<'a, FlowRouteKey> for FlowRouteKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result { + Ok(FlowRouteKey(FlowScoped::::from_bytes( + bytes, + )?)) + } +} + +/// The key of flow route metadata. +#[derive(Debug, Clone, Copy, PartialEq)] +struct FlowRouteKeyInner { + flow_id: FlowId, + partition_id: FlowPartitionId, +} + +impl FlowRouteKeyInner { + /// Returns a [FlowRouteKeyInner] with the specified `flow_id` and `partition_id`. + pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> FlowRouteKeyInner { + FlowRouteKeyInner { + flow_id, + partition_id, + } + } + + fn prefix(flow_id: FlowId) -> String { + format!("{}/{flow_id}/", FLOW_ROUTE_KEY_PREFIX) + } +} + +impl<'a> MetaKey<'a, FlowRouteKeyInner> for FlowRouteKeyInner { + fn to_bytes(&self) -> Vec { + format!( + "{FLOW_ROUTE_KEY_PREFIX}/{}/{}", + self.flow_id, self.partition_id + ) + .into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidTableMetadataSnafu { + err_msg: format!( + "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + FLOW_ROUTE_KEY_PATTERN + .captures(key) + .context(error::InvalidTableMetadataSnafu { + err_msg: format!("Invalid FlowInfoKeyInner '{key}'"), + })?; + // Safety: pass the regex check above + let flow_id = captures[1].parse::().unwrap(); + let partition_id = captures[2].parse::().unwrap(); + + Ok(FlowRouteKeyInner { + flow_id, + partition_id, + }) + } +} + +/// The route info of flow. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FlowRouteValue { + pub(crate) peer: Peer, +} + +impl FlowRouteValue { + /// Returns the `peer`. + pub fn peer(&self) -> &Peer { + &self.peer + } +} + +/// Decodes `KeyValue` to ([`FlowRouteKey`],[`FlowRouteValue`]). +pub fn flow_route_decoder(kv: KeyValue) -> Result<(FlowRouteKey, FlowRouteValue)> { + let key = FlowRouteKey::from_bytes(&kv.key)?; + let value = FlowRouteValue::try_from_raw_value(&kv.value)?; + Ok((key, value)) +} + +/// The manager of [FlowRouteKey]. +pub struct FlowRouteManager { + kv_backend: KvBackendRef, +} + +impl FlowRouteManager { + /// Returns a new [FlowRouteManager]. + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Retrieves all [FlowRouteValue]s of the specified `flow_id`. + pub fn routes( + &self, + flow_id: FlowId, + ) -> BoxStream<'static, Result<(FlowRouteKey, FlowRouteValue)>> { + let start_key = FlowRouteKey::range_start_key(flow_id); + let req = RangeRequest::new().with_prefix(start_key); + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + Arc::new(flow_route_decoder), + ); + + Box::pin(stream) + } + + /// Builds a create flow routes transaction. + /// + /// Puts `__flow/route/{flownode_id}/{partitions}` keys. + pub(crate) fn build_create_txn>( + &self, + flow_id: FlowId, + flow_routes: I, + ) -> Result { + let txns = flow_routes + .into_iter() + .map(|(partition_id, route)| { + let key = FlowRouteKey::new(flow_id, partition_id).to_bytes(); + + Ok(TxnOp::Put(key, route.try_as_raw_value()?)) + }) + .collect::>>()?; + + Ok(Txn::new().and_then(txns)) + } +} + +#[cfg(test)] +mod tests { + use super::FlowRouteKey; + use crate::key::MetaKey; + + #[test] + fn test_key_serialization() { + let flow_route_key = FlowRouteKey::new(1, 2); + assert_eq!(b"__flow/route/1/2".to_vec(), flow_route_key.to_bytes()); + } + + #[test] + fn test_key_deserialization() { + let bytes = b"__flow/route/1/2".to_vec(); + let key = FlowRouteKey::from_bytes(&bytes).unwrap(); + assert_eq!(key.flow_id(), 1); + assert_eq!(key.partition_id(), 2); + } + + #[test] + fn test_key_start_range() { + assert_eq!( + b"__flow/route/2/".to_vec(), + FlowRouteKey::range_start_key(2) + ); + } +} diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs index 6151bc6d3c9b..af1739ef91bf 100644 --- a/src/common/meta/src/peer.rs +++ b/src/common/meta/src/peer.rs @@ -77,41 +77,3 @@ pub trait PeerLookupService { } pub type PeerLookupServiceRef = Arc; - -/// always return `Peer::new(0, "")` for any query -pub struct StandalonePeerLookupService { - default_peer: Peer, -} - -impl StandalonePeerLookupService { - pub fn new() -> Self { - Self { - default_peer: Peer::new(0, ""), - } - } -} - -impl Default for StandalonePeerLookupService { - fn default() -> Self { - Self::new() - } -} - -#[async_trait::async_trait] -impl PeerLookupService for StandalonePeerLookupService { - async fn datanode( - &self, - _cluster_id: ClusterId, - _id: DatanodeId, - ) -> Result, Error> { - Ok(Some(self.default_peer.clone())) - } - - async fn flownode( - &self, - _cluster_id: ClusterId, - _id: FlownodeId, - ) -> Result, Error> { - Ok(Some(self.default_peer.clone())) - } -} diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 44c534dc32d8..3ceb47310885 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -33,7 +33,7 @@ use crate::kv_backend::KvBackendRef; use crate::node_manager::{ Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef, }; -use crate::peer::{Peer, PeerLookupService, StandalonePeerLookupService}; +use crate::peer::{Peer, PeerLookupService}; use crate::region_keeper::MemoryRegionKeeper; use crate::sequence::SequenceBuilder; use crate::wal_options_allocator::WalOptionsAllocator; @@ -181,7 +181,6 @@ pub fn new_ddl_context_with_kv_backend( table_metadata_manager, flow_metadata_allocator, flow_metadata_manager, - peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index c8d351b3e63a..b6d1251c72b7 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -352,7 +352,6 @@ impl MetasrvBuilder { table_metadata_allocator: table_metadata_allocator.clone(), flow_metadata_manager: flow_metadata_manager.clone(), flow_metadata_allocator: flow_metadata_allocator.clone(), - peer_lookup_service, region_failure_detector_controller, }, procedure_manager.clone(), diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index eda1ae7cdf5b..67b0f496c520 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -119,7 +119,7 @@ pub mod test_data { use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::node_manager::NodeManagerRef; - use common_meta::peer::{Peer, StandalonePeerLookupService}; + use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::RegionRoute; use common_meta::sequence::SequenceBuilder; @@ -225,7 +225,6 @@ pub mod test_data { flow_metadata_manager, flow_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), - peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), } } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index d34604318ae2..8a05ff81be14 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -28,7 +28,6 @@ use common_meta::ddl_manager::DdlManager; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; -use common_meta::peer::StandalonePeerLookupService; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::WalOptionsAllocator; @@ -197,7 +196,6 @@ impl GreptimeDbStandaloneBuilder { table_metadata_allocator, flow_metadata_manager, flow_metadata_allocator, - peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager.clone(),