Skip to content

Commit

Permalink
feat: introduce cli tool for switching WAL
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 4, 2024
1 parent 5bdea1a commit c7f4bc4
Show file tree
Hide file tree
Showing 16 changed files with 616 additions and 26 deletions.
6 changes: 6 additions & 0 deletions src/cmd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ mod database;
mod import;
#[allow(unused)]
mod repl;
mod wal_switch;

use async_trait::async_trait;
use bench::BenchTableMetadataCommand;
use clap::Parser;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
pub use repl::Repl;
use tracing_appender::non_blocking::WorkerGuard;
use wal_switch::{SwitchToLocalWalCommand, SwitchToRemoteWalCommand};

use self::export::ExportCommand;
use crate::cli::import::ImportCommand;
Expand Down Expand Up @@ -118,6 +120,8 @@ enum SubCommand {
Bench(BenchTableMetadataCommand),
Export(ExportCommand),
Import(ImportCommand),
SwitchToRemoteWal(SwitchToRemoteWalCommand),
SwitchToLocalWal(SwitchToLocalWalCommand),
}

impl SubCommand {
Expand All @@ -127,6 +131,8 @@ impl SubCommand {
SubCommand::Bench(cmd) => cmd.build(guard).await,
SubCommand::Export(cmd) => cmd.build(guard).await,
SubCommand::Import(cmd) => cmd.build(guard).await,
SubCommand::SwitchToRemoteWal(cmd) => cmd.build(guard).await,
SubCommand::SwitchToLocalWal(cmd) => cmd.build(guard).await,
}
}
}
Expand Down
192 changes: 192 additions & 0 deletions src/cmd/src/cli/wal_switch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
use std::sync::Arc;

use clap::Parser;
use common_meta::error::Result as CommentMetaResult;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::store::RangeRequest;
use common_meta::rpc::KeyValue;
use common_meta::utils::{
KvBackendMetadataApplier, MetadataApplier, MetadataProcessor, NoopMetadataApplier,
RellocateRegionWalOptions,
};
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_telemetry::warn;
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::config::MetasrvWalConfig;
use etcd_client::Client;
use futures::TryStreamExt;
use meta_srv::error::ConnectEtcdSnafu;
use meta_srv::Result as MetaResult;
use snafu::ResultExt;
use tonic::async_trait;
use tracing_appender::non_blocking::WorkerGuard;

use crate::cli::{Instance, Tool};
use crate::error::{self, Result};

#[derive(Debug, Default, Parser)]
pub struct SwitchToRemoteWalCommand {
/// Store server address default to etcd store.
#[clap(long, value_delimiter = ',', default_value="127.0.0.1:2379", num_args=1..)]
store_addr: Vec<String>,

/// If it's not empty, the metasrv will store all data with this key prefix.
#[clap(long, default_value = "")]
store_key_prefix: String,

// A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
// i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
#[clap(long, default_value = "greptimedb_wal_topic")]
topic_name_prefix: String,

// Number of topics.
#[clap(long, default_value = "64")]
num_topics: usize,

/// Maximum number of operations permitted in a transaction.
#[clap(long, default_value = "128")]
max_txn_ops: usize,

/// Running in dry mode, no metadata changes will be applied.
#[clap(long)]
dry: bool,
}

impl SwitchToRemoteWalCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
Ok(Instance::new(
Box::new(RegionWalOptionProcessor {
store_addr: self
.store_addr
.iter()
.map(|x| x.trim().to_string())
.filter(|x| !x.is_empty())
.collect::<Vec<_>>(),
store_key_prefix: self.store_key_prefix.to_string(),
max_txn_ops: self.max_txn_ops,
dry: self.dry,
wal_config: MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
kafka_topic: KafkaTopicConfig {
num_topics: self.num_topics,
topic_name_prefix: self.topic_name_prefix.to_string(),
..Default::default()
},
auto_create_topics: false,
..Default::default()
}),
}),
guard,
))
}
}

#[derive(Debug, Default, Parser)]
pub struct SwitchToLocalWalCommand {
/// Store server address default to etcd store.
#[clap(long, value_delimiter = ',', default_value="127.0.0.1:2379", num_args=1..)]
store_addr: Vec<String>,

/// If it's not empty, the metasrv will store all data with this key prefix.
#[clap(long, default_value = "")]
store_key_prefix: String,

/// Maximum number of operations permitted in a transaction.
#[clap(long, default_value = "128")]
max_txn_ops: usize,

/// Running in dry mode, no metadata changes will be applied.
#[clap(long)]
dry: bool,
}

impl SwitchToLocalWalCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
Ok(Instance::new(
Box::new(RegionWalOptionProcessor {
store_addr: self
.store_addr
.iter()
.map(|x| x.trim().to_string())
.filter(|x| !x.is_empty())
.collect::<Vec<_>>(),
store_key_prefix: self.store_key_prefix.to_string(),
max_txn_ops: self.max_txn_ops,
dry: self.dry,
wal_config: MetasrvWalConfig::RaftEngine,
}),
guard,
))
}
}

pub struct RegionWalOptionProcessor {
store_addr: Vec<String>,
store_key_prefix: String,
max_txn_ops: usize,
dry: bool,
wal_config: MetasrvWalConfig,
}

