From 4c6c910e4ee13ce4bb03b1e87e1f660701ca0db5 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Mon, 25 Dec 2023 21:42:10 +0800
Subject: [PATCH 01/19] feat: integrate remote wal to standalone

---
 config/standalone.example.toml | 38 ++++++++++++++++++-
 src/cmd/src/options.rs | 2 +
 src/cmd/src/standalone.rs | 26 ++++++++-----
 src/common/meta/src/wal.rs | 2 -
 .../meta/src/wal/kafka/topic_manager.rs | 2 +-
 tests-integration/src/standalone.rs | 7 ++--
 6 files changed, 59 insertions(+), 18 deletions(-)

diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 713f8ef79edb..a9efb4d10365 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -80,8 +80,42 @@ enable = true
 # Whether to enable Prometheus remote write and read in HTTP API, true by default.
 enable = true
 
-# WAL options.
-[wal]
+[wal_meta]
+# Available wal providers:
+# - "raft_engine" (default)
+# - "kafka"
+provider = "raft_engine"
+
+# There is no raft-engine wal config here, since the metasrv only participates in remote wal currently.
+
+# Kafka wal config.
+# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
+# broker_endpoints = ["127.0.0.1:9090"]
+# Number of topics to be created upon start.
+# num_topics = 64
+# Topic selector type.
+# Available selector types:
+# - "round_robin" (default)
+# selector_type = "round_robin"
+# A Kafka topic name is constructed by concatenating `topic_name_prefix` and `topic_id`.
+# topic_name_prefix = "greptimedb_wal_topic"
+# Number of partitions per topic.
+# num_partitions = 1
+# Expected number of replicas of each partition.
+# replication_factor = 3
+# The timeout of topic creation; the operation is cancelled when it exceeds this.
+# create_topic_timeout = "30s"
+# The initial backoff for kafka clients.
+# backoff_init = "500ms"
+# The maximum backoff for kafka clients.
+# backoff_max = "10s"
+# Exponential backoff rate, i.e. next backoff = base * current backoff.
+# backoff_base = 2.0
+# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
+# backoff_deadline = "5mins"
+
+# WAL options for datanode.
+[wal_datanode]
 # Available wal providers:
 # - "RaftEngine" (default)
 # - "Kafka"
 provider = "raft_engine"
diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs
index 4c6b04752a33..9270d8d52a6b 100644
--- a/src/cmd/src/options.rs
+++ b/src/cmd/src/options.rs
@@ -14,6 +14,7 @@
 
 use clap::ArgMatches;
 use common_config::KvBackendConfig;
+use common_meta::wal::WalConfig as MetaSrvWalConfig;
 use common_telemetry::logging::{LoggingOptions, TracingOptions};
 use config::{Config, Environment, File, FileFormat};
 use datanode::config::{DatanodeOptions, ProcedureConfig};
@@ -37,6 +38,7 @@ pub struct MixOptions {
     pub frontend: FrontendOptions,
     pub datanode: DatanodeOptions,
     pub logging: LoggingOptions,
+    pub wal_meta: MetaSrvWalConfig,
 }
 
 impl From<MixOptions> for FrontendOptions {
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index b283dc59fb2e..43d597d3f918 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -18,7 +18,7 @@ use std::{fs, path};
 use async_trait::async_trait;
 use clap::Parser;
 use common_catalog::consts::MIN_USER_TABLE_ID;
-use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
+use common_config::{metadata_store_dir, KvBackendConfig, WalConfig as DatanodeWalConfig};
 use common_meta::cache_invalidator::DummyCacheInvalidator;
 use common_meta::datanode_manager::DatanodeManagerRef;
 use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
@@ -27,7 +27,9 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::region_keeper::MemoryRegionKeeper;
 use common_meta::sequence::SequenceBuilder;
-use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef};
+use common_meta::wal::{
+    WalConfig as MetaSrvWalConfig, WalOptionsAllocator, WalOptionsAllocatorRef,
+};
 use common_procedure::ProcedureManagerRef;
 use common_telemetry::info;
 use common_telemetry::logging::LoggingOptions;
@@ -104,7 +106,8 @@ pub struct StandaloneOptions {
     pub opentsdb: OpentsdbOptions,
     pub influxdb: InfluxdbOptions,
     pub prom_store: PromStoreOptions,
-    pub wal: WalConfig,
+    pub wal_meta: MetaSrvWalConfig,
+    pub wal_datanode: DatanodeWalConfig,
     pub storage: StorageConfig,
     pub metadata_store: KvBackendConfig,
     pub procedure: ProcedureConfig,
@@ -127,7 +130,8 @@ impl Default for StandaloneOptions {
             opentsdb: OpentsdbOptions::default(),
             influxdb: InfluxdbOptions::default(),
             prom_store: PromStoreOptions::default(),
-            wal: WalConfig::default(),
+            wal_meta: MetaSrvWalConfig::default(),
+            wal_datanode: DatanodeWalConfig::default(),
             storage: StorageConfig::default(),
             metadata_store: KvBackendConfig::default(),
             procedure: ProcedureConfig::default(),
@@ -166,7 +170,7 @@ impl StandaloneOptions {
         DatanodeOptions {
             node_id: Some(0),
             enable_telemetry: self.enable_telemetry,
-            wal: self.wal,
+            wal: self.wal_datanode,
             storage: self.storage,
             region_engine: self.region_engine,
             rpc_addr: self.grpc.addr,
@@ -338,7 +342,8 @@ impl StartCommand {
         let procedure = opts.procedure.clone();
         let frontend = opts.clone().frontend_options();
         let logging = opts.logging.clone();
-        let datanode = opts.datanode_options();
+        let wal_meta = opts.wal_meta.clone();
+        let datanode = opts.datanode_options().clone();
 
         Ok(Options::Standalone(Box::new(MixOptions {
             procedure,
@@ -347,6 +352,7 @@ impl StartCommand {
             frontend,
             datanode,
             logging,
+            wal_meta,
         })))
     }
 
@@ -392,9 +398,8 @@ impl StartCommand {
                 .step(10)
                 .build(),
         );
-        // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
         let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
-            common_meta::wal::WalConfig::default(),
+            opts.wal_meta.clone(),
             kv_backend.clone(),
         ));
         let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
@@ -585,7 +590,7 @@ mod tests {
         assert_eq!(None, fe_opts.mysql.reject_no_database);
         assert!(fe_opts.influxdb.enable);
 
-        let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
+        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
             unreachable!()
         };
         assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
@@ -731,7 +736,8 @@ mod tests {
         assert_eq!(options.opentsdb, default_options.opentsdb);
         assert_eq!(options.influxdb, default_options.influxdb);
         assert_eq!(options.prom_store, default_options.prom_store);
-        assert_eq!(options.wal, default_options.wal);
+        assert_eq!(options.wal_meta, default_options.wal_meta);
+        assert_eq!(options.wal_datanode, default_options.wal_datanode);
         assert_eq!(options.metadata_store, default_options.metadata_store);
         assert_eq!(options.procedure, default_options.procedure);
         assert_eq!(options.logging, default_options.logging);
diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs
index f34a5224a87e..f80e396a8186 100644
--- a/src/common/meta/src/wal.rs
+++ b/src/common/meta/src/wal.rs
@@ -94,6 +94,4 @@ mod tests {
         };
         assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
     }
-
-    // TODO(niebayes): the integrate test needs to test that the example config file can be successfully parsed.
 }
diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs
index 995d8c1393d0..30d370b5d108 100644
--- a/src/common/meta/src/wal/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal/kafka/topic_manager.rs
@@ -130,7 +130,7 @@ impl TopicManager {
                 )
             })
             .collect::<Vec<_>>();
-        // TODO(niebayes): Determine how rskafka handles an already-exist topic. Check if an error would be raised.
+        // FIXME(niebayes): trying to create an already-existing topic raises an error.
         futures::future::try_join_all(tasks)
             .await
             .context(CreateKafkaWalTopicSnafu)
             .map(|_| ())
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 8cbd70260cde..14afcb2ca19a 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -23,7 +23,7 @@ use common_meta::ddl_manager::DdlManager;
 use common_meta::key::TableMetadataManager;
 use common_meta::region_keeper::MemoryRegionKeeper;
 use common_meta::sequence::SequenceBuilder;
-use common_meta::wal::WalOptionsAllocator;
+use common_meta::wal::{WalConfig as MetaSrvWalConfig, WalOptionsAllocator};
 use common_procedure::options::ProcedureConfig;
 use common_telemetry::logging::LoggingOptions;
 use datanode::config::DatanodeOptions;
@@ -118,9 +118,9 @@ impl GreptimeDbStandaloneBuilder {
                 .step(10)
                 .build(),
         );
-        // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
+        let wal_meta = MetaSrvWalConfig::default();
         let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
-            common_meta::wal::WalConfig::default(),
+            wal_meta.clone(),
             kv_backend.clone(),
         ));
         let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
@@ -163,6 +163,7 @@ impl GreptimeDbStandaloneBuilder {
                 frontend: FrontendOptions::default(),
                 datanode: opts,
                 logging: LoggingOptions::default(),
+                wal_meta,
             },
             guard,
         }

From d2cf0b6e36ec91eb41868dad054e1f40c3c9ad9a Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 00:05:06 +0800
Subject: [PATCH 02/19] fix: test

---
 src/cmd/src/standalone.rs | 6 +++++-
 tests-integration/tests/http.rs | 6 +++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 43d597d3f918..ce4a615ba102 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -526,7 +526,10 @@ mod tests {
 
             enable_memory_catalog = true
 
-            [wal]
+            [wal_meta]
+            provider = "raft_engine"
+
+            [wal_datanode]
             provider = "raft_engine"
             dir = "/tmp/greptimedb/test/wal"
             file_size = "1GB"
@@ -534,6 +537,7 @@ mod tests {
             purge_interval = "10m"
             read_batch_size = 128
             sync_write = false
+
             [storage]
             data_home = "/tmp/greptimedb/"
             type = "File"
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 5a8f2aa4aaf7..d212be38e2d9 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -824,7 +824,11 @@ write_interval = "30s"
 [datanode.export_metrics.headers]
 
 [logging]
-enable_otlp_tracing = false"#,
+enable_otlp_tracing = false
+
+[wal_meta]
+provider = "raft_engine"
+"#,
             store_type,
         );
         let body_text = drop_lines_with_inconsistent_results(res_get.text().await);

From 7c7438dd7a092473234fc3aad51978d40bbb17c3 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 00:29:30 +0800
Subject: [PATCH 03/19] chore: ready to debug kafka remote wal

---
 config/standalone.example.toml | 84 +++++++++++-----------------
 src/common/config/src/wal.rs | 2 +-
 src/common/config/src/wal/kafka.rs | 2 +-
 src/common/meta/src/wal.rs | 9 +++-
 src/common/meta/src/wal/kafka.rs | 7 ++-
 5 files changed, 45 insertions(+), 59 deletions(-)

diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index a9efb4d10365..40c372dd0cd5 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -5,8 +5,8 @@ enable_telemetry = true
 
 # HTTP server options.
 [http]
-# Server address, "127.0.0.1:4000" by default.
-addr = "127.0.0.1:4000"
+# Server address, ".0.0.1:4000" by default.
+addr = ".0.0.1:4000"
 # HTTP request timeout, 30s by default.
 timeout = "30s"
 # HTTP request body limit, 64Mb by default.
 body_limit = "64MB"
 
 # gRPC server options.
 [grpc]
-# Server address, "127.0.0.1:4001" by default.
-addr = "127.0.0.1:4001"
+# Server address, ".0.0.1:4001" by default.
+addr = ".0.0.1:4001"
 # The number of server worker threads, 8 by default.
 runtime_size = 8
 
 # MySQL server options.
 [mysql]
 # Whether to enable
 enable = true
-# Server address, "127.0.0.1:4002" by default.
-addr = "127.0.0.1:4002"
+# Server address, ".0.0.1:4002" by default.
+addr = ".0.0.1:4002"
 # The number of server worker threads, 2 by default.
 runtime_size = 2
 
 [mysql.tls]
 # TLS mode
 mode = "disable"
 # certificate file path.
 cert_path = ""
 # key file path.
 key_path = ""
 
 # PostgresSQL server options.
 [postgres]
 # Whether to enable
 enable = true
-# Server address, "127.0.0.1:4003" by default.
-addr = "127.0.0.1:4003"
+# Server address, ".0.0.1:4003" by default.
+addr = ".0.0.1:4003"
 # The number of server worker threads, 2 by default.
 runtime_size = 2
 
 [postgres.tls]
 # TLS mode
 mode = "disable"
 # certificate file path.
 cert_path = ""
 # key file path.
 key_path = ""
 
 # OpenTSDB protocol options.
 [opentsdb]
 # Whether to enable
 enable = true
-# OpenTSDB telnet API server address, "127.0.0.1:4242" by default.
-addr = "127.0.0.1:4242"
+# OpenTSDB telnet API server address, ".0.0.1:4242" by default.
+addr = ".0.0.1:4242"
 # The number of server worker threads, 2 by default.
 runtime_size = 2
 
 # InfluxDB protocol options.
 [influxdb]
 # Whether to enable
 enable = true
 
 # Prometheus remote storage options
 [prom_store]
 # Whether to enable Prometheus remote write and read in HTTP API, true by default.
 enable = true
 
 [wal_meta]
 # Available wal providers:
 # - "raft_engine" (default)
 # - "kafka"
-provider = "raft_engine"
+provider = "kafka"
 
 # There is no raft-engine wal config here, since the metasrv only participates in remote wal currently.
 
 # Kafka wal config.
-# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
-# broker_endpoints = ["127.0.0.1:9090"]
-# Number of topics to be created upon start.
-# num_topics = 64
-# Topic selector type.
-# Available selector types:
-# - "round_robin" (default)
-# selector_type = "round_robin"
-# A Kafka topic name is constructed by concatenating `topic_name_prefix` and `topic_id`.
-# topic_name_prefix = "greptimedb_wal_topic"
-# Number of partitions per topic.
-# num_partitions = 1
-# Expected number of replicas of each partition.
-# replication_factor = 3
-# The timeout of topic creation; the operation is cancelled when it exceeds this.
-# create_topic_timeout = "30s"
-# The initial backoff for kafka clients.
-# backoff_init = "500ms"
-# The maximum backoff for kafka clients.
-# backoff_max = "10s"
-# Exponential backoff rate, i.e. next backoff = base * current backoff.
-# backoff_base = 2.0
-# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
-# backoff_deadline = "5mins"
+broker_endpoints = ["192.168.50.178:29092"]
+num_topics = 64
+selector_type = "round_robin"
+topic_name_prefix = "greptimedb_wal_topic"
+num_partitions = 1
+replication_factor = 3
+backoff_init = "500ms"
+backoff_max = "10s"
+backoff_base = 2.0
+backoff_deadline = "5mins"
 
 # WAL options for datanode.
 [wal_datanode]
 # Available wal providers:
 # - "RaftEngine" (default)
 # - "Kafka"
-provider = "raft_engine"
+provider = "kafka"
 
 # Kafka wal options.
-# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
-# broker_endpoints = ["127.0.0.1:9090"]
-# The maximum log size a kafka batch producer could buffer.
-# max_batch_size = "4MB"
-# The linger duration of a kafka batch producer.
-# linger = "200ms"
-# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
-# max_wait_time = "100ms"
-# The initial backoff for kafka clients.
-# backoff_init = "500ms"
-# The maximum backoff for kafka clients.
-# backoff_max = "10s"
-# Exponential backoff rate, i.e. next backoff = base * current backoff.
-# backoff_base = 2.0
-# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
-# backoff_deadline = "5mins"
+broker_endpoints = ["192.168.50.178:29092"]
+max_batch_size = "4MB"
+linger = "200ms"
+max_wait_time = "100ms"
+backoff_init = "500ms"
+backoff_max = "10s"
+backoff_base = 2.0
+backoff_deadline = "5mins"
 
 # WAL data directory
 # dir = "/tmp/greptimedb/wal"
@@ -243,7 +221,7 @@ parallel_scan_channel_size = 32
 # whether enable export metrics, default is false
 # enable = false
 # The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
-# endpoint = "127.0.0.1:4000"
+# endpoint = ".0.0.1:4000"
 # The database name of exported metrics stores, user needs to specify a valid database
 # db = ""
 # The interval of export metrics
diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs
index 60128d14b35e..e3c1b7b9cb28 100644
--- a/src/common/config/src/wal.rs
+++ b/src/common/config/src/wal.rs
@@ -37,7 +37,7 @@ pub enum WalConfig {
 
 impl Default for WalConfig {
     fn default() -> Self {
-        WalConfig::RaftEngine(RaftEngineConfig::default())
+        WalConfig::Kafka(KafkaConfig::default())
     }
 }
 
diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs
index eb6795054141..fb366411c317 100644
--- a/src/common/config/src/wal/kafka.rs
+++ b/src/common/config/src/wal/kafka.rs
@@ -59,7 +59,7 @@ pub struct KafkaConfig {
 impl Default for KafkaConfig {
     fn default() -> Self {
         Self {
-            broker_endpoints: vec!["127.0.0.1:9090".to_string()],
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
             compression: RsKafkaCompression::NoCompression,
             max_batch_size: ReadableSize::mb(4),
             linger: Duration::from_millis(200),
diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs
index f80e396a8186..728373182036 100644
--- a/src/common/meta/src/wal.rs
+++ b/src/common/meta/src/wal.rs
@@ -28,16 +28,21 @@ pub use crate::wal::options_allocator::{
 };
 
 /// Wal config for metasrv.
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 #[serde(tag = "provider")]
 pub enum WalConfig {
-    #[default]
     #[serde(rename = "raft_engine")]
     RaftEngine,
     #[serde(rename = "kafka")]
     Kafka(KafkaConfig),
 }
 
+impl Default for WalConfig {
+    fn default() -> Self {
+        WalConfig::Kafka(KafkaConfig::default())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::time::Duration;
diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs
index 173a74662d95..8b434cb8dd55 100644
--- a/src/common/meta/src/wal/kafka.rs
+++ b/src/common/meta/src/wal/kafka.rs
@@ -60,13 +60,16 @@ pub struct KafkaConfig {
 
 impl Default for KafkaConfig {
     fn default() -> Self {
+        let broker_endpoints = vec!["127.0.0.1:9092".to_string()];
+        let replication_factor = broker_endpoints.len() as i16;
+
         Self {
-            broker_endpoints: vec!["127.0.0.1:9090".to_string()],
+            broker_endpoints,
             num_topics: 64,
             selector_type: TopicSelectorType::RoundRobin,
             topic_name_prefix: "greptimedb_wal_topic".to_string(),
             num_partitions: 1,
-            replication_factor: 3,
+            replication_factor,
             create_topic_timeout: Duration::from_secs(30),
             backoff_init: Duration::from_millis(500),
             backoff_max: Duration::from_secs(10),

From fd2c2954c97ab5767a7401be66b9c2b6088a1839 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 00:30:08 +0800
Subject: [PATCH 04/19] fix: test

---
 tests-integration/tests/http.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index d212be38e2d9..0decb3821951 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -827,8 +827,7 @@ write_interval = "30s"
 enable_otlp_tracing = false
 
 [wal_meta]
-provider = "raft_engine"
-"#,
+provider = "raft_engine""#,
             store_type,
         );
         let body_text = drop_lines_with_inconsistent_results(res_get.text().await);

From ac463cdf6e694b3a3153f25d70a3c31d54ed7b3a Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 00:57:09 +0800
Subject: [PATCH 05/19] chore: add some logs for remote wal

---
 src/common/meta/src/ddl/create_table.rs | 7 ++++++-
 src/common/meta/src/key/datanode_table.rs | 6 ++++++
 src/datanode/src/datanode.rs | 6 +++++-
 src/log-store/src/kafka/log_store.rs | 20 ++++++++++++++++++++
 src/mito2/src/region/opener.rs | 8 ++++++++
 src/mito2/src/wal.rs | 6 ++++++
 6 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index c73844fc8337..3496d98b998b 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -26,8 +26,8 @@ use common_procedure::error::{
     ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
 };
 use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
-use common_telemetry::info;
 use common_telemetry::tracing_context::TracingContext;
+use common_telemetry::{debug, info};
 use futures::future::join_all;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
@@ -475,6 +475,11 @@ impl CreateRequestBuilder {
             );
         }
 
+        debug!(
+            "Set region options {:?} for region {}",
+            request.options, region_id
+        );
+
         Ok(request)
     }
 }
diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs
index b2e25e014bc8..ee202758bffd 100644
--- a/src/common/meta/src/key/datanode_table.rs
+++ b/src/common/meta/src/key/datanode_table.rs
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use common_telemetry::debug;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use serde::{Deserialize, Serialize};
@@ -185,6 +186,11 @@ impl DatanodeTableManager {
             })
             .collect();
 
+        debug!(
+            "Persist region wal options {:?} for table {}",
+            filtered_region_wal_options, table_id
+        );
+
         let key = DatanodeTableKey::new(datanode_id, table_id);
         let val = DatanodeTableValue::new(
             table_id,
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 0a0206eddc66..0ba769edf559 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -29,7 +29,7 @@ use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
 use common_meta::kv_backend::KvBackendRef;
 pub use common_procedure::options::ProcedureConfig;
 use common_runtime::Runtime;
-use common_telemetry::{error, info, warn};
+use common_telemetry::{debug, error, info, warn};
 use file_engine::engine::FileRegionEngine;
 use futures::future;
 use futures_util::future::try_join_all;
@@ -545,6 +545,10 @@ async fn open_all_regions(
             .and_then(|wal_options| {
                 region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
             });
+        debug!(
+            "Read region options {:?} for region {} from kv backend",
+            region_options, region_number
+        );
 
         regions.push((
             RegionId::new(table_value.table_id, region_number),
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 4ff054712ff3..fd57f6678fad 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_config::wal::{KafkaConfig, WalOptions};
+use common_telemetry::debug;
 use futures_util::StreamExt;
 use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
 use store_api::logstore::entry::Id as EntryId;
@@ -82,6 +83,8 @@ impl LogStore for KafkaLogStore {
     /// Appends a batch of entries and returns a response containing a map where the key is a region id
     /// while the value is the id of the last successfully written entry of the region.
     async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
+        debug!("LogStore handles append_batch with entries {:?}", entries);
+
         if entries.is_empty() {
             return Ok(AppendBatchResponse::default());
         }
@@ -97,6 +100,8 @@ impl LogStore for KafkaLogStore {
 
         // Builds a record from entries belong to a region and produces them to kafka server.
         let region_ids = producers.keys().cloned().collect::<Vec<_>>();
+        debug!("Constructed producers for regions {:?}", region_ids);
+
         let tasks = producers
             .into_values()
             .map(|producer| producer.produce(&self.client_manager))
@@ -108,6 +113,8 @@ impl LogStore for KafkaLogStore {
             .into_iter()
             .map(TryInto::try_into)
             .collect::<Result<Vec<_>>>()?;
+        debug!("The entries are appended at offsets {:?}", entry_ids);
+
         Ok(AppendBatchResponse {
             last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
         })
@@ -120,6 +127,11 @@ impl LogStore for KafkaLogStore {
         ns: &Self::Namespace,
         entry_id: EntryId,
     ) -> Result<SendableEntryStream<Self::Entry, Self::Error>> {
+        debug!(
+            "LogStore handles read at entry_id {} for ns {:?}",
+            entry_id, ns
+        );
+
         let topic = ns.topic.clone();
         let region_id = ns.region_id;
 
@@ -131,12 +143,20 @@ impl LogStore for KafkaLogStore {
             .raw_client
             .clone();
 
+        debug!("Got the client of topic {} for region {}", topic, region_id);
+
         // Reads the entries starting from exactly the specified offset.
         let offset = Offset::try_from(entry_id)?.0;
         let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(offset))
             .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32)
             .with_max_wait_ms(self.config.max_wait_time.as_millis() as i32)
             .build();
+
+        debug!(
+            "Built a stream consumer for topic {} at offset {}",
+            topic, offset
+        );
+
         let stream = async_stream::stream!({
             while let Some(consume_result) = stream_consumer.next().await {
                 yield handle_consume_result(consume_result, &topic, region_id, offset);
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index ffb3696a97e5..0fe0fce950f9 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -226,6 +226,10 @@ impl RegionOpener {
     ) -> Result<Option<MitoRegion>> {
         let region_options = self.options.as_ref().unwrap().clone();
         let wal_options = region_options.wal_options.clone();
+        debug!(
+            "Try to open region {} with wal options {:?}",
+            self.region_id, wal_options
+        );
 
         let region_manifest_options = self.manifest_options(config, &region_options)?;
         let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await?
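[Editorial aside] The `append_batch` hunk above assembles `AppendBatchResponse::last_entry_ids` by snapshotting the producer map's keys and zipping them with the per-task offsets returned by Kafka. The sketch below is a minimal standalone illustration of that zip-and-collect step; the plain `u64` ids and the fake offsets are stand-ins for the real entry and region id types, not code from the patch.

```rust
use std::collections::HashMap;

fn main() {
    // Producers are keyed by region id, as in `KafkaLogStore::append_batch`.
    let producers: HashMap<u64, &str> = HashMap::from([(1, "producer-a"), (2, "producer-b")]);

    // Snapshot the key order before consuming the map, mirroring
    // `producers.keys().cloned().collect::<Vec<_>>()` in the patch.
    let region_ids: Vec<u64> = producers.keys().cloned().collect();

    // Pretend each produce task reported the offset of its last record.
    let entry_ids: Vec<u64> = region_ids.iter().map(|id| id * 100).collect();

    // Zip the region ids back with the returned offsets, as `last_entry_ids` does.
    let last_entry_ids: HashMap<u64, u64> = region_ids.into_iter().zip(entry_ids).collect();
    assert_eq!(last_entry_ids[&2], 200);
}
```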
@@ -256,6 +260,10 @@ impl RegionOpener {
         let flushed_entry_id = version.flushed_entry_id;
         let version_control = Arc::new(VersionControl::new(version));
         if !self.skip_wal_replay {
+            info!(
+                "Start replaying memtable at flushed_entry_id {} for region {}",
+                flushed_entry_id, region_id
+            );
             replay_memtable(
                 wal,
                 &wal_options,
diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs
index ac17d3df5415..abdeb7914f0c 100644
--- a/src/mito2/src/wal.rs
+++ b/src/mito2/src/wal.rs
@@ -22,6 +22,7 @@ use api::v1::WalEntry;
 use async_stream::try_stream;
 use common_config::wal::WalOptions;
 use common_error::ext::BoxedError;
+use common_telemetry::debug;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use prost::Message;
@@ -73,6 +74,11 @@ impl Wal {
         start_id: EntryId,
         wal_options: &'a WalOptions,
     ) -> Result<WalEntryStream> {
+        debug!(
+            "Scanning log entries for region {} starting from {} with wal_options {:?}",
+            region_id, start_id, wal_options
+        );
+
         let stream = try_stream!({
             let namespace = self.store.namespace(region_id.into(), wal_options);
             let mut stream = self

From 2cc692d381b7ec5eabcea65de9ffc5e9e165e458 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 01:14:18 +0800
Subject: [PATCH 06/19] chore: add logs for topic manager

---
 .../meta/src/wal/kafka/topic_manager.rs | 64 ++++++++++++++-----
 1 file changed, 48 insertions(+), 16 deletions(-)

diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs
index 30d370b5d108..8523e1726c65 100644
--- a/src/common/meta/src/wal/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal/kafka/topic_manager.rs
@@ -16,10 +16,12 @@ use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;
 
-use common_telemetry::debug;
+use common_telemetry::{debug, error, info};
+use rskafka::client::error::Error as RsKafkaError;
+use rskafka::client::error::ProtocolError::TopicAlreadyExists;
 use rskafka::client::ClientBuilder;
 use rskafka::BackoffConfig;
-use snafu::{ensure, ResultExt};
+use snafu::{ensure, AsErrorSource, ResultExt};
 
 use crate::error::{
     BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu,
@@ -78,7 +80,11 @@ impl TopicManager {
             .await?
             .into_iter()
             .collect::<HashSet<Topic>>();
-        debug!("Restored {} topics", created_topics.len());
+        debug!(
+            "Restored {} topics {:?}",
+            created_topics.len(),
+            created_topics
+        );
 
         // Creates missing topics.
         let to_be_created = topics
@@ -91,6 +97,8 @@ impl TopicManager {
                 Some(i)
             })
             .collect::<Vec<_>>();
+        debug!("Should create {} topics", to_be_created.len());
+
         if !to_be_created.is_empty() {
             self.try_create_topics(topics, &to_be_created).await?;
             Self::persist_created_topics(topics, &self.kv_backend).await?;
@@ -118,23 +126,47 @@ impl TopicManager {
             .controller_client()
             .context(BuildKafkaCtrlClientSnafu)?;
 
-        // Spawns tokio tasks for creating missing topics.
-        let tasks = to_be_created
-            .iter()
-            .map(|i| {
-                client.create_topic(
-                    topics[*i].clone(),
-                    self.config.num_partitions,
-                    self.config.replication_factor,
-                    self.config.create_topic_timeout.as_millis() as i32,
-                )
-            })
-            .collect::<Vec<_>>();
-        // FIXME(niebayes): trying to create an already-existing topic raises an error.
-        futures::future::try_join_all(tasks)
-            .await
-            .context(CreateKafkaWalTopicSnafu)
-            .map(|_| ())
+        for topic in topics {
+            debug!("Try to create topic {}", topic);
+
+            let response = client
+                .create_topic(
+                    topic,
+                    self.config.num_partitions,
+                    self.config.replication_factor,
+                    self.config.create_topic_timeout.as_millis() as i32,
+                )
+                .await;
+            if let Err(e) = response {
+                if e.to_string() == TopicAlreadyExists.to_string() {
+                    info!("The topic {} was already created", topic);
+                } else {
+                    error!("Failed to create topic {}, source: {}", topic, e);
+                    return Err(e).context(CreateKafkaWalTopicSnafu);
+                }
+            } else {
+                info!("Successfully created topic {}", topic);
+            }
+        }
+
+        Ok(())
+
+        // Spawns tokio tasks for creating missing topics.
+        // let tasks = to_be_created
+        //     .iter()
+        //     .map(|i| {
+        //         client.create_topic(
+        //             topics[*i].clone(),
+        //             self.config.num_partitions,
+        //             self.config.replication_factor,
+        //             self.config.create_topic_timeout.as_millis() as i32,
+        //         )
+        //     })
+        //     .collect::<Vec<_>>();
+        // futures::future::try_join_all(tasks)
+        //     .await
+        //     .context(CreateKafkaWalTopicSnafu)
+        //     .map(|_| ())
     }
 
     /// Selects one topic from the topic pool through the topic selector.

From 67fb017a70da497f4ea71006548af83b24ca6d5e Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 15:21:20 +0800
Subject: [PATCH 07/19] fix: properly terminate stream consumer

---
 .../meta/src/wal/kafka/topic_manager.rs | 3 +-
 src/log-store/src/error.rs | 21 ++++---
 src/log-store/src/kafka.rs | 20 +++++++
 src/log-store/src/kafka/log_store.rs | 56 ++++++++++++++-----
 src/log-store/src/kafka/record_utils.rs | 39 ++-----------
 src/mito2/src/region/opener.rs | 2 +
 src/store-api/src/logstore.rs | 1 -
 7 files changed, 82 insertions(+), 60 deletions(-)

diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs
index 8523e1726c65..fb6e2f2ab1d5 100644
--- a/src/common/meta/src/wal/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal/kafka/topic_manager.rs
@@ -140,9 +140,10 @@ impl TopicManager {
             if let Err(e) = response {
                 if e.to_string() == TopicAlreadyExists.to_string() {
                     info!("The topic {} was already created", topic);
+                    // FIXME(niebayes): properly handle topic already exists error.
                 } else {
                     error!("Failed to create topic {}, source: {}", topic, e);
-                    return Err(e).context(CreateKafkaWalTopicSnafu);
+                    // return Err(e).context(CreateKafkaWalTopicSnafu);
                 }
             } else {
                 info!("Successfully created topic {}", topic);
diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs
index 1ee344046adc..bb332abd5bad 100644
--- a/src/log-store/src/error.rs
+++ b/src/log-store/src/error.rs
@@ -20,6 +20,8 @@ use common_macro::stack_trace_debug;
 use common_runtime::error::Error as RuntimeError;
 use snafu::{Location, Snafu};
 
+use crate::kafka::NamespaceImpl as KafkaNamespace;
+
 #[derive(Snafu)]
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
@@ -152,16 +154,17 @@ pub enum Error {
         error: rskafka::client::producer::Error,
     },
 
-    #[snafu(display(
-        "Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}",
-        topic,
-        region_id,
-        offset,
-    ))]
+    #[snafu(display("Failed to read a record from Kafka, ns: {}", ns))]
     ConsumeRecord {
-        topic: String,
-        region_id: u64,
-        offset: i64,
+        ns: KafkaNamespace,
+        location: Location,
+        #[snafu(source)]
+        error: rskafka::client::error::Error,
+    },
+
+    #[snafu(display("Failed to get the lastest offset, ns: {}", ns))]
+    GetOffset {
+        ns: KafkaNamespace,
         location: Location,
         #[snafu(source)]
         error: rskafka::client::error::Error,
diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs
index 5fd4fe326eed..fefa7823c5c2 100644
--- a/src/log-store/src/kafka.rs
+++ b/src/log-store/src/kafka.rs
@@ -17,6 +17,8 @@ pub mod log_store;
 mod offset;
 mod record_utils;
 
+use std::fmt::Display;
+
 use common_meta::wal::KafkaWalTopic as Topic;
 use serde::{Deserialize, Serialize};
 use store_api::logstore::entry::{Entry, Id as EntryId};
@@ -37,6 +39,12 @@ impl Namespace for NamespaceImpl {
     }
 }
 
+impl Display for NamespaceImpl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}/{}", self.topic, self.region_id)
+    }
+}
+
 /// Kafka Entry implementation.
 #[derive(Debug, PartialEq, Clone)]
 pub struct EntryImpl {
@@ -64,3 +72,15 @@ impl Entry for EntryImpl {
         self.ns.clone()
     }
 }
+
+impl Display for EntryImpl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "Entry (ns: {}, id: {}, data_len: {})",
+            self.ns,
+            self.id,
+            self.data.len()
+        )
+    }
+}
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index fd57f6678fad..cdbb36bbe28f 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -19,15 +19,17 @@ use common_config::wal::{KafkaConfig, WalOptions};
 use common_telemetry::debug;
 use futures_util::StreamExt;
 use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
+use rskafka::client::partition::OffsetAt;
+use snafu::ResultExt;
 use store_api::logstore::entry::Id as EntryId;
 use store_api::logstore::entry_stream::SendableEntryStream;
 use store_api::logstore::namespace::Id as NamespaceId;
 use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
 
-use crate::error::{Error, Result};
+use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, Result};
 use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
 use crate::kafka::offset::Offset;
-use crate::kafka::record_utils::{handle_consume_result, RecordProducer};
+use crate::kafka::record_utils::{decode_from_record, RecordProducer};
 use crate::kafka::{EntryImpl, NamespaceImpl};
 
 /// A log store backed by Kafka.
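[Editorial aside] The hunk below replaces the unbounded consumer loop in `read` with one bounded by the latest offset, so a replay terminates instead of waiting forever for future records. As a self-contained sketch of just that termination rule (plain tuples stand in for rskafka records; this is an editor's illustration, not code from the patch):

```rust
// Consume from `start_offset` and stop once the record at `end_offset`,
// i.e. the latest record at the time the read began, has been yielded.
fn bounded_read<'a>(records: &[(i64, &'a str)], start_offset: i64, end_offset: i64) -> Vec<&'a str> {
    let mut out = Vec::new();
    for &(offset, payload) in records {
        if offset < start_offset {
            continue;
        }
        out.push(payload);
        if offset >= end_offset {
            break; // Mirrors the `break` that ends the consumer stream below.
        }
    }
    out
}

fn main() {
    let records = [(0, "a"), (1, "b"), (2, "c"), (3, "d")];
    // The entry appended after the read began (offset 3 here) is not returned.
    assert_eq!(bounded_read(&records, 1, 2), ["b", "c"]);
}
```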
@@ -127,11 +129,6 @@ impl LogStore for KafkaLogStore {
         ns: &Self::Namespace,
         entry_id: EntryId,
     ) -> Result<SendableEntryStream<Self::Entry, Self::Error>> {
-        debug!(
-            "LogStore handles read at entry_id {} for ns {:?}",
-            entry_id, ns
-        );
-
         let topic = ns.topic.clone();
         let region_id = ns.region_id;
 
@@ -143,20 +140,54 @@ impl LogStore for KafkaLogStore {
             .raw_client
             .clone();
 
-        debug!("Got the client of topic {} for region {}", topic, region_id);
+        // Gets the offset of the lastest record in the topic. Actually, it's the lastest record of the single partition in the topic.
+        // The read operation terminates when this record is consumed.
+        let end_offset = client
+            .get_offset(OffsetAt::Latest)
+            .await
+            .context(GetOffsetSnafu { ns: ns.clone() })?;
+        // Reads entries with offsets in the range [start_offset, end_offset].
+        let start_offset = Offset::try_from(entry_id)?.0;
+
+        // Abort if there're no new entries.
+        // FIXME(niebayes): how come this case happens?
+        if start_offset > end_offset {
+            debug!("No new entries in ns {}", ns);
+            return Ok(futures_util::stream::empty().boxed());
+        }
 
-        // Reads the entries starting from exactly the specified offset.
-        let offset = Offset::try_from(entry_id)?.0;
-        let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(offset))
+        let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
             .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32)
             .with_max_wait_ms(self.config.max_wait_time.as_millis() as i32)
             .build();
 
         debug!(
-            "Built a stream consumer for topic {} at offset {}",
-            topic, offset
+            "Built a stream consumer for ns {} to consume entries in range [{}, {}]",
+            ns, start_offset, end_offset
         );
 
+        let ns_clone = ns.clone();
         let stream = async_stream::stream!({
             while let Some(consume_result) = stream_consumer.next().await {
-                yield handle_consume_result(consume_result, &topic, region_id, offset);
+                let (record, offset) = consume_result.context(ConsumeRecordSnafu {
+                    ns: ns_clone.clone(),
+                })?;
+                let entries = decode_from_record(record.record)?;
+
+                // Filters entries by region id.
+                if let Some(entry) = entries.first()
+                    && entry.ns.region_id == region_id
+                {
+                    yield Ok(entries);
+                } else {
+                    yield Ok(vec![]);
+                }
+
+                // Terminates the stream if the entry with the end offset was read.
+                if offset >= end_offset {
+                    debug!("Stream for ns {} terminates at offset {}", ns_clone, offset);
+                    break;
+                }
             }
         });
         Ok(Box::pin(stream))
diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs
index 37a66acfbdb3..3707b873f3e3 100644
--- a/src/log-store/src/kafka/record_utils.rs
+++ b/src/log-store/src/kafka/record_utils.rs
@@ -12,21 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use common_config::wal::KafkaWalTopic as Topic;
-use rskafka::record::{Record, RecordAndOffset};
+use rskafka::record::Record;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{
-    ConsumeRecordSnafu, DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu,
-    MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result,
+    DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu, MissingKeySnafu,
+    MissingValueSnafu, ProduceRecordSnafu, Result,
 };
 use crate::kafka::client_manager::ClientManagerRef;
 use crate::kafka::offset::Offset;
 use crate::kafka::{EntryId, EntryImpl, NamespaceImpl};
 
-type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client::error::Error>;
-
 /// Record metadata which will be serialized/deserialized to/from the `key` of a Record.
 #[derive(Debug, Serialize, Deserialize, PartialEq)]
 struct RecordMeta {
@@ -125,7 +122,7 @@ fn encode_to_record(ns: NamespaceImpl, entries: Vec<EntryImpl>) -> Result<Record> {
 }
 
-fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
+pub(crate) fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
     let key = record.key.context(MissingKeySnafu)?;
     let value = record.value.context(MissingValueSnafu)?;
     let meta: RecordMeta = serde_json::from_slice(&key).context(DecodeMetaSnafu)?;
@@ -144,34 +141,6 @@ fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
     Ok(entries)
 }
 
-/// Handles the result of a consume operation on a kafka topic.
-pub(crate) fn handle_consume_result(
-    result: ConsumeResult,
-    topic: &Topic,
-    region_id: u64,
-    offset: i64,
-) -> Result<Vec<EntryImpl>> {
-    match result {
-        Ok((record_and_offset, _)) => {
-            // Only produces entries belong to the region with the given region id.
-            // Since a record only contains entries from a single region, it suffices to check the first entry only.
-            let entries = decode_from_record(record_and_offset.record)?;
-            if let Some(entry) = entries.first()
-                && entry.id == region_id
-            {
-                Ok(entries)
-            } else {
-                Ok(vec![])
-            }
-        }
-        Err(e) => Err(e).context(ConsumeRecordSnafu {
-            topic,
-            region_id,
-            offset,
-        }),
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 0fe0fce950f9..d3ed421b23cc 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -399,6 +399,8 @@ pub(crate) async fn replay_memtable(
     }
 
     // set next_entry_id and write to memtable.
+    // FIXME(niebayes): figure out how to properly set last entry id when the replay is done.
+    last_entry_id = flushed_entry_id;
     region_write_ctx.set_next_entry_id(last_entry_id + 1);
     region_write_ctx.write_memtable();
 
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index 98d7c00ab366..16809c26b1a1 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -49,7 +49,6 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
 
     /// Creates a new `EntryStream` to asynchronously generates `Entry` with ids
     /// starting from `id`.
-    // TODO(niebayes): update docs for entry id.
     async fn read(
         &self,
         ns: &Self::Namespace,
         id: EntryId,
     ) -> Result<SendableEntryStream<Self::Entry, Self::Error>, Self::Error>;

From 57e3b04ad5689949b5202a4a6050b9fae64da4dc Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 15:42:11 +0800
Subject: [PATCH 08/19] fix: properly handle TopicAlreadyExists error

---
 .../meta/src/wal/kafka/topic_manager.rs | 94 +++++++++----------
 1 file changed, 45 insertions(+), 49 deletions(-)

diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs
index fb6e2f2ab1d5..ba03c9fe17e9 100644
--- a/src/common/meta/src/wal/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal/kafka/topic_manager.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use common_telemetry::{debug, error, info};
+use rskafka::client::controller::ControllerClient;
 use rskafka::client::error::Error as RsKafkaError;
 use rskafka::client::error::ProtocolError::TopicAlreadyExists;
 use rskafka::client::ClientBuilder;
@@ -80,11 +81,6 @@ impl TopicManager {
             .await?
             .into_iter()
             .collect::<HashSet<Topic>>();
-        debug!(
-            "Restored {} topics {:?}",
-            created_topics.len(),
-            created_topics
-        );
 
         // Creates missing topics.
         let to_be_created = topics
@@ -97,12 +93,10 @@ impl TopicManager {
                 Some(i)
             })
             .collect::<Vec<_>>();
-        debug!("Should create {} topics", to_be_created.len());
 
         if !to_be_created.is_empty() {
             self.try_create_topics(topics, &to_be_created).await?;
             Self::persist_created_topics(topics, &self.kv_backend).await?;
-            debug!("Persisted {} topics", topics.len());
         }
         Ok(())
     }
@@ -126,48 +120,12 @@ impl TopicManager {
             .controller_client()
             .context(BuildKafkaCtrlClientSnafu)?;
 
-        for topic in topics {
-            debug!("Try to create topic {}", topic);
-
-            let response = client
-                .create_topic(
-                    topic,
-                    self.config.num_partitions,
-                    self.config.replication_factor,
-                    self.config.create_topic_timeout.as_millis() as i32,
-                )
-                .await;
-            if let Err(e) = response {
-                if e.to_string() == TopicAlreadyExists.to_string() {
-                    info!("The topic {} was already created", topic);
-                    // FIXME(niebayes): properly handle topic already exists error.
-                } else {
-                    error!("Failed to create topic {}, source: {}", topic, e);
-                    // return Err(e).context(CreateKafkaWalTopicSnafu);
-                }
-            } else {
-                info!("Successfully created topic {}", topic);
-            }
-        }
-
-        Ok(())
-
-        // Spawns tokio tasks for creating missing topics.
-        // let tasks = to_be_created
-        //     .iter()
-        //     .map(|i| {
-        //         client.create_topic(
-        //             topics[*i].clone(),
-        //             self.config.num_partitions,
-        //             self.config.replication_factor,
-        //             self.config.create_topic_timeout.as_millis() as i32,
-        //         )
-        //     })
-        //     .collect::<Vec<_>>();
-        // futures::future::try_join_all(tasks)
-        //     .await
-        //     .context(CreateKafkaWalTopicSnafu)
-        //     .map(|_| ())
+        // Try to create missing topics.
+        let tasks = to_be_created
+            .iter()
+            .map(|i| self.try_create_topic(&topics[*i], &client))
+            .collect::<Vec<_>>();
+        futures::future::try_join_all(tasks).await.map(|_| ())
     }
 
@@ -182,6 +140,32 @@ impl TopicManager { .collect() } + async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> { + match client + .create_topic( + topic.clone(), + self.config.num_partitions, + self.config.replication_factor, + self.config.create_topic_timeout.as_millis() as i32, + ) + .await + { + Ok(_) => { + info!("Successfully created topic {}", topic); + Ok(()) + } + Err(e) => { + if Self::is_topic_already_exist_err(&e) { + info!("The topic {} already exists", topic); + Ok(()) + } else { + error!("Failed to create a topic {}, error {:?}", topic, e); + Err(e).context(CreateKafkaWalTopicSnafu) + } + } + } + } + async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result> { kv_backend .get(CREATED_TOPICS_KEY.as_bytes()) @@ -203,6 +187,18 @@ impl TopicManager { .await .map(|_| ()) } + + fn is_topic_already_exist_err(e: &RsKafkaError) -> bool { + if let &RsKafkaError::ServerError { + protocol_error: TopicAlreadyExists, + .. + } = e + { + true + } else { + false + } + } } #[cfg(test)] From 6dd3602ae25d18790885bb1ec20b0b4fe1ccb4ac Mon Sep 17 00:00:00 2001 From: niebayes Date: Tue, 26 Dec 2023 16:13:35 +0800 Subject: [PATCH 09/19] fix: parse config file error --- config/datanode.example.toml | 2 +- config/metasrv.example.toml | 2 +- config/standalone.example.toml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 342e10bfe19f..3562f071d63d 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -56,7 +56,7 @@ sync_write = false # max_wait_time = "100ms" # backoff_init = "500ms" # backoff_max = "10s" -# backoff_base = 2.0 +# backoff_base = 2 # backoff_deadline = "5mins" # Storage options, see `standalone.example.toml`. diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 120f19255f3a..c353ef664268 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -73,7 +73,7 @@ provider = "raft_engine" # The maximum backoff for kafka clients. # backoff_max = "10s" # Exponential backoff rate, i.e. next backoff = base * current backoff. -# backoff_base = 2.0 +# backoff_base = 2 # Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. # backoff_deadline = "5mins" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 40c372dd0cd5..b3898ebb390b 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -97,7 +97,7 @@ num_partitions = 1 replication_factor = 3 backoff_init = "500ms" backoff_max = "10s" -backoff_base = 2.0 +backoff_base = 2 backoff_deadline = "5mins" # WAL options for datanode. 
@@ -114,7 +114,7 @@ linger = "200ms"
 max_wait_time = "100ms"
 backoff_init = "500ms"
 backoff_max = "10s"
-backoff_base = 2.0
+backoff_base = 2
 backoff_deadline = "5mins"
 
 # WAL data directory

From 224c6cc0f68b6b69354012902cf5cf3e81cc6c04 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 16:25:43 +0800
Subject: [PATCH 10/19] fix: properly handle last entry id

---
 src/common/meta/src/ddl/create_table.rs | 7 +------
 src/common/meta/src/key/datanode_table.rs | 6 ------
 src/common/meta/src/wal/kafka/topic_manager.rs | 16 +++++-----
 src/datanode/src/datanode.rs | 6 +-----
 src/mito2/src/region/opener.rs | 2 --
 src/mito2/src/wal.rs | 6 ------
 6 files changed, 9 insertions(+), 34 deletions(-)

diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index 3496d98b998b..c73844fc8337 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -26,8 +26,8 @@ use common_procedure::error::{
     ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
 };
 use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
+use common_telemetry::info;
 use common_telemetry::tracing_context::TracingContext;
-use common_telemetry::{debug, info};
 use futures::future::join_all;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
@@ -475,11 +475,6 @@ impl CreateRequestBuilder {
             );
         }
 
-        debug!(
-            "Set region options {:?} for region {}",
-            request.options, region_id
-        );
-
         Ok(request)
     }
 }
diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs
index ee202758bffd..b2e25e014bc8 100644
--- a/src/common/meta/src/key/datanode_table.rs
+++ b/src/common/meta/src/key/datanode_table.rs
@@ -15,7 +15,6 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use common_telemetry::debug;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use serde::{Deserialize, Serialize};
@@ -186,11 +185,6 @@ impl DatanodeTableManager {
             })
             .collect();
 
-        debug!(
-            "Persist region wal options {:?} for table {}",
-            filtered_region_wal_options, table_id
-        );
-
         let key = DatanodeTableKey::new(datanode_id, table_id);
         let val = DatanodeTableValue::new(
             table_id,
diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs
index ba03c9fe17e9..45b6115b0020 100644
--- a/src/common/meta/src/wal/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal/kafka/topic_manager.rs
@@ -189,15 +189,13 @@ impl TopicManager {
     }
 
     fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
-        if let &RsKafkaError::ServerError {
-            protocol_error: TopicAlreadyExists,
-            ..
-        } = e
-        {
-            true
-        } else {
-            false
-        }
+        matches!(
+            e,
+            &RsKafkaError::ServerError {
+                protocol_error: TopicAlreadyExists,
+                ..
+ } + ) } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 0ba769edf559..0a0206eddc66 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -29,7 +29,7 @@ use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue} use common_meta::kv_backend::KvBackendRef; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; -use common_telemetry::{debug, error, info, warn}; +use common_telemetry::{error, info, warn}; use file_engine::engine::FileRegionEngine; use futures::future; use futures_util::future::try_join_all; @@ -545,10 +545,6 @@ async fn open_all_regions( .and_then(|wal_options| { region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()) }); - debug!( - "Read region options {:?} for region {} from kv backend", - region_options, region_number - ); regions.push(( RegionId::new(table_value.table_id, region_number), diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index d3ed421b23cc..0fe0fce950f9 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -399,8 +399,6 @@ pub(crate) async fn replay_memtable( } // set next_entry_id and write to memtable. - // FIXME(niebayes): figure out how to properly set last entry id when the replay is done. - last_entry_id = flushed_entry_id; region_write_ctx.set_next_entry_id(last_entry_id + 1); region_write_ctx.write_memtable(); diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index abdeb7914f0c..ac17d3df5415 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -22,7 +22,6 @@ use api::v1::WalEntry; use async_stream::try_stream; use common_config::wal::WalOptions; use common_error::ext::BoxedError; -use common_telemetry::debug; use futures::stream::BoxStream; use futures::StreamExt; use prost::Message; @@ -74,11 +73,6 @@ impl Wal { start_id: EntryId, wal_options: &'a WalOptions, ) -> Result { - debug!( - "Scanning log entries for region {} starting from {} with wal_options {:?}", - region_id, start_id, wal_options - ); - let stream = try_stream!({ let namespace = self.store.namespace(region_id.into(), wal_options); let mut stream = self From c01da6cc8289ebc00db87649cbae7e4154f27c5b Mon Sep 17 00:00:00 2001 From: niebayes Date: Tue, 26 Dec 2023 16:29:35 +0800 Subject: [PATCH 11/19] chore: prepare for merge --- config/datanode.example.toml | 4 +- config/metasrv.example.toml | 4 +- config/standalone.example.toml | 88 ++++++++++++++++++---------------- 3 files changed, 52 insertions(+), 44 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 3562f071d63d..bd3f8fc2eec9 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -50,10 +50,10 @@ read_batch_size = 128 sync_write = false # Kafka wal options, see `standalone.example.toml`. -# broker_endpoints = ["127.0.0.1:9090"] +# broker_endpoints = ["127.0.0.1:9092"] # max_batch_size = "4MB" # linger = "200ms" -# max_wait_time = "100ms" +# produce_record_timeout = "100ms" # backoff_init = "500ms" # backoff_max = "10s" # backoff_base = 2 diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index c353ef664268..462e17581fc6 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -52,8 +52,8 @@ provider = "raft_engine" # There're none raft-engine wal config since meta srv only involves in remote wal currently. # Kafka wal config. -# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default. 
-# broker_endpoints = ["127.0.0.1:9090"]
+# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
+# broker_endpoints = ["127.0.0.1:9092"]
 # Number of topics to be created upon start.
 # num_topics = 64
 # Topic selector type.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index b3898ebb390b..7db8477ec78e 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -5,8 +5,8 @@ enable_telemetry = true
 
 # HTTP server options.
 [http]
-# Server address, ".0.0.1:4000" by default.
-addr = ".0.0.1:4000"
+# Server address, "127.0.0.1:4000" by default.
+addr = "127.0.0.1:4000"
 # HTTP request timeout, 30s by default.
 timeout = "30s"
 # HTTP request body limit, 64Mb by default.
 body_limit = "64MB"
 
 # gRPC server options.
 [grpc]
-# Server address, ".0.0.1:4001" by default.
-addr = ".0.0.1:4001"
+# Server address, "127.0.0.1:4001" by default.
+addr = "127.0.0.1:4001"
 # The number of server worker threads, 8 by default.
 runtime_size = 8
 
 # MySQL server options.
 [mysql]
 # Whether to enable
 enable = true
-# Server address, ".0.0.1:4002" by default.
-addr = ".0.0.1:4002"
+# Server address, "127.0.0.1:4002" by default.
+addr = "127.0.0.1:4002"
 # The number of server worker threads, 2 by default.
 runtime_size = 2
 
 # PostgresSQL server options.
 [postgres]
 # Whether to enable
 enable = true
-# Server address, ".0.0.1:4003" by default.
-addr = ".0.0.1:4003"
+# Server address, "127.0.0.1:4003" by default.
+addr = "127.0.0.1:4003"
 # The number of server worker threads, 2 by default.
 runtime_size = 2
 
 # OpenTSDB protocol options.
 [opentsdb]
 # Whether to enable
 enable = true
-# OpenTSDB telnet API server address, ".0.0.1:4242" by default.
-addr = ".0.0.1:4242"
+# OpenTSDB telnet API server address, "127.0.0.1:4242" by default.
+addr = "127.0.0.1:4242"
 # The number of server worker threads, 2 by default.
 runtime_size = 2
 
 # Whether to enable Prometheus remote write and read in HTTP API, true by default.
 enable = true
 
-[wal_meta]
+[wal]
 # Available wal providers:
 # - "raft_engine" (default)
 # - "kafka"
-provider = "kafka"
+provider = "raft_engine"
 
 # There is no raft-engine wal config here, since the metasrv only participates in remote wal currently.
 
 # Kafka wal options.
-broker_endpoints = ["192.168.50.178:29092"]
-num_topics = 64
-selector_type = "round_robin"
-topic_name_prefix = "greptimedb_wal_topic"
-num_partitions = 1
-replication_factor = 3
-backoff_init = "500ms"
-backoff_max = "10s"
-backoff_base = 2
-backoff_deadline = "5mins"
-
-# WAL options for datanode.
-[wal_datanode]
-# Available wal providers:
-# - "RaftEngine" (default)
-# - "Kafka"
-provider = "kafka"
-
-# Kafka wal options.
-broker_endpoints = ["192.168.50.178:29092"]
-max_batch_size = "4MB"
-linger = "200ms"
-max_wait_time = "100ms"
-backoff_init = "500ms"
-backoff_max = "10s"
-backoff_base = 2
-backoff_deadline = "5mins"
+# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
+# broker_endpoints = ["127.0.0.1:9092"]
+
+# Number of topics to be created upon start.
+# num_topics = 64
+# Topic selector type.
+# Available selector types:
+# - "round_robin" (default)
+# selector_type = "round_robin"
+# A Kafka topic name is constructed by concatenating `topic_name_prefix` and `topic_id`.
+# topic_name_prefix = "greptimedb_wal_topic"
+# Number of partitions per topic.
+# num_partitions = 1
+# Expected number of replicas of each partition.
+# replication_factor = 3
+
+# The maximum log size a kafka batch producer could buffer.
+# max_batch_size = "4MB"
+# The linger duration of a kafka batch producer.
+# linger = "200ms"
+# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
+# produce_record_timeout = "100ms"
+# The timeout of topic creation; the operation is cancelled when it exceeds this.
+# create_topic_timeout = "30s"
+
+# The initial backoff for kafka clients.
+# backoff_init = "500ms"
+# The maximum backoff for kafka clients.
+# backoff_max = "10s"
+# Exponential backoff rate, i.e. next backoff = base * current backoff.
+# backoff_base = 2
+# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
+# backoff_deadline = "5mins"
 
 # WAL data directory
 # dir = "/tmp/greptimedb/wal"
@@ -212,6 +218,8 @@ parallel_scan_channel_size = 32
 # otlp_endpoint = "localhost:4317"
 # The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
 # tracing_sample_ratio = 1.0
+# Whether to append logs to stdout. Defaults to true.
+# append_stdout = true
 
 # Standalone export the metrics generated by itself
 # encoded to Prometheus remote-write format
@@ -221,7 +229,7 @@ parallel_scan_channel_size = 32
 # whether enable export metrics, default is false
 # enable = false
 # The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
-# endpoint = ".0.0.1:4000"
+# endpoint = "127.0.0.1:4000"
 # The database name of exported metrics stores, user needs to specify a valid database
 # db = ""
 # The interval of export metrics

From 0d7fb7efbdc7c1c29d788818a8139db0bbcd0a1e Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 16:47:33 +0800
Subject: [PATCH 12/19] fix: test

---
 src/cmd/src/standalone.rs | 5 +----
 src/log-store/src/kafka/log_store.rs | 1 -
 src/mito2/src/region/opener.rs | 4 ----
 3 files changed, 1 insertion(+), 9 deletions(-)

diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index d16920d5561e..812b64d301e1 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -524,10 +524,7 @@ mod tests {
 
             enable_memory_catalog = true
 
-            [wal_meta]
-            provider = "raft_engine"
-
-            [wal_datanode]
+            [wal]
             provider = "raft_engine"
             dir = "/tmp/greptimedb/test/wal"
             file_size = "1GB"
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 5390c308c378..a1ae440589df 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -102,7 +102,6 @@ impl LogStore for KafkaLogStore {
 
         // Builds a record from entries belong to a region and produces them to kafka server.
         let region_ids = producers.keys().cloned().collect::<Vec<_>>();
-        debug!("Constructed producers for regions {:?}", region_ids);
 
         let tasks = producers
             .into_values()
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 0fe0fce950f9..7b969d578d00 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -226,10 +226,6 @@ impl RegionOpener {
     ) -> Result<Option<MitoRegion>> {
         let region_options = self.options.as_ref().unwrap().clone();
         let wal_options = region_options.wal_options.clone();
-        debug!(
-            "Try to open region {} with wal options {:?}",
-            self.region_id, wal_options
-        );
 
         let region_manifest_options = self.manifest_options(config, &region_options)?;
         let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await?

From 0e6fd8bf82fc792f8caccd19510bd252d3705b5b Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 17:27:50 +0800
Subject: [PATCH 13/19] fix: typo

---
 config/metasrv.example.toml          |  2 +-
 config/standalone.example.toml       |  2 +-
 src/log-store/src/error.rs           |  2 +-
 src/log-store/src/kafka/log_store.rs | 28 +++++++++++++++++++++-------
 4 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 462e17581fc6..ff05a9c095e8 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -65,7 +65,7 @@ provider = "raft_engine"
 # Number of partitions per topic.
 # num_partitions = 1
 # Expected number of replicas of each partition.
-# replication_factor = 3
+# replication_factor = 1
 # Above which a topic creation operation will be cancelled.
 # create_topic_timeout = "30s"
 # The initial backoff for kafka clients.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 897fbc68e03f..7db8477ec78e 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -103,7 +103,7 @@ provider = "raft_engine"
 # Number of partitions per topic.
 # num_partitions = 1
 # Expected number of replicas of each partition.
-# replication_factor = 3
+# replication_factor = 1
 
 # The maximum log size a kafka batch producer could buffer.
 # max_batch_size = "4MB"
diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs
index bb332abd5bad..7f475e2076a8 100644
--- a/src/log-store/src/error.rs
+++ b/src/log-store/src/error.rs
@@ -162,7 +162,7 @@ pub enum Error {
         error: rskafka::client::error::Error,
     },
 
-    #[snafu(display("Failed to get the lastest offset, ns: {}", ns))]
+    #[snafu(display("Failed to get the latest offset, ns: {}", ns))]
     GetOffset {
         ns: KafkaNamespace,
         location: Location,
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index a1ae440589df..69b052aad55d 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -139,13 +139,15 @@ impl LogStore for KafkaLogStore {
             .raw_client
             .clone();
 
-        // Gets the offset of the lastest record in the topic. Actually, it's the lastest record of the single partition in the topic.
+        // Gets the offset of the latest record in the topic. Actually, it's the latest record of the single partition in the topic.
         // The read operation terminates when this record is consumed.
+        // Warning: the `get_offset` returns the end offset of the latest record. For our usage, it should be decremented.
         let end_offset = client
             .get_offset(OffsetAt::Latest)
            .await
-            .context(GetOffsetSnafu { ns: ns.clone() })?;
-        // Reads entries with offsets in the range [start_offset, end_offset].
+            .context(GetOffsetSnafu { ns: ns.clone() })?
+            - 1;
+        // Reads entries with offsets in the range [start_offset, end_offset).
         let start_offset = Offset::try_from(entry_id)?.0;
 
         // Abort if there're no new entries.
@@ -161,16 +163,25 @@ impl LogStore for KafkaLogStore {
             .build();
 
         debug!(
-            "Built a stream consumer for ns {} to consume entries in range [{}, {}]",
+            "Built a stream consumer for ns {} to consume entries in range [{}, {})",
             ns, start_offset, end_offset
         );
 
         let ns_clone = ns.clone();
         let stream = async_stream::stream!({
             while let Some(consume_result) = stream_consumer.next().await {
-                let (record, offset) = consume_result.context(ConsumeRecordSnafu {
+                // Each next will produce a `RecordAndOffset` and a high watermark offset.
+                // The `RecordAndOffset` contains the record data and its start offset.
+                // The high watermark offset is the end offset of the latest record in the partition.
+                let (record, high_watermark) = consume_result.context(ConsumeRecordSnafu {
                     ns: ns_clone.clone(),
                 })?;
+                let record_offset = record.offset;
+                debug!(
+                    "Read a record at offset {} for ns {}, high watermark: {}",
+                    record_offset, ns_clone, high_watermark
+                );
+
                 let entries = decode_from_record(record.record)?;
 
                 // Filters entries by region id.
@@ -183,8 +194,11 @@ impl LogStore for KafkaLogStore {
                 }
 
                 // Terminates the stream if the entry with the end offset was read.
-                if offset >= end_offset {
-                    debug!("Stream for ns {} terminates at offset {}", ns_clone, offset);
+                if record_offset >= end_offset {
+                    debug!(
+                        "Stream consumer for ns {} terminates at offset {}",
+                        ns_clone, record_offset
+                    );
                     break;
                 }
             }

From e9764021542c2e243149af4c0259ee53d2737013 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 17:34:37 +0800
Subject: [PATCH 14/19] fix: set replication_factor properly

---
 src/common/config/src/wal/kafka.rs | 7 +++++--
 src/common/meta/src/wal.rs         | 4 ++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs
index d1d1a615a370..e93aa6cb2271 100644
--- a/src/common/config/src/wal/kafka.rs
+++ b/src/common/config/src/wal/kafka.rs
@@ -121,13 +121,16 @@ pub struct StandaloneKafkaConfig {
 
 impl Default for StandaloneKafkaConfig {
     fn default() -> Self {
+        let base = KafkaConfig::default();
+        let replication_factor = base.broker_endpoints.len() as i16;
+
         Self {
-            base: KafkaConfig::default(),
+            base,
             num_topics: 64,
             selector_type: TopicSelectorType::RoundRobin,
             topic_name_prefix: "greptimedb_wal_topic".to_string(),
             num_partitions: 1,
-            replication_factor: 3,
+            replication_factor,
             create_topic_timeout: Duration::from_secs(30),
         }
     }
diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs
index 1e394e847985..853c6fa5df63 100644
--- a/src/common/meta/src/wal.rs
+++ b/src/common/meta/src/wal.rs
@@ -89,7 +89,7 @@ mod tests {
             selector_type = "round_robin"
             topic_name_prefix = "greptimedb_wal_topic"
             num_partitions = 1
-            replication_factor = 3
+            replication_factor = 1
             create_topic_timeout = "30s"
             backoff_init = "500ms"
             backoff_max = "10s"
@@ -103,7 +103,7 @@ mod tests {
             selector_type: TopicSelectorType::RoundRobin,
             topic_name_prefix: "greptimedb_wal_topic".to_string(),
             num_partitions: 1,
-            replication_factor: 3,
+            replication_factor: 1,
             create_topic_timeout: Duration::from_secs(30),
             backoff: KafkaBackoffConfig {
                 init: Duration::from_millis(500),

From 0b1bc4e72dec93a52114d077520bbbe37753112a Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 17:38:56 +0800
Subject: [PATCH 15/19] fix: CR

---
 src/log-store/src/kafka/log_store.rs | 7 +++++--
 1 file 
changed, 5 insertions(+), 2 deletions(-) diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 69b052aad55d..03b2dbbaf0ae 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use common_config::wal::{KafkaConfig, WalOptions}; -use common_telemetry::debug; +use common_telemetry::{debug, warn}; use futures_util::StreamExt; use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; use rskafka::client::partition::OffsetAt; @@ -153,7 +153,10 @@ impl LogStore for KafkaLogStore { // Abort if there're no new entries. // FIXME(niebayes): how come this case happens? if start_offset > end_offset { - debug!("No new entries in ns {}", ns); + warn!( + "No new entries for ns {} in range [{}, {})", + ns, start_offset, end_offset + ); return Ok(futures_util::stream::empty().boxed()); } From 093a3e0038c6ffe18c96734768448f55147ddd6b Mon Sep 17 00:00:00 2001 From: niebayes Date: Tue, 26 Dec 2023 19:04:59 +0800 Subject: [PATCH 16/19] test: tmp for test --- config/datanode.example.toml | 28 ++++++++++---------- config/metasrv.example.toml | 38 +++++++++------------------- src/common/config/src/wal.rs | 2 +- src/common/meta/src/wal.rs | 9 +++++-- src/log-store/src/kafka/log_store.rs | 1 + 5 files changed, 35 insertions(+), 43 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index bd3f8fc2eec9..c71f84e48a9a 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -39,25 +39,25 @@ tcp_nodelay = true # except those corresponding to the chosen one. [wal] # WAL data directory -provider = "raft_engine" +provider = "kafka" # Raft-engine wal options, see `standalone.example.toml`. # dir = "/tmp/greptimedb/wal" -file_size = "256MB" -purge_threshold = "4GB" -purge_interval = "10m" -read_batch_size = 128 -sync_write = false +# file_size = "256MB" +# purge_threshold = "4GB" +# purge_interval = "10m" +# read_batch_size = 128 +# sync_write = false # Kafka wal options, see `standalone.example.toml`. -# broker_endpoints = ["127.0.0.1:9092"] -# max_batch_size = "4MB" -# linger = "200ms" -# produce_record_timeout = "100ms" -# backoff_init = "500ms" -# backoff_max = "10s" -# backoff_base = 2 -# backoff_deadline = "5mins" +broker_endpoints = ["127.0.0.1:9092"] +max_batch_size = "4MB" +linger = "200ms" +produce_record_timeout = "100ms" +backoff_init = "500ms" +backoff_max = "10s" +backoff_base = 2 +backoff_deadline = "5mins" # Storage options, see `standalone.example.toml`. [storage] diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index ff05a9c095e8..93d8c05daf86 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -47,35 +47,21 @@ first_heartbeat_estimate = "1000ms" # Available wal providers: # - "raft_engine" (default) # - "kafka" -provider = "raft_engine" +provider = "kafka" # There're none raft-engine wal config since meta srv only involves in remote wal currently. -# Kafka wal config. -# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default. -# broker_endpoints = ["127.0.0.1:9092"] -# Number of topics to be created upon start. -# num_topics = 64 -# Topic selector type. -# Available selector types: -# - "round_robin" (default) -# selector_type = "round_robin" -# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. -# topic_name_prefix = "greptimedb_wal_topic" -# Number of partitions per topic. 
-# num_partitions = 1
-# Expected number of replicas of each partition.
-# replication_factor = 1
-# Above which a topic creation operation will be cancelled.
-# create_topic_timeout = "30s"
-# The initial backoff for kafka clients.
-# backoff_init = "500ms"
-# The maximum backoff for kafka clients.
-# backoff_max = "10s"
-# Exponential backoff rate, i.e. next backoff = base * current backoff.
-# backoff_base = 2
-# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
-# backoff_deadline = "5mins"
+broker_endpoints = ["127.0.0.1:9092"]
+num_topics = 64
+selector_type = "round_robin"
+topic_name_prefix = "greptimedb_wal_topic"
+num_partitions = 1
+replication_factor = 1
+create_topic_timeout = "30s"
+backoff_init = "500ms"
+backoff_max = "10s"
+backoff_base = 2
+backoff_deadline = "5mins"
 
 # Metasrv export the metrics generated by itself
 # encoded to Prometheus remote-write format
diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs
index f9c492758e63..7dcd962d0fc9 100644
--- a/src/common/config/src/wal.rs
+++ b/src/common/config/src/wal.rs
@@ -46,7 +46,7 @@ impl From<StandaloneWalConfig> for WalConfig {
 
 impl Default for WalConfig {
     fn default() -> Self {
-        WalConfig::RaftEngine(RaftEngineConfig::default())
+        WalConfig::Kafka(KafkaConfig::default())
     }
 }
 
diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs
index 853c6fa5df63..5bd0c1a57feb 100644
--- a/src/common/meta/src/wal.rs
+++ b/src/common/meta/src/wal.rs
@@ -29,14 +29,19 @@ pub use crate::wal::options_allocator::{
 };
 
 /// Wal config for metasrv.
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 #[serde(tag = "provider", rename_all = "snake_case")]
 pub enum WalConfig {
-    #[default]
     RaftEngine,
     Kafka(KafkaConfig),
 }
 
+impl Default for WalConfig {
+    fn default() -> Self {
+        WalConfig::Kafka(KafkaConfig::default())
+    }
+}
+
 impl From<StandaloneWalConfig> for WalConfig {
     fn from(value: StandaloneWalConfig) -> Self {
         match value {
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 03b2dbbaf0ae..73b0fe1de2a9 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -142,6 +142,7 @@ impl LogStore for KafkaLogStore {
         // Gets the offset of the latest record in the topic. Actually, it's the latest record of the single partition in the topic.
         // The read operation terminates when this record is consumed.
         // Warning: the `get_offset` returns the end offset of the latest record. For our usage, it should be decremented.
+        // See: https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection)
         let end_offset = client
             .get_offset(OffsetAt::Latest)
             .await

From 0504f67bd53053a69d4d2c9b2090ad179b3af9b3 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 19:28:57 +0800
Subject: [PATCH 17/19] Revert "test: tmp for test"

This reverts commit 093a3e0038c6ffe18c96734768448f55147ddd6b.
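The revert restores the derived `Default` for the meta `WalConfig`, so the provider falls back to raft_engine instead of the temporary Kafka default. The two forms are not interchangeable in general: `#[derive(Default)]` on an enum requires `#[default]` on a unit variant, so a data-carrying default such as `Kafka(KafkaConfig)` needs a hand-written impl. A minimal, self-contained sketch of both forms (the stub `KafkaConfig` below is illustrative, not the real type):

use serde::{Deserialize, Serialize};

// Stub standing in for the real `KafkaConfig`; illustrative only.
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct KafkaConfig {
    pub broker_endpoints: Vec<String>,
}

// Derived form, as restored by this revert: `#[default]` must sit on a
// unit variant, which is why `RaftEngine` can be the derived default
// while `Kafka(KafkaConfig)` cannot.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum WalConfig {
    #[default]
    RaftEngine,
    Kafka(KafkaConfig),
}

// Manual form used by the reverted test commit, required whenever the
// default variant carries data:
//
//     impl Default for WalConfig {
//         fn default() -> Self {
//             WalConfig::Kafka(KafkaConfig::default())
//         }
//     }

fn main() {
    // With the derive back in place, the default provider is raft_engine.
    assert_eq!(WalConfig::default(), WalConfig::RaftEngine);
}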
--- config/datanode.example.toml | 28 ++++++++++---------- config/metasrv.example.toml | 38 +++++++++++++++++++--------- src/common/config/src/wal.rs | 2 +- src/common/meta/src/wal.rs | 9 ++----- src/log-store/src/kafka/log_store.rs | 1 - 5 files changed, 43 insertions(+), 35 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index c71f84e48a9a..bd3f8fc2eec9 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -39,25 +39,25 @@ tcp_nodelay = true # except those corresponding to the chosen one. [wal] # WAL data directory -provider = "kafka" +provider = "raft_engine" # Raft-engine wal options, see `standalone.example.toml`. # dir = "/tmp/greptimedb/wal" -# file_size = "256MB" -# purge_threshold = "4GB" -# purge_interval = "10m" -# read_batch_size = 128 -# sync_write = false +file_size = "256MB" +purge_threshold = "4GB" +purge_interval = "10m" +read_batch_size = 128 +sync_write = false # Kafka wal options, see `standalone.example.toml`. -broker_endpoints = ["127.0.0.1:9092"] -max_batch_size = "4MB" -linger = "200ms" -produce_record_timeout = "100ms" -backoff_init = "500ms" -backoff_max = "10s" -backoff_base = 2 -backoff_deadline = "5mins" +# broker_endpoints = ["127.0.0.1:9092"] +# max_batch_size = "4MB" +# linger = "200ms" +# produce_record_timeout = "100ms" +# backoff_init = "500ms" +# backoff_max = "10s" +# backoff_base = 2 +# backoff_deadline = "5mins" # Storage options, see `standalone.example.toml`. [storage] diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 93d8c05daf86..ff05a9c095e8 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -47,21 +47,35 @@ first_heartbeat_estimate = "1000ms" # Available wal providers: # - "raft_engine" (default) # - "kafka" -provider = "kafka" +provider = "raft_engine" # There're none raft-engine wal config since meta srv only involves in remote wal currently. -broker_endpoints = ["127.0.0.1:9092"] -num_topics = 64 -selector_type = "round_robin" -topic_name_prefix = "greptimedb_wal_topic" -num_partitions = 1 -replication_factor = 1 -create_topic_timeout = "30s" -backoff_init = "500ms" -backoff_max = "10s" -backoff_base = 2 -backoff_deadline = "5mins" +# Kafka wal config. +# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default. +# broker_endpoints = ["127.0.0.1:9092"] +# Number of topics to be created upon start. +# num_topics = 64 +# Topic selector type. +# Available selector types: +# - "round_robin" (default) +# selector_type = "round_robin" +# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. +# topic_name_prefix = "greptimedb_wal_topic" +# Number of partitions per topic. +# num_partitions = 1 +# Expected number of replicas of each partition. +# replication_factor = 1 +# Above which a topic creation operation will be cancelled. +# create_topic_timeout = "30s" +# The initial backoff for kafka clients. +# backoff_init = "500ms" +# The maximum backoff for kafka clients. +# backoff_max = "10s" +# Exponential backoff rate, i.e. next backoff = base * current backoff. +# backoff_base = 2 +# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. 
+# backoff_deadline = "5mins"
 
 # Metasrv export the metrics generated by itself
 # encoded to Prometheus remote-write format
diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs
index 7dcd962d0fc9..f9c492758e63 100644
--- a/src/common/config/src/wal.rs
+++ b/src/common/config/src/wal.rs
@@ -46,7 +46,7 @@ impl From<StandaloneWalConfig> for WalConfig {
 
 impl Default for WalConfig {
     fn default() -> Self {
-        WalConfig::Kafka(KafkaConfig::default())
+        WalConfig::RaftEngine(RaftEngineConfig::default())
     }
 }
 
diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs
index 5bd0c1a57feb..853c6fa5df63 100644
--- a/src/common/meta/src/wal.rs
+++ b/src/common/meta/src/wal.rs
@@ -29,19 +29,14 @@ pub use crate::wal::options_allocator::{
 };
 
 /// Wal config for metasrv.
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
 #[serde(tag = "provider", rename_all = "snake_case")]
 pub enum WalConfig {
+    #[default]
     RaftEngine,
     Kafka(KafkaConfig),
 }
 
-impl Default for WalConfig {
-    fn default() -> Self {
-        WalConfig::Kafka(KafkaConfig::default())
-    }
-}
-
 impl From<StandaloneWalConfig> for WalConfig {
     fn from(value: StandaloneWalConfig) -> Self {
         match value {
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 73b0fe1de2a9..03b2dbbaf0ae 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -142,7 +142,6 @@ impl LogStore for KafkaLogStore {
         // Gets the offset of the latest record in the topic. Actually, it's the latest record of the single partition in the topic.
         // The read operation terminates when this record is consumed.
         // Warning: the `get_offset` returns the end offset of the latest record. For our usage, it should be decremented.
-        // See: https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection)
         let end_offset = client
             .get_offset(OffsetAt::Latest)
             .await

From e64c12dd55062956191e6d00f6446a8a198bb7fa Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 19:30:35 +0800
Subject: [PATCH 18/19] fix: serde

---
 src/log-store/src/kafka/log_store.rs | 1 +
 src/meta-srv/src/selector.rs         | 1 +
 2 files changed, 2 insertions(+)

diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 03b2dbbaf0ae..73b0fe1de2a9 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -142,6 +142,7 @@ impl LogStore for KafkaLogStore {
         // Gets the offset of the latest record in the topic. Actually, it's the latest record of the single partition in the topic.
         // The read operation terminates when this record is consumed.
         // Warning: the `get_offset` returns the end offset of the latest record. For our usage, it should be decremented.
+        // See: https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection)
         let end_offset = client
             .get_offset(OffsetAt::Latest)
             .await
diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs
index 44ce0b2c8cb4..3873722a27b3 100644
--- a/src/meta-srv/src/selector.rs
+++ b/src/meta-srv/src/selector.rs
@@ -56,6 +56,7 @@ impl Default for SelectorOptions {
 }
 
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
+#[serde(rename_all = "snake_case")]
 pub enum SelectorType {
     #[default]
     LoadBased,

From 4df0d4fa7f7f7dd7df0202639090691fafe64579 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 26 Dec 2023 20:25:21 +0800
Subject: [PATCH 19/19] fix selector type deserialize

---
 src/meta-srv/src/selector.rs | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs
index 3873722a27b3..4c3a91caef2b 100644
--- a/src/meta-srv/src/selector.rs
+++ b/src/meta-srv/src/selector.rs
@@ -56,7 +56,7 @@ impl Default for SelectorOptions {
 }
 
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
-#[serde(rename_all = "snake_case")]
+#[serde(try_from = "String")]
 pub enum SelectorType {
     #[default]
     LoadBased,
@@ -78,6 +78,14 @@ impl TryFrom<&str> for SelectorType {
     }
 }
 
+impl TryFrom<String> for SelectorType {
+    type Error = error::Error;
+
+    fn try_from(value: String) -> Result<Self> {
+        SelectorType::try_from(value.as_str())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::SelectorType;
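For reference, `#[serde(try_from = "String")]` makes the derived `Deserialize` first deserialize a plain `String` and then pass it through the `TryFrom<String>` impl, so the hand-written parser also drives config deserialization and unknown spellings surface as errors instead of being silently accepted. A minimal, self-contained sketch of the pattern (the variant spellings and the `String` error type are simplifications, not the real meta-srv types):

use serde::Deserialize;

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Default)]
#[serde(try_from = "String")]
pub enum SelectorType {
    #[default]
    LoadBased,
    LeaseBased,
}

impl TryFrom<&str> for SelectorType {
    // The real code uses the crate's error type; a plain `String` keeps
    // this sketch self-contained (serde only needs `Error: Display`).
    type Error = String;

    fn try_from(value: &str) -> Result<Self, Self::Error> {
        match value {
            "load_based" => Ok(SelectorType::LoadBased),
            "lease_based" => Ok(SelectorType::LeaseBased),
            other => Err(format!("unsupported selector type: {other}")),
        }
    }
}

// Delegating to the `&str` impl is all serde needs.
impl TryFrom<String> for SelectorType {
    type Error = String;

    fn try_from(value: String) -> Result<Self, Self::Error> {
        SelectorType::try_from(value.as_str())
    }
}

fn main() {
    // The string is routed through `TryFrom` during deserialization.
    let parsed: SelectorType = serde_json::from_str(r#""load_based""#).unwrap();
    assert_eq!(parsed, SelectorType::LoadBased);

    // An unknown spelling is rejected rather than silently defaulted.
    assert!(serde_json::from_str::<SelectorType>(r#""foo""#).is_err());
}

Here `serde_json` merely exercises the derive; any serde format, including the TOML config files, takes the same conversion path.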