From c7f4bc4c8551e77a7c60b71a2dcdd33623d8490e Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 3 Dec 2024 07:46:08 +0000 Subject: [PATCH] feat: introduce cli tool for switching WAL --- src/cmd/src/cli.rs | 6 + src/cmd/src/cli/wal_switch.rs | 192 +++++++++++++++++ src/cmd/src/error.rs | 25 +++ src/common/meta/src/key.rs | 2 +- src/common/meta/src/kv_backend/test.rs | 10 +- src/common/meta/src/kv_backend/txn.rs | 2 +- src/common/meta/src/lib.rs | 2 +- src/common/meta/src/range_stream.rs | 2 +- src/common/meta/src/rpc/store.rs | 20 +- src/common/meta/src/{util.rs => utils.rs} | 6 + src/common/meta/src/utils/transformer.rs | 202 ++++++++++++++++++ .../reallocate_region_wal_options.rs | 159 ++++++++++++++ src/log-store/src/raft_engine/backend.rs | 2 +- src/meta-client/src/client/heartbeat.rs | 4 +- src/meta-srv/src/cluster.rs | 4 +- src/meta-srv/src/lease.rs | 4 +- 16 files changed, 616 insertions(+), 26 deletions(-) create mode 100644 src/cmd/src/cli/wal_switch.rs rename src/common/meta/src/{util.rs => utils.rs} (92%) create mode 100644 src/common/meta/src/utils/transformer.rs create mode 100644 src/common/meta/src/utils/transformer/reallocate_region_wal_options.rs diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index fc43e0997665..a553dfd15670 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -25,6 +25,7 @@ mod database; mod import; #[allow(unused)] mod repl; +mod wal_switch; use async_trait::async_trait; use bench::BenchTableMetadataCommand; @@ -32,6 +33,7 @@ use clap::Parser; use common_telemetry::logging::{LoggingOptions, TracingOptions}; pub use repl::Repl; use tracing_appender::non_blocking::WorkerGuard; +use wal_switch::{SwitchToLocalWalCommand, SwitchToRemoteWalCommand}; use self::export::ExportCommand; use crate::cli::import::ImportCommand; @@ -118,6 +120,8 @@ enum SubCommand { Bench(BenchTableMetadataCommand), Export(ExportCommand), Import(ImportCommand), + SwitchToRemoteWal(SwitchToRemoteWalCommand), + SwitchToLocalWal(SwitchToLocalWalCommand), } impl SubCommand { @@ -127,6 +131,8 @@ impl SubCommand { SubCommand::Bench(cmd) => cmd.build(guard).await, SubCommand::Export(cmd) => cmd.build(guard).await, SubCommand::Import(cmd) => cmd.build(guard).await, + SubCommand::SwitchToRemoteWal(cmd) => cmd.build(guard).await, + SubCommand::SwitchToLocalWal(cmd) => cmd.build(guard).await, } } } diff --git a/src/cmd/src/cli/wal_switch.rs b/src/cmd/src/cli/wal_switch.rs new file mode 100644 index 000000000000..8f9efc6481c5 --- /dev/null +++ b/src/cmd/src/cli/wal_switch.rs @@ -0,0 +1,192 @@ +use std::sync::Arc; + +use clap::Parser; +use common_meta::error::Result as CommentMetaResult; +use common_meta::kv_backend::chroot::ChrootKvBackend; +use common_meta::kv_backend::etcd::EtcdStore; +use common_meta::kv_backend::KvBackendRef; +use common_meta::range_stream::PaginationStream; +use common_meta::rpc::store::RangeRequest; +use common_meta::rpc::KeyValue; +use common_meta::utils::{ + KvBackendMetadataApplier, MetadataApplier, MetadataProcessor, NoopMetadataApplier, + RellocateRegionWalOptions, +}; +use common_meta::wal_options_allocator::WalOptionsAllocator; +use common_telemetry::warn; +use common_wal::config::kafka::common::KafkaTopicConfig; +use common_wal::config::kafka::MetasrvKafkaConfig; +use common_wal::config::MetasrvWalConfig; +use etcd_client::Client; +use futures::TryStreamExt; +use meta_srv::error::ConnectEtcdSnafu; +use meta_srv::Result as MetaResult; +use snafu::ResultExt; +use tonic::async_trait; +use tracing_appender::non_blocking::WorkerGuard; + +use crate::cli::{Instance, Tool}; +use crate::error::{self, Result}; + +#[derive(Debug, Default, Parser)] +pub struct SwitchToRemoteWalCommand { + /// Store server address default to etcd store. + #[clap(long, value_delimiter = ',', default_value="127.0.0.1:2379", num_args=1..)] + store_addr: Vec, + + /// If it's not empty, the metasrv will store all data with this key prefix. + #[clap(long, default_value = "")] + store_key_prefix: String, + + // A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. + // i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. + #[clap(long, default_value = "greptimedb_wal_topic")] + topic_name_prefix: String, + + // Number of topics. + #[clap(long, default_value = "64")] + num_topics: usize, + + /// Maximum number of operations permitted in a transaction. + #[clap(long, default_value = "128")] + max_txn_ops: usize, + + /// Running in dry mode, no metadata changes will be applied. + #[clap(long)] + dry: bool, +} + +impl SwitchToRemoteWalCommand { + pub async fn build(&self, guard: Vec) -> Result { + Ok(Instance::new( + Box::new(RegionWalOptionProcessor { + store_addr: self + .store_addr + .iter() + .map(|x| x.trim().to_string()) + .filter(|x| !x.is_empty()) + .collect::>(), + store_key_prefix: self.store_key_prefix.to_string(), + max_txn_ops: self.max_txn_ops, + dry: self.dry, + wal_config: MetasrvWalConfig::Kafka(MetasrvKafkaConfig { + kafka_topic: KafkaTopicConfig { + num_topics: self.num_topics, + topic_name_prefix: self.topic_name_prefix.to_string(), + ..Default::default() + }, + auto_create_topics: false, + ..Default::default() + }), + }), + guard, + )) + } +} + +#[derive(Debug, Default, Parser)] +pub struct SwitchToLocalWalCommand { + /// Store server address default to etcd store. + #[clap(long, value_delimiter = ',', default_value="127.0.0.1:2379", num_args=1..)] + store_addr: Vec, + + /// If it's not empty, the metasrv will store all data with this key prefix. + #[clap(long, default_value = "")] + store_key_prefix: String, + + /// Maximum number of operations permitted in a transaction. + #[clap(long, default_value = "128")] + max_txn_ops: usize, + + /// Running in dry mode, no metadata changes will be applied. + #[clap(long)] + dry: bool, +} + +impl SwitchToLocalWalCommand { + pub async fn build(&self, guard: Vec) -> Result { + Ok(Instance::new( + Box::new(RegionWalOptionProcessor { + store_addr: self + .store_addr + .iter() + .map(|x| x.trim().to_string()) + .filter(|x| !x.is_empty()) + .collect::>(), + store_key_prefix: self.store_key_prefix.to_string(), + max_txn_ops: self.max_txn_ops, + dry: self.dry, + wal_config: MetasrvWalConfig::RaftEngine, + }), + guard, + )) + } +} + +pub struct RegionWalOptionProcessor { + store_addr: Vec, + store_key_prefix: String, + max_txn_ops: usize, + dry: bool, + wal_config: MetasrvWalConfig, +} + +impl RegionWalOptionProcessor { + async fn create_etcd_client(&self) -> MetaResult { + let etcd_client = Client::connect(&self.store_addr, None) + .await + .context(ConnectEtcdSnafu)?; + let kv_backend = { + let etcd_backend = EtcdStore::with_etcd_client(etcd_client, self.max_txn_ops); + if !self.store_key_prefix.is_empty() { + Arc::new(ChrootKvBackend::new( + self.store_key_prefix.clone().into_bytes(), + etcd_backend, + )) + } else { + etcd_backend + } + }; + + Ok(kv_backend) + } +} + +fn decoder(kv: KeyValue) -> CommentMetaResult<(Vec, Vec)> { + Ok((kv.key, kv.value)) +} + +#[async_trait] +impl Tool for RegionWalOptionProcessor { + async fn do_work(&self) -> Result<()> { + let kv_backend = self + .create_etcd_client() + .await + .context(error::BuildKvBackendSnafu)?; + let wal_options_allocator = Arc::new(WalOptionsAllocator::new( + self.wal_config.clone(), + kv_backend.clone(), + )); + let req = RangeRequest::new().with_range("\0", "\0"); + let mut stream = Box::pin( + PaginationStream::new(kv_backend.clone(), req, 1024, Arc::new(decoder)).into_stream(), + ); + let transformers = + vec![Box::new(RellocateRegionWalOptions::new(wal_options_allocator)) as _]; + let applier = if self.dry { + warn!("Running in dry mode: no metadata changes will be applied"); + Box::new(NoopMetadataApplier) as Box + } else { + Box::new(KvBackendMetadataApplier::new(kv_backend)) as Box + }; + let processor = MetadataProcessor::new(transformers, applier); + while let Some((key, value)) = stream.try_next().await.context(error::DecodeValueSnafu)? { + processor + .handle(key, value) + .await + .context(error::MetadataProcessorSnafu)?; + } + + Ok(()) + } +} diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index f042b48478d4..c2642f768605 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -44,6 +44,20 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to decode value"))] + DecodeValue { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to process metadata"))] + MetadataProcessor { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to init default timezone"))] InitTimezone { #[snafu(implicit)] @@ -121,6 +135,13 @@ pub enum Error { source: meta_srv::error::Error, }, + #[snafu(display("Failed to key value backend"))] + BuildKvBackend { + #[snafu(implicit)] + location: Location, + source: meta_srv::error::Error, + }, + #[snafu(display("Failed to start meta server"))] StartMetaServer { #[snafu(implicit)] @@ -345,7 +366,11 @@ impl ErrorExt for Error { Error::StartMetaServer { source, .. } => source.status_code(), Error::ShutdownMetaServer { source, .. } => source.status_code(), Error::BuildMetaServer { source, .. } => source.status_code(), + Error::BuildKvBackend { source, .. } => source.status_code(), Error::UnsupportedSelectorType { source, .. } => source.status_code(), + Error::DecodeValue { source, .. } | Error::MetadataProcessor { source, .. } => { + source.status_code() + } Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { source.status_code() diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index b6bdf6189c79..90b96f32dc9e 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -149,7 +149,7 @@ use crate::DatanodeId; pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*"; pub const MAINTENANCE_KEY: &str = "__maintenance"; -const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; +pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info"; pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name"; diff --git a/src/common/meta/src/kv_backend/test.rs b/src/common/meta/src/kv_backend/test.rs index 2f0216dfdfcb..231ec9158f85 100644 --- a/src/common/meta/src/kv_backend/test.rs +++ b/src/common/meta/src/kv_backend/test.rs @@ -19,7 +19,7 @@ use super::{KvBackend, *}; use crate::error::Error; use crate::rpc::store::{BatchGetRequest, PutRequest}; use crate::rpc::KeyValue; -use crate::util; +use crate::utils; pub fn mock_kvs(prefix: Vec) -> Vec { vec![ @@ -58,7 +58,7 @@ pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec } pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) { - let range_end = util::get_prefix_end_key(prefix); + let range_end = utils::get_prefix_end_key(prefix); assert!(kv_backend .delete_range(DeleteRangeRequest { key: prefix.to_vec(), @@ -105,7 +105,7 @@ pub async fn test_kv_range(kv_backend: &impl KvBackend) { pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec) { let key = [prefix.clone(), b"key1".to_vec()].concat(); let key11 = [prefix.clone(), b"key11".to_vec()].concat(); - let range_end = util::get_prefix_end_key(&key); + let range_end = utils::get_prefix_end_key(&key); let resp = kv_backend .range(RangeRequest { @@ -195,7 +195,7 @@ pub async fn test_kv_range_2_with_prefix(kv_backend: impl KvBackend, prefix: Vec let all_end = if prefix.is_empty() { b"\0".to_vec() } else { - util::get_prefix_end_key(&prefix) + utils::get_prefix_end_key(&prefix) }; let result = kv_backend .range(RangeRequest::new().with_range(all_start, all_end.clone())) @@ -382,7 +382,7 @@ pub async fn test_kv_delete_range_with_prefix(kv_backend: impl KvBackend, prefix assert!(resp.is_none()); let key = [prefix.clone(), b"key1".to_vec()].concat(); - let range_end = util::get_prefix_end_key(&key); + let range_end = utils::get_prefix_end_key(&key); let req = DeleteRangeRequest { key: key.clone(), diff --git a/src/common/meta/src/kv_backend/txn.rs b/src/common/meta/src/kv_backend/txn.rs index 77cd0f921e21..20ced98ca83a 100644 --- a/src/common/meta/src/kv_backend/txn.rs +++ b/src/common/meta/src/kv_backend/txn.rs @@ -130,7 +130,7 @@ pub struct TxnResponse { #[derive(Debug, Clone, Default, PartialEq)] pub struct Txn { // HACK - chroot would modify this field - pub(super) req: TxnRequest, + pub(crate) req: TxnRequest, c_when: bool, c_then: bool, c_else: bool, diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index c00fd3383042..88b457cd45e4 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -44,7 +44,7 @@ pub mod sequence; pub mod state_store; #[cfg(any(test, feature = "testing"))] pub mod test_util; -pub mod util; +pub mod utils; pub mod wal_options_allocator; // The id of the cluster. diff --git a/src/common/meta/src/range_stream.rs b/src/common/meta/src/range_stream.rs index be54865281b3..821043fe87f2 100644 --- a/src/common/meta/src/range_stream.rs +++ b/src/common/meta/src/range_stream.rs @@ -23,7 +23,7 @@ use crate::error::{self, Result}; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{RangeRequest, RangeResponse}; use crate::rpc::KeyValue; -use crate::util::get_next_prefix_key; +use crate::utils::get_next_prefix_key; pub type KeyValueDecoderFn = dyn Fn(KeyValue) -> Result + Send + Sync; diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index f763d6b4430d..72aa072a5e60 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -28,7 +28,7 @@ use api::v1::meta::{ use crate::error::Result; use crate::rpc::KeyValue; -use crate::{error, util}; +use crate::{error, utils}; pub fn to_range(key: Vec, range_end: Vec) -> (Bound>, Bound>) { match (&key[..], &range_end[..]) { @@ -137,7 +137,7 @@ impl RangeRequest { /// range_end is the key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), pub fn with_prefix(mut self, key: impl Into>) -> Self { self.key = key.into(); - self.range_end = util::get_prefix_end_key(&self.key); + self.range_end = utils::get_prefix_end_key(&self.key); self } @@ -180,7 +180,7 @@ impl TryFrom for RangeResponse { type Error = error::Error; fn try_from(pb: PbRangeResponse) -> Result { - util::check_response_header(pb.header.as_ref())?; + utils::check_response_header(pb.header.as_ref())?; Ok(Self { kvs: pb.kvs.into_iter().map(KeyValue::new).collect(), @@ -275,7 +275,7 @@ impl TryFrom for PutResponse { type Error = error::Error; fn try_from(pb: PbPutResponse) -> Result { - util::check_response_header(pb.header.as_ref())?; + utils::check_response_header(pb.header.as_ref())?; Ok(Self { prev_kv: pb.prev_kv.map(KeyValue::new), @@ -357,7 +357,7 @@ impl TryFrom for BatchGetResponse { type Error = error::Error; fn try_from(pb: PbBatchGetResponse) -> Result { - util::check_response_header(pb.header.as_ref())?; + utils::check_response_header(pb.header.as_ref())?; Ok(Self { kvs: pb.kvs.into_iter().map(KeyValue::new).collect(), @@ -434,7 +434,7 @@ impl TryFrom for BatchPutResponse { type Error = error::Error; fn try_from(pb: PbBatchPutResponse) -> Result { - util::check_response_header(pb.header.as_ref())?; + utils::check_response_header(pb.header.as_ref())?; Ok(Self { prev_kvs: pb.prev_kvs.into_iter().map(KeyValue::new).collect(), @@ -518,7 +518,7 @@ impl TryFrom for BatchDeleteResponse { type Error = error::Error; fn try_from(pb: PbBatchDeleteResponse) -> Result { - util::check_response_header(pb.header.as_ref())?; + utils::check_response_header(pb.header.as_ref())?; Ok(Self { prev_kvs: pb.prev_kvs.into_iter().map(KeyValue::new).collect(), @@ -605,7 +605,7 @@ impl TryFrom for CompareAndPutResponse { type Error = error::Error; fn try_from(pb: PbCompareAndPutResponse) -> Result { - util::check_response_header(pb.header.as_ref())?; + utils::check_response_header(pb.header.as_ref())?; Ok(Self { success: pb.success, @@ -722,7 +722,7 @@ impl DeleteRangeRequest { /// range_end is one bit larger than the given key. pub fn with_prefix(mut self, key: impl Into>) -> Self { self.key = key.into(); - self.range_end = util::get_prefix_end_key(&self.key); + self.range_end = utils::get_prefix_end_key(&self.key); self } @@ -744,7 +744,7 @@ impl TryFrom for DeleteRangeResponse { type Error = error::Error; fn try_from(pb: PbDeleteRangeResponse) -> Result { - util::check_response_header(pb.header.as_ref())?; + utils::check_response_header(pb.header.as_ref())?; Ok(Self { deleted: pb.deleted, diff --git a/src/common/meta/src/util.rs b/src/common/meta/src/utils.rs similarity index 92% rename from src/common/meta/src/util.rs rename to src/common/meta/src/utils.rs index e7a8eba3039c..ac4ed79a7cb5 100644 --- a/src/common/meta/src/util.rs +++ b/src/common/meta/src/utils.rs @@ -12,7 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod transformer; + use api::v1::meta::ResponseHeader; +pub use transformer::{ + KvBackendMetadataApplier, MetadataApplier, MetadataProcessor, MetadataTransformer, + NoopMetadataApplier, RellocateRegionWalOptions, +}; use crate::error::{IllegalServerStateSnafu, Result}; diff --git a/src/common/meta/src/utils/transformer.rs b/src/common/meta/src/utils/transformer.rs new file mode 100644 index 000000000000..2bb098baa63a --- /dev/null +++ b/src/common/meta/src/utils/transformer.rs @@ -0,0 +1,202 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod reallocate_region_wal_options; + +pub use reallocate_region_wal_options::RellocateRegionWalOptions; +use snafu::ensure; + +use crate::error::{self, Result}; +use crate::kv_backend::txn::Txn; +use crate::kv_backend::KvBackendRef; + +/// Represents possible actions for metadata. +#[derive(Debug, Clone)] +pub enum MetadataAction { + /// No operation, used as a placeholder. + Noop, + /// Represents a transaction-based modification to the metadata. + Transform(Txn), +} + +#[cfg(test)] +impl MetadataAction { + /// Converts a [`MetadataAction`] into a [`Txn`] + /// + /// # Panics + /// This method panics if the [`MetadataAction`] is not [`MetadataAction::Transform`]. + pub fn into_transform(self) -> Txn { + let Self::Transform(txn) = self else { + unreachable!() + }; + + txn + } +} + +/// A trait for processing key-value pairs to generate [`MetadataAction`]. +#[async_trait::async_trait] +pub trait MetadataTransformer: Sync + Send { + fn name(&self) -> &'static str { + let type_name = std::any::type_name::(); + // short name + type_name.split("::").last().unwrap_or(type_name) + } + + fn accept(&self, key: &[u8]) -> bool; + + async fn handle(&self, key: Vec, value: Vec) -> Result; +} + +/// A trait for applying [`MetadataAction`] to a backend. +#[async_trait::async_trait] +pub trait MetadataApplier: Sync + Send { + fn name(&self) -> &'static str { + let type_name = std::any::type_name::(); + // short name + type_name.split("::").last().unwrap_or(type_name) + } + + fn accept(&self, action: &MetadataAction) -> bool; + + async fn handle(&self, action: MetadataAction) -> Result<()>; +} + +#[derive(Debug, Default)] +/// The noop executor +pub struct NoopMetadataApplier; + +#[async_trait::async_trait] +impl MetadataApplier for NoopMetadataApplier { + fn accept(&self, _action: &MetadataAction) -> bool { + true + } + + async fn handle(&self, _action: MetadataAction) -> Result<()> { + Ok(()) + } +} + +/// Used to apply [`MetadataAction`] to the [`KvBackendRef`]. +pub struct KvBackendMetadataApplier { + kv_backend: KvBackendRef, +} + +impl KvBackendMetadataApplier { + /// Creates the [`KvBackendMetadataApplier`]. + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } +} + +#[async_trait::async_trait] +impl MetadataApplier for KvBackendMetadataApplier { + fn accept(&self, action: &MetadataAction) -> bool { + matches!(action, MetadataAction::Transform(..)) + } + + async fn handle(&self, action: MetadataAction) -> Result<()> { + // Safty: checked before + let MetadataAction::Transform(txn) = action else { + return error::UnexpectedSnafu { + err_msg: "expected 'MetadataAction::Modify'", + } + .fail(); + }; + + let resp = self.kv_backend.txn(txn).await?; + ensure!( + resp.succeeded, + error::UnexpectedSnafu { + err_msg: "Failed to exeuctor transaction" + } + ); + + Ok(()) + } +} + +/// A processor that handles metadata transformation and application. +pub struct MetadataProcessor { + transformers: Vec>, + applier: Box, +} + +impl MetadataProcessor { + /// Creates a new `MetadataProcessor`. + pub fn new( + transformers: Vec>, + applier: Box, + ) -> Self { + Self { + transformers, + applier, + } + } + + /// Finds the first acceptable transformer for the given key. + fn find_acceptable_tranformer(&self, key: &[u8]) -> Option<&dyn MetadataTransformer> { + self.transformers + .iter() + .find(|t| t.accept(key)) + .map(|v| v.as_ref()) + } + + /// Processes a key-value pair using the appropriate transformer and applies the action. + pub async fn handle(&self, key: Vec, value: Vec) -> Result<()> { + if let Some(transformer) = self.find_acceptable_tranformer(&key) { + let action = transformer.handle(key, value).await?; + if self.applier.accept(&action) { + self.applier.handle(action).await?; + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use super::KvBackendMetadataApplier; + use crate::error; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::txn::Txn; + use crate::kv_backend::KvBackend; + use crate::utils::transformer::{MetadataAction, MetadataApplier}; + + #[test] + fn test_accpet() { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let modifier = KvBackendMetadataApplier::new(kv_backend); + assert!(!modifier.accept(&MetadataAction::Noop)); + assert!(modifier.accept(&MetadataAction::Transform(Txn::new()))); + } + + #[tokio::test] + async fn test_handle() { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let modifier = KvBackendMetadataApplier::new(kv_backend.clone()); + let txn = Txn::put_if_not_exists("hi".as_bytes().to_vec(), "foobar".as_bytes().to_vec()); + let action = MetadataAction::Transform(txn); + modifier.handle(action.clone()).await.unwrap(); + let kv = kv_backend.get("hi".as_bytes()).await.unwrap().unwrap(); + assert_eq!(&kv.value, "foobar".as_bytes()); + + let err = modifier.handle(action.clone()).await.unwrap_err(); + assert_matches!(err, error::Error::Unexpected { .. }); + } +} diff --git a/src/common/meta/src/utils/transformer/reallocate_region_wal_options.rs b/src/common/meta/src/utils/transformer/reallocate_region_wal_options.rs new file mode 100644 index 000000000000..ce9113009c91 --- /dev/null +++ b/src/common/meta/src/utils/transformer/reallocate_region_wal_options.rs @@ -0,0 +1,159 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::debug; + +use crate::error::Result; +use crate::key::datanode_table::DatanodeTableValue; +use crate::key::{MetadataValue, DATANODE_TABLE_KEY_PREFIX}; +use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; +use crate::utils::transformer::{MetadataAction, MetadataTransformer}; +use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef}; + +/// A processor responsible for reallocating WAL options for regions. +pub struct RellocateRegionWalOptions { + // TODO(weny): refactor the f***king [`WalOptionsAllocatorRef`] ASAP. + wal_options_allocator: WalOptionsAllocatorRef, +} + +impl RellocateRegionWalOptions { + /// Creates the [`RellocateRegionWalOptions`]. + pub fn new(wal_options_allocator: WalOptionsAllocatorRef) -> Self { + Self { + wal_options_allocator, + } + } +} + +#[async_trait::async_trait] +impl MetadataTransformer for RellocateRegionWalOptions { + fn accept(&self, key: &[u8]) -> bool { + key.starts_with(DATANODE_TABLE_KEY_PREFIX.as_bytes()) + } + + async fn handle(&self, key: Vec, value: Vec) -> Result { + let key_str = String::from_utf8_lossy(&key); + let mut datanode_table_value = DatanodeTableValue::try_from_raw_value(&value)?; + let regions = datanode_table_value.regions.clone(); + let new_region_wal_options = + allocate_region_wal_options(regions, &self.wal_options_allocator)?; + datanode_table_value.region_info.region_wal_options = new_region_wal_options; + + debug!("Updating key: {key_str}"); + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Equal, + value, + )]) + .and_then(vec![TxnOp::Put( + key, + datanode_table_value.try_as_raw_value()?, + )]); + Ok(MetadataAction::Transform(txn)) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use common_wal::options::WalOptions; + use store_api::storage::RegionNumber; + + use super::*; + use crate::key::datanode_table::{DatanodeTableKey, RegionInfo}; + use crate::key::MetadataKey; + use crate::wal_options_allocator::WalOptionsAllocator; + + #[test] + fn test_accpet() { + let processor = RellocateRegionWalOptions::new(Arc::new(WalOptionsAllocator::default())); + assert!(processor.accept(format!("{}/1024", DATANODE_TABLE_KEY_PREFIX).as_bytes())); + assert!(!processor.accept("hello".as_bytes())); + } + + async fn test_handle( + processor: &RellocateRegionWalOptions, + key: Vec, + value: Vec, + expected: HashMap, + ) { + let txn = processor + .handle(key.clone(), value.clone()) + .await + .unwrap() + .into_transform(); + assert_eq!( + txn.req.compare, + vec![Compare::with_value(key.clone(), CompareOp::Equal, value)] + ); + assert_eq!(txn.req.success.len(), 1); + let TxnOp::Put(txn_key, txn_value) = txn.req.success[0].clone() else { + unreachable!() + }; + assert_eq!(txn_key, key); + let value = DatanodeTableValue::try_from_raw_value(&txn_value).unwrap(); + assert_eq!(value.region_info.region_wal_options, expected); + } + + #[tokio::test] + async fn test_handle_basic() { + let processor = RellocateRegionWalOptions::new(Arc::new(WalOptionsAllocator::default())); + assert!(processor.accept(format!("{}/1024", DATANODE_TABLE_KEY_PREFIX).as_bytes())); + assert!(!processor.accept("hello".as_bytes())); + let key = DatanodeTableKey::new(1, 1024); + let value = DatanodeTableValue::new( + 1024, + vec![1], + RegionInfo { + engine: Default::default(), + region_storage_path: Default::default(), + region_options: Default::default(), + region_wal_options: HashMap::from([(1, "hello".to_string())]), + }, + ); + let key = key.to_bytes(); + let value = value.try_as_raw_value().unwrap(); + test_handle( + &processor, + key, + value, + HashMap::from([(1, serde_json::to_string(&WalOptions::RaftEngine).unwrap())]), + ) + .await; + + let key = DatanodeTableKey::new(1, 1024); + let value = DatanodeTableValue::new( + 1024, + vec![1], + RegionInfo { + engine: Default::default(), + region_storage_path: Default::default(), + region_options: Default::default(), + region_wal_options: HashMap::default(), + }, + ); + let key = key.to_bytes(); + let value = value.try_as_raw_value().unwrap(); + test_handle( + &processor, + key, + value, + HashMap::from([(1, serde_json::to_string(&WalOptions::RaftEngine).unwrap())]), + ) + .await; + } +} diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 33cb64a2e881..5a7f40307cf6 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -29,7 +29,7 @@ use common_meta::rpc::store::{ RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; -use common_meta::util::get_next_prefix_key; +use common_meta::utils::get_next_prefix_key; use raft_engine::{Config, Engine, LogBatch}; use snafu::{IntoError, ResultExt}; diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index b1214d72df6d..564216c67f70 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use api::v1::meta::heartbeat_client::HeartbeatClient; use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role}; use common_grpc::channel_manager::ChannelManager; -use common_meta::util; +use common_meta::utils; use common_telemetry::info; use common_telemetry::tracing_context::TracingContext; use snafu::{ensure, OptionExt, ResultExt}; @@ -87,7 +87,7 @@ impl HeartbeatStream { pub async fn message(&mut self) -> Result> { let res = self.stream.message().await.map_err(error::Error::from); if let Ok(Some(heartbeat)) = &res { - util::check_response_header(heartbeat.header.as_ref()) + utils::check_response_header(heartbeat.header.as_ref()) .context(InvalidResponseHeaderSnafu)?; } res diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 9a6cecbd36f7..48202f84299e 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -31,7 +31,7 @@ use common_meta::rpc::store::{ DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; -use common_meta::util; +use common_meta::utils; use common_telemetry::warn; use derive_builder::Builder; use snafu::{ensure, OptionExt, ResultExt}; @@ -198,7 +198,7 @@ impl KvBackend for MetaPeerClient { impl MetaPeerClient { async fn get_dn_key_value(&self, keys_only: bool) -> Result> { let key = DatanodeStatKey::prefix_key(); - let range_end = util::get_prefix_end_key(&key); + let range_end = utils::get_prefix_end_key(&key); let range_request = RangeRequest { key, range_end, diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index ef28c2ed7431..653d1b1c069f 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -18,7 +18,7 @@ use std::hash::Hash; use common_error::ext::BoxedError; use common_meta::kv_backend::KvBackend; use common_meta::peer::{Peer, PeerLookupService}; -use common_meta::{util, ClusterId, DatanodeId, FlownodeId}; +use common_meta::{utils, ClusterId, DatanodeId, FlownodeId}; use common_time::util as time_util; use snafu::ResultExt; @@ -129,7 +129,7 @@ where P: Fn(&LeaseValue) -> bool, K: Eq + Hash + TryFrom, Error = Error>, { - let range_end = util::get_prefix_end_key(&key); + let range_end = utils::get_prefix_end_key(&key); let range_req = common_meta::rpc::store::RangeRequest { key, range_end,