impl RegionWalOptionProcessor {
async fn create_etcd_client(&self) -> MetaResult<KvBackendRef> {
let etcd_client = Client::connect(&self.store_addr, None)
.await
.context(ConnectEtcdSnafu)?;
let kv_backend = {
let etcd_backend = EtcdStore::with_etcd_client(etcd_client, self.max_txn_ops);
if !self.store_key_prefix.is_empty() {
Arc::new(ChrootKvBackend::new(
self.store_key_prefix.clone().into_bytes(),
etcd_backend,
))
} else {
etcd_backend
}
};

Ok(kv_backend)
}
}

fn decoder(kv: KeyValue) -> CommentMetaResult<(Vec<u8>, Vec<u8>)> {
Ok((kv.key, kv.value))
}

#[async_trait]
impl Tool for RegionWalOptionProcessor {
async fn do_work(&self) -> Result<()> {
let kv_backend = self
.create_etcd_client()
.await
.context(error::BuildKvBackendSnafu)?;
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
self.wal_config.clone(),
kv_backend.clone(),
));
let req = RangeRequest::new().with_range("\0", "\0");
let mut stream = Box::pin(
PaginationStream::new(kv_backend.clone(), req, 1024, Arc::new(decoder)).into_stream(),
);
let transformers =
vec![Box::new(RellocateRegionWalOptions::new(wal_options_allocator)) as _];
let applier = if self.dry {
warn!("Running in dry mode: no metadata changes will be applied");
Box::new(NoopMetadataApplier) as Box<dyn MetadataApplier>
} else {
Box::new(KvBackendMetadataApplier::new(kv_backend)) as Box<dyn MetadataApplier>
};
let processor = MetadataProcessor::new(transformers, applier);
while let Some((key, value)) = stream.try_next().await.context(error::DecodeValueSnafu)? {
processor
.handle(key, value)
.await
.context(error::MetadataProcessorSnafu)?;
}

Ok(())
}
}
25 changes: 25 additions & 0 deletions src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ pub enum Error {
source: common_meta::error::Error,
},

#[snafu(display("Failed to decode value"))]
DecodeValue {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to process metadata"))]
MetadataProcessor {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to init default timezone"))]
InitTimezone {
#[snafu(implicit)]
Expand Down Expand Up @@ -121,6 +135,13 @@ pub enum Error {
source: meta_srv::error::Error,
},

#[snafu(display("Failed to key value backend"))]
BuildKvBackend {
#[snafu(implicit)]
location: Location,
source: meta_srv::error::Error,
},

#[snafu(display("Failed to start meta server"))]
StartMetaServer {
#[snafu(implicit)]
Expand Down Expand Up @@ -345,7 +366,11 @@ impl ErrorExt for Error {
Error::StartMetaServer { source, .. } => source.status_code(),
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::BuildKvBackend { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::DecodeValue { source, .. } | Error::MetadataProcessor { source, .. } => {
source.status_code()
}

Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ use crate::DatanodeId;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
pub const MAINTENANCE_KEY: &str = "__maintenance";

const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
Expand Down
10 changes: 5 additions & 5 deletions src/common/meta/src/kv_backend/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{KvBackend, *};
use crate::error::Error;
use crate::rpc::store::{BatchGetRequest, PutRequest};
use crate::rpc::KeyValue;
use crate::util;
use crate::utils;

pub fn mock_kvs(prefix: Vec<u8>) -> Vec<KeyValue> {
vec![
Expand Down Expand Up @@ -58,7 +58,7 @@ pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>
}

pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) {
let range_end = util::get_prefix_end_key(prefix);
let range_end = utils::get_prefix_end_key(prefix);
assert!(kv_backend
.delete_range(DeleteRangeRequest {
key: prefix.to_vec(),
Expand Down Expand Up @@ -105,7 +105,7 @@ pub async fn test_kv_range(kv_backend: &impl KvBackend) {
pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
let key = [prefix.clone(), b"key1".to_vec()].concat();
let key11 = [prefix.clone(), b"key11".to_vec()].concat();
let range_end = util::get_prefix_end_key(&key);
let range_end = utils::get_prefix_end_key(&key);

let resp = kv_backend
.range(RangeRequest {
Expand Down Expand Up @@ -195,7 +195,7 @@ pub async fn test_kv_range_2_with_prefix(kv_backend: impl KvBackend, prefix: Vec
let all_end = if prefix.is_empty() {
b"\0".to_vec()
} else {
util::get_prefix_end_key(&prefix)
utils::get_prefix_end_key(&prefix)
};
let result = kv_backend
.range(RangeRequest::new().with_range(all_start, all_end.clone()))
Expand Down Expand Up @@ -382,7 +382,7 @@ pub async fn test_kv_delete_range_with_prefix(kv_backend: impl KvBackend, prefix
assert!(resp.is_none());

let key = [prefix.clone(), b"key1".to_vec()].concat();
let range_end = util::get_prefix_end_key(&key);
let range_end = utils::get_prefix_end_key(&key);

let req = DeleteRangeRequest {
key: key.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/kv_backend/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub struct TxnResponse {
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Txn {
// HACK - chroot would modify this field
pub(super) req: TxnRequest,
pub(crate) req: TxnRequest,
c_when: bool,
c_then: bool,
c_else: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub mod sequence;
pub mod state_store;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod util;
pub mod utils;
pub mod wal_options_allocator;

// The id of the cluster.
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/range_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::error::{self, Result};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{RangeRequest, RangeResponse};
use crate::rpc::KeyValue;
use crate::util::get_next_prefix_key;
use crate::utils::get_next_prefix_key;

pub type KeyValueDecoderFn<T> = dyn Fn(KeyValue) -> Result<T> + Send + Sync;

Expand Down
Loading

0 comments on commit c7f4bc4

Please sign in to comment.