diff --git a/.github/doc-label-config.yml b/.github/doc-label-config.yml new file mode 100644 index 000000000000..60f20533a1d2 --- /dev/null +++ b/.github/doc-label-config.yml @@ -0,0 +1,4 @@ +Doc not needed: + - '- \[x\] This PR does not require documentation updates.' +Doc update required: + - '- \[ \] This PR does not require documentation updates.' diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 1c62e4ad4105..c0bc418013d2 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -15,5 +15,6 @@ Please explain IN DETAIL what the changes are in this PR and why they are needed - [ ] I have written the necessary rustdoc comments. - [ ] I have added the necessary unit tests and integration tests. +- [ ] This PR does not require documentation updates. ## Refer to a related PR or issue link (optional) diff --git a/.github/workflows/doc-label.yml b/.github/workflows/doc-label.yml new file mode 100644 index 000000000000..298c7e3cecce --- /dev/null +++ b/.github/workflows/doc-label.yml @@ -0,0 +1,20 @@ +name: "PR Doc Labeler" +on: + pull_request: + types: [opened, edited, synchronize, ready_for_review, auto_merge_enabled, labeled, unlabeled] + +permissions: + pull-requests: write + contents: read + +jobs: + triage: + if: ${{ github.repository == 'GreptimeTeam/greptimedb' }} + runs-on: ubuntu-latest + steps: + - uses: github/issue-labeler@v3.3 + with: + configuration-path: .github/doc-label-config.yml + enable-versioned-regex: false + repo-token: ${{ secrets.GITHUB_TOKEN }} + sync-labels: 1 diff --git a/Cargo.lock b/Cargo.lock index ba1d36fc121b..4892674eb661 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4873,7 +4873,6 @@ dependencies = [ name = "metric-engine" version = "0.5.0" dependencies = [ - "ahash 0.8.6", "api", "aquamarine", "async-trait", @@ -4889,6 +4888,7 @@ dependencies = [ "datatypes", "lazy_static", "mito2", + "mur3", "object-store", "prometheus", "serde_json", @@ -5066,6 +5066,12 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "mur3" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97af489e1e21b68de4c390ecca6703318bc1aa16e9733bcb62c089b73c6fbb1b" + [[package]] name = "mysql-common-derive" version = "0.30.2" diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index ce4a615ba102..d16920d5561e 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -18,7 +18,8 @@ use std::{fs, path}; use async_trait::async_trait; use clap::Parser; use common_catalog::consts::MIN_USER_TABLE_ID; -use common_config::{metadata_store_dir, KvBackendConfig, WalConfig as DatanodeWalConfig}; +use common_config::wal::StandaloneWalConfig; +use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef}; @@ -27,9 +28,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; -use common_meta::wal::{ - WalConfig as MetaSrvWalConfig, WalOptionsAllocator, WalOptionsAllocatorRef, -}; +use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef}; use common_procedure::ProcedureManagerRef; use 
common_telemetry::info; use common_telemetry::logging::LoggingOptions; @@ -106,8 +105,7 @@ pub struct StandaloneOptions { pub opentsdb: OpentsdbOptions, pub influxdb: InfluxdbOptions, pub prom_store: PromStoreOptions, - pub wal_meta: MetaSrvWalConfig, - pub wal_datanode: DatanodeWalConfig, + pub wal: StandaloneWalConfig, pub storage: StorageConfig, pub metadata_store: KvBackendConfig, pub procedure: ProcedureConfig, @@ -130,8 +128,7 @@ impl Default for StandaloneOptions { opentsdb: OpentsdbOptions::default(), influxdb: InfluxdbOptions::default(), prom_store: PromStoreOptions::default(), - wal_meta: MetaSrvWalConfig::default(), - wal_datanode: DatanodeWalConfig::default(), + wal: StandaloneWalConfig::default(), storage: StorageConfig::default(), metadata_store: KvBackendConfig::default(), procedure: ProcedureConfig::default(), @@ -170,7 +167,7 @@ impl StandaloneOptions { DatanodeOptions { node_id: Some(0), enable_telemetry: self.enable_telemetry, - wal: self.wal_datanode, + wal: self.wal.into(), storage: self.storage, region_engine: self.region_engine, rpc_addr: self.grpc.addr, @@ -342,7 +339,7 @@ impl StartCommand { let procedure = opts.procedure.clone(); let frontend = opts.clone().frontend_options(); let logging = opts.logging.clone(); - let wal_meta = opts.wal_meta.clone(); + let wal_meta = opts.wal.clone().into(); let datanode = opts.datanode_options().clone(); Ok(Options::Standalone(Box::new(MixOptions { @@ -484,6 +481,7 @@ mod tests { use auth::{Identity, Password, UserProviderRef}; use common_base::readable_size::ReadableSize; + use common_config::WalConfig; use common_test_util::temp_dir::create_named_temp_file; use datanode::config::{FileConfig, GcsConfig}; use servers::Mode; @@ -594,7 +592,7 @@ mod tests { assert_eq!(None, fe_opts.mysql.reject_no_database); assert!(fe_opts.influxdb.enable); - let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else { + let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else { unreachable!() }; assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap()); @@ -740,8 +738,7 @@ mod tests { assert_eq!(options.opentsdb, default_options.opentsdb); assert_eq!(options.influxdb, default_options.influxdb); assert_eq!(options.prom_store, default_options.prom_store); - assert_eq!(options.wal_meta, default_options.wal_meta); - assert_eq!(options.wal_datanode, default_options.wal_datanode); + assert_eq!(options.wal, default_options.wal); assert_eq!(options.metadata_store, default_options.metadata_store); assert_eq!(options.procedure, default_options.procedure); assert_eq!(options.logging, default_options.logging); diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs index e3c1b7b9cb28..f9c492758e63 100644 --- a/src/common/config/src/wal.rs +++ b/src/common/config/src/wal.rs @@ -18,7 +18,9 @@ pub mod raft_engine; use serde::{Deserialize, Serialize}; use serde_with::with_prefix; -pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic}; +pub use crate::wal::kafka::{ + KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig, Topic as KafkaWalTopic, +}; pub use crate::wal::raft_engine::RaftEngineConfig; /// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair @@ -27,17 +29,38 @@ pub const WAL_OPTIONS_KEY: &str = "wal_options"; /// Wal config for datanode. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(tag = "provider")] +#[serde(tag = "provider", rename_all = "snake_case")] pub enum WalConfig { - #[serde(rename = "raft_engine")] RaftEngine(RaftEngineConfig), - #[serde(rename = "kafka")] Kafka(KafkaConfig), } +impl From for WalConfig { + fn from(value: StandaloneWalConfig) -> Self { + match value { + StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine(config), + StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(config.base), + } + } +} + impl Default for WalConfig { fn default() -> Self { - WalConfig::Kafka(KafkaConfig::default()) + WalConfig::RaftEngine(RaftEngineConfig::default()) + } +} + +/// Wal config for datanode. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "provider", rename_all = "snake_case")] +pub enum StandaloneWalConfig { + RaftEngine(RaftEngineConfig), + Kafka(StandaloneKafkaConfig), +} + +impl Default for StandaloneWalConfig { + fn default() -> Self { + StandaloneWalConfig::RaftEngine(RaftEngineConfig::default()) } } @@ -45,12 +68,10 @@ impl Default for WalConfig { /// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded /// by datanode with `serde_json::from_str`. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] -#[serde(tag = "wal.provider")] +#[serde(tag = "wal.provider", rename_all = "snake_case")] pub enum WalOptions { #[default] - #[serde(rename = "raft_engine")] RaftEngine, - #[serde(rename = "kafka")] #[serde(with = "prefix_wal_kafka")] Kafka(KafkaWalOptions), } @@ -64,15 +85,16 @@ mod tests { use common_base::readable_size::ReadableSize; use rskafka::client::partition::Compression as RsKafkaCompression; + use crate::wal::kafka::KafkaBackoffConfig; use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions}; #[test] fn test_serde_kafka_config() { let toml_str = r#" - broker_endpoints = ["127.0.0.1:9090"] + broker_endpoints = ["127.0.0.1:9092"] max_batch_size = "4MB" linger = "200ms" - max_wait_time = "100ms" + produce_record_timeout = "100ms" backoff_init = "500ms" backoff_max = "10s" backoff_base = 2 @@ -80,15 +102,17 @@ mod tests { "#; let decoded: KafkaConfig = toml::from_str(toml_str).unwrap(); let expected = KafkaConfig { - broker_endpoints: vec!["127.0.0.1:9090".to_string()], + broker_endpoints: vec!["127.0.0.1:9092".to_string()], compression: RsKafkaCompression::default(), max_batch_size: ReadableSize::mb(4), linger: Duration::from_millis(200), - max_wait_time: Duration::from_millis(100), - backoff_init: Duration::from_millis(500), - backoff_max: Duration::from_secs(10), - backoff_base: 2, - backoff_deadline: Some(Duration::from_secs(60 * 5)), + produce_record_timeout: Duration::from_millis(100), + backoff: KafkaBackoffConfig { + init: Duration::from_millis(500), + max: Duration::from_secs(10), + base: 2, + deadline: Some(Duration::from_secs(60 * 5)), + }, }; assert_eq!(decoded, expected); } diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index fb366411c317..d1d1a615a370 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -17,12 +17,21 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use rskafka::client::partition::Compression as RsKafkaCompression; use serde::{Deserialize, Serialize}; +use serde_with::with_prefix; /// Topic name prefix. pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic"; /// Kafka wal topic. pub type Topic = String; +/// The type of the topic selector, i.e. 
with which strategy to select a topic. +#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TopicSelectorType { + #[default] + RoundRobin, +} + /// Configurations for kafka wal. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] @@ -40,34 +49,86 @@ pub struct KafkaConfig { pub linger: Duration, /// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. #[serde(with = "humantime_serde")] - pub max_wait_time: Duration, + pub produce_record_timeout: Duration, + /// The backoff config. + #[serde(flatten, with = "kafka_backoff")] + pub backoff: KafkaBackoffConfig, +} + +impl Default for KafkaConfig { + fn default() -> Self { + Self { + broker_endpoints: vec!["127.0.0.1:9092".to_string()], + compression: RsKafkaCompression::NoCompression, + max_batch_size: ReadableSize::mb(4), + linger: Duration::from_millis(200), + produce_record_timeout: Duration::from_millis(100), + backoff: KafkaBackoffConfig::default(), + } + } +} + +with_prefix!(pub kafka_backoff "backoff_"); + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct KafkaBackoffConfig { /// The initial backoff for kafka clients. #[serde(with = "humantime_serde")] - pub backoff_init: Duration, + pub init: Duration, /// The maximum backoff for kafka clients. #[serde(with = "humantime_serde")] - pub backoff_max: Duration, + pub max: Duration, /// Exponential backoff rate, i.e. next backoff = base * current backoff. // Sets to u32 type since some structs containing the KafkaConfig need to derive the Eq trait. - pub backoff_base: u32, + pub base: u32, /// Stop reconnecting if the total wait time reaches the deadline. /// If it's None, the reconnecting won't terminate. #[serde(with = "humantime_serde")] - pub backoff_deadline: Option, + pub deadline: Option, } -impl Default for KafkaConfig { +impl Default for KafkaBackoffConfig { fn default() -> Self { Self { - broker_endpoints: vec!["127.0.0.1:9092".to_string()], - compression: RsKafkaCompression::NoCompression, - max_batch_size: ReadableSize::mb(4), - linger: Duration::from_millis(200), - max_wait_time: Duration::from_millis(100), - backoff_init: Duration::from_millis(500), - backoff_max: Duration::from_secs(10), - backoff_base: 2, - backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins + init: Duration::from_millis(500), + max: Duration::from_secs(10), + base: 2, + deadline: Some(Duration::from_secs(60 * 5)), // 5 mins + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct StandaloneKafkaConfig { + #[serde(flatten)] + pub base: KafkaConfig, + /// Number of topics to be created upon start. + pub num_topics: usize, + /// The type of the topic selector with which to select a topic for a region. + pub selector_type: TopicSelectorType, + /// Topic name prefix. + pub topic_name_prefix: String, + /// Number of partitions per topic. + pub num_partitions: i32, + /// The replication factor of each topic. + pub replication_factor: i16, + /// Above which a topic creation operation will be cancelled. 
+ #[serde(with = "humantime_serde")] + pub create_topic_timeout: Duration, +} + +impl Default for StandaloneKafkaConfig { + fn default() -> Self { + Self { + base: KafkaConfig::default(), + num_topics: 64, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_wal_topic".to_string(), + num_partitions: 1, + replication_factor: 3, + create_topic_timeout: Duration::from_secs(30), } } } diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 728373182036..1e394e847985 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -17,6 +17,7 @@ pub mod options_allocator; use std::collections::HashMap; +use common_config::wal::StandaloneWalConfig; use serde::{Deserialize, Serialize}; use serde_with::with_prefix; @@ -28,18 +29,29 @@ pub use crate::wal::options_allocator::{ }; /// Wal config for metasrv. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(tag = "provider")] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] +#[serde(tag = "provider", rename_all = "snake_case")] pub enum WalConfig { - #[serde(rename = "raft_engine")] + #[default] RaftEngine, - #[serde(rename = "kafka")] Kafka(KafkaConfig), } -impl Default for WalConfig { - fn default() -> Self { - WalConfig::Kafka(KafkaConfig::default()) +impl From for WalConfig { + fn from(value: StandaloneWalConfig) -> Self { + match value { + StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine, + StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(KafkaConfig { + broker_endpoints: config.base.broker_endpoints, + num_topics: config.num_topics, + selector_type: config.selector_type, + topic_name_prefix: config.topic_name_prefix, + num_partitions: config.num_partitions, + replication_factor: config.replication_factor, + create_topic_timeout: config.create_topic_timeout, + backoff: config.base.backoff, + }), + } } } @@ -47,8 +59,9 @@ impl Default for WalConfig { mod tests { use std::time::Duration; + use common_config::wal::kafka::{KafkaBackoffConfig, TopicSelectorType}; + use super::*; - use crate::wal::kafka::topic_selector::SelectorType as KafkaTopicSelectorType; #[test] fn test_serde_wal_config() { @@ -62,7 +75,7 @@ mod tests { // Test serde raft-engine wal config with extra other wal config. let toml_str = r#" provider = "raft_engine" - broker_endpoints = ["127.0.0.1:9090"] + broker_endpoints = ["127.0.0.1:9092"] num_topics = 32 "#; let wal_config: WalConfig = toml::from_str(toml_str).unwrap(); @@ -71,7 +84,7 @@ mod tests { // Test serde kafka wal config. 
let toml_str = r#" provider = "kafka" - broker_endpoints = ["127.0.0.1:9090"] + broker_endpoints = ["127.0.0.1:9092"] num_topics = 32 selector_type = "round_robin" topic_name_prefix = "greptimedb_wal_topic" @@ -85,17 +98,19 @@ mod tests { "#; let wal_config: WalConfig = toml::from_str(toml_str).unwrap(); let expected_kafka_config = KafkaConfig { - broker_endpoints: vec!["127.0.0.1:9090".to_string()], + broker_endpoints: vec!["127.0.0.1:9092".to_string()], num_topics: 32, - selector_type: KafkaTopicSelectorType::RoundRobin, + selector_type: TopicSelectorType::RoundRobin, topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 3, create_topic_timeout: Duration::from_secs(30), - backoff_init: Duration::from_millis(500), - backoff_max: Duration::from_secs(10), - backoff_base: 2, - backoff_deadline: Some(Duration::from_secs(60 * 5)), + backoff: KafkaBackoffConfig { + init: Duration::from_millis(500), + max: Duration::from_secs(10), + base: 2, + deadline: Some(Duration::from_secs(60 * 5)), + }, }; assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config)); } diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 8b434cb8dd55..0a61b6015dfc 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -18,11 +18,12 @@ pub mod topic_selector; use std::time::Duration; +use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType}; +use common_config::wal::StandaloneWalConfig; use serde::{Deserialize, Serialize}; pub use crate::wal::kafka::topic::Topic; pub use crate::wal::kafka::topic_manager::TopicManager; -use crate::wal::kafka::topic_selector::SelectorType as TopicSelectorType; /// Configurations for kafka wal. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -42,20 +43,9 @@ pub struct KafkaConfig { /// Above which a topic creation operation will be cancelled. #[serde(with = "humantime_serde")] pub create_topic_timeout: Duration, - /// The initial backoff for kafka clients. - #[serde(with = "humantime_serde")] - pub backoff_init: Duration, - /// The maximum backoff for kafka clients. - #[serde(with = "humantime_serde")] - pub backoff_max: Duration, - /// Exponential backoff rate, i.e. next backoff = base * current backoff. - // Sets to u32 type since the `backoff_base` field in the KafkaConfig for datanode is of type u32, - // and we want to unify their types. - pub backoff_base: u32, - /// Stop reconnecting if the total wait time reaches the deadline. - /// If it's None, the reconnecting won't terminate. - #[serde(with = "humantime_serde")] - pub backoff_deadline: Option, + /// The backoff config. 
+ #[serde(flatten, with = "kafka_backoff")] + pub backoff: KafkaBackoffConfig, } impl Default for KafkaConfig { @@ -71,10 +61,7 @@ impl Default for KafkaConfig { num_partitions: 1, replication_factor, create_topic_timeout: Duration::from_secs(30), - backoff_init: Duration::from_millis(500), - backoff_max: Duration::from_secs(10), - backoff_base: 2, - backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins + backoff: KafkaBackoffConfig::default(), } } } diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 45b6115b0020..860192b97071 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -16,6 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; +use common_config::wal::kafka::TopicSelectorType; use common_telemetry::{debug, error, info}; use rskafka::client::controller::ControllerClient; use rskafka::client::error::Error as RsKafkaError; @@ -31,7 +32,7 @@ use crate::error::{ use crate::kv_backend::KvBackendRef; use crate::rpc::store::PutRequest; use crate::wal::kafka::topic::Topic; -use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, SelectorType, TopicSelectorRef}; +use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, TopicSelectorRef}; use crate::wal::kafka::KafkaConfig; const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/"; @@ -54,7 +55,7 @@ impl TopicManager { .collect::>(); let selector = match config.selector_type { - SelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), + TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), }; Self { @@ -105,10 +106,10 @@ impl TopicManager { async fn try_create_topics(&self, topics: &[Topic], to_be_created: &[usize]) -> Result<()> { // Builds an kafka controller client for creating topics. let backoff_config = BackoffConfig { - init_backoff: self.config.backoff_init, - max_backoff: self.config.backoff_max, - base: self.config.backoff_base as f64, - deadline: self.config.backoff_deadline, + init_backoff: self.config.backoff.init, + max_backoff: self.config.backoff.max, + base: self.config.backoff.base as f64, + deadline: self.config.backoff.deadline, }; let client = ClientBuilder::new(self.config.broker_endpoints.clone()) .backoff_config(backoff_config) diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs index 6764cadcc990..fe7517bfd0b5 100644 --- a/src/common/meta/src/wal/kafka/topic_selector.rs +++ b/src/common/meta/src/wal/kafka/topic_selector.rs @@ -22,14 +22,6 @@ use snafu::ensure; use crate::error::{EmptyTopicPoolSnafu, Result}; use crate::wal::kafka::topic::Topic; -/// The type of the topic selector, i.e. with which strategy to select a topic. -#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum SelectorType { - #[default] - #[serde(rename = "round_robin")] - RoundRobin, -} - /// Controls topic selection. pub(crate) trait TopicSelector: Send + Sync { /// Selects a topic from the topic pool. 
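Note on the WAL config refactor above: it leans on two serde idioms — an internally tagged enum (`#[serde(tag = "provider", rename_all = "snake_case")]`), so the `provider` key in TOML selects the variant, and `serde_with::with_prefix!` combined with `#[serde(flatten)]`, so flat `backoff_*` keys deserialize into the nested `KafkaBackoffConfig`. Below is a minimal, self-contained sketch of the same pattern; the struct and field names are simplified placeholders, not the actual GreptimeDB types.

```rust
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

// Maps flat `backoff_*` keys in the source document onto the nested struct below.
with_prefix!(prefix_backoff "backoff_");

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
struct BackoffConfig {
    // The real configs use humantime-formatted `Duration`s; plain integers keep the sketch small.
    init_ms: u64,
    max_ms: u64,
    base: u32,
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
struct KafkaConfig {
    broker_endpoints: Vec<String>,
    #[serde(flatten, with = "prefix_backoff")]
    backoff: BackoffConfig,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
enum WalConfig {
    RaftEngine,
    Kafka(KafkaConfig),
}

fn main() {
    // `provider` picks the enum variant; `backoff_*` keys land in the nested backoff struct.
    let toml_str = r#"
        provider = "kafka"
        broker_endpoints = ["127.0.0.1:9092"]
        backoff_init_ms = 500
        backoff_max_ms = 10000
        backoff_base = 2
    "#;
    let config: WalConfig = toml::from_str(toml_str).unwrap();
    assert_eq!(
        config,
        WalConfig::Kafka(KafkaConfig {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            backoff: BackoffConfig { init_ms: 500, max_ms: 10000, base: 2 },
        })
    );
}
```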
diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index f825d8f3835b..0b7c3ba1b818 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -43,6 +43,7 @@ pub struct LoggingOptions { pub enable_otlp_tracing: bool, pub otlp_endpoint: Option, pub tracing_sample_ratio: Option, + pub append_stdout: bool, } impl PartialEq for LoggingOptions { @@ -52,6 +53,7 @@ impl PartialEq for LoggingOptions { && self.enable_otlp_tracing == other.enable_otlp_tracing && self.otlp_endpoint == other.otlp_endpoint && self.tracing_sample_ratio == other.tracing_sample_ratio + && self.append_stdout == other.append_stdout } } @@ -65,6 +67,7 @@ impl Default for LoggingOptions { enable_otlp_tracing: false, otlp_endpoint: None, tracing_sample_ratio: None, + append_stdout: true, } } } @@ -129,10 +132,14 @@ pub fn init_global_logging( // Enable log compatible layer to convert log record to tracing span. LogTracer::init().expect("log tracer must be valid"); - // Stdout layer. - let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); - let stdout_logging_layer = Layer::new().with_writer(stdout_writer); - guards.push(stdout_guard); + let stdout_logging_layer = if opts.append_stdout { + let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); + guards.push(stdout_guard); + + Some(Layer::new().with_writer(stdout_writer)) + } else { + None + }; // JSON log layer. let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name); @@ -184,7 +191,7 @@ pub fn init_global_logging( None }; - let stdout_logging_layer = stdout_logging_layer.with_filter(filter.clone()); + let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone())); let file_logging_layer = file_logging_layer.with_filter(filter); diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 9aa27bf1b3fd..e272840201bb 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -75,10 +75,10 @@ impl ClientManager { pub(crate) async fn try_new(config: &KafkaConfig) -> Result { // Sets backoff config for the top-level kafka client and all clients constructed by it. 
let backoff_config = BackoffConfig { - init_backoff: config.backoff_init, - max_backoff: config.backoff_max, - base: config.backoff_base as f64, - deadline: config.backoff_deadline, + init_backoff: config.backoff.init, + max_backoff: config.backoff.max, + base: config.backoff.base as f64, + deadline: config.backoff.deadline, }; let client = ClientBuilder::new(config.broker_endpoints.clone()) .backoff_config(backoff_config) diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index cdbb36bbe28f..5390c308c378 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -158,7 +158,7 @@ impl LogStore for KafkaLogStore { let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset)) .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32) - .with_max_wait_ms(self.config.max_wait_time.as_millis() as i32) + .with_max_wait_ms(self.config.produce_record_timeout.as_millis() as i32) .build(); debug!( diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 2d820b9a2787..def5885cf908 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -5,7 +5,6 @@ edition.workspace = true license.workspace = true [dependencies] -ahash.workspace = true api.workspace = true aquamarine.workspace = true async-trait.workspace = true @@ -20,6 +19,7 @@ datafusion.workspace = true datatypes.workspace = true lazy_static = "1.4" mito2.workspace = true +mur3 = "0.1" object-store.workspace = true prometheus.workspace = true serde_json.workspace = true diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 70038f528b60..5e610c3e91a3 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -40,9 +40,6 @@ use self::state::MetricEngineState; use crate::data_region::DataRegion; use crate::metadata_region::MetadataRegion; -/// Fixed random state for generating tsid -pub(crate) const RANDOM_STATE: ahash::RandomState = ahash::RandomState::with_seeds(1, 2, 3, 4); - #[cfg_attr(doc, aquamarine::aquamarine)] /// # Metric Engine /// diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index a0f187faaa7a..19f40a509975 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -14,7 +14,6 @@ use std::hash::{BuildHasher, Hash, Hasher}; -use ahash::RandomState; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; use common_telemetry::{error, info}; @@ -25,13 +24,16 @@ use store_api::metric_engine_consts::{ use store_api::region_request::{AffectedRows, RegionPutRequest}; use store_api::storage::{RegionId, TableId}; -use crate::engine::{MetricEngineInner, RANDOM_STATE}; +use crate::engine::MetricEngineInner; use crate::error::{ ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result, }; use crate::metrics::FORBIDDEN_OPERATION_COUNT; use crate::utils::{to_data_region_id, to_metadata_region_id}; +// A random number +const TSID_HASH_SEED: u32 = 846793005; + impl MetricEngineInner { /// Dispatch region put request pub async fn put_region( @@ -174,9 +176,8 @@ impl MetricEngineInner { }); // fill internal columns - let mut random_state = RANDOM_STATE.clone(); for row in &mut rows.rows { - Self::fill_internal_columns(&mut random_state, table_id, &tag_col_indices, row); + Self::fill_internal_columns(table_id, &tag_col_indices, row); } Ok(()) @@ -184,12 +185,11 @@ impl MetricEngineInner { /// 
Fills internal columns of a row with table name and a hash of tag values. fn fill_internal_columns( - random_state: &mut RandomState, table_id: TableId, tag_col_indices: &[(usize, String)], row: &mut Row, ) { - let mut hasher = random_state.build_hasher(); + let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED); for (idx, name) in tag_col_indices { let tag = row.values[*idx].clone(); name.hash(&mut hasher); @@ -198,7 +198,8 @@ impl MetricEngineInner { string.hash(&mut hasher); } } - let hash = hasher.finish(); + // TSID is 64 bits, simply truncate the 128 bits hash + let (hash, _) = hasher.finish128(); // fill table id and tsid row.values.push(ValueData::U32Value(table_id).into()); @@ -247,15 +248,15 @@ mod tests { .unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ -+-------------------------+----------------+------------+---------------------+-------+ -| greptime_timestamp | greptime_value | __table_id | __tsid | job | -+-------------------------+----------------+------------+---------------------+-------+ -| 1970-01-01T00:00:00 | 0.0 | 3 | 4844750677434873907 | tag_0 | -| 1970-01-01T00:00:00.001 | 1.0 | 3 | 4844750677434873907 | tag_0 | -| 1970-01-01T00:00:00.002 | 2.0 | 3 | 4844750677434873907 | tag_0 | -| 1970-01-01T00:00:00.003 | 3.0 | 3 | 4844750677434873907 | tag_0 | -| 1970-01-01T00:00:00.004 | 4.0 | 3 | 4844750677434873907 | tag_0 | -+-------------------------+----------------+------------+---------------------+-------+"; ++-------------------------+----------------+------------+----------------------+-------+ +| greptime_timestamp | greptime_value | __table_id | __tsid | job | ++-------------------------+----------------+------------+----------------------+-------+ +| 1970-01-01T00:00:00 | 0.0 | 3 | 12881218023286672757 | tag_0 | +| 1970-01-01T00:00:00.001 | 1.0 | 3 | 12881218023286672757 | tag_0 | +| 1970-01-01T00:00:00.002 | 2.0 | 3 | 12881218023286672757 | tag_0 | +| 1970-01-01T00:00:00.003 | 3.0 | 3 | 12881218023286672757 | tag_0 | +| 1970-01-01T00:00:00.004 | 4.0 | 3 | 12881218023286672757 | tag_0 | ++-------------------------+----------------+------------+----------------------+-------+"; assert_eq!(expected, batches.pretty_print().unwrap(), "physical region"); // read data from logical region diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b1d48f8c654e..39457281d76b 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -416,6 +416,13 @@ pub enum Error { error: ArrowError, location: Location, }, + + #[snafu(display("Invalid file metadata"))] + ConvertMetaData { + location: Location, + #[snafu(source)] + error: parquet::errors::ParquetError, + }, } pub type Result = std::result::Result; @@ -477,6 +484,7 @@ impl ErrorExt for Error { InvalidBatch { .. } => StatusCode::InvalidArguments, InvalidRecordBatch { .. } => StatusCode::InvalidArguments, ConvertVector { source, .. } => source.status_code(), + ConvertMetaData { .. } => StatusCode::Internal, ComputeArrow { .. } => StatusCode::Internal, ComputeVector { .. } => StatusCode::Internal, PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index af3f8479f39c..584faf1ab964 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -15,6 +15,7 @@ //! SST in parquet format. 
mod format; +mod helper; mod page_reader; pub mod reader; pub mod row_group; @@ -22,6 +23,7 @@ mod stats; pub mod writer; use common_base::readable_size::ReadableSize; +use parquet::file::metadata::ParquetMetaData; use crate::sst::file::FileTimeRange; @@ -59,6 +61,8 @@ pub struct SstInfo { pub file_size: u64, /// Number of rows. pub num_rows: usize, + /// File Meta Data + pub file_metadata: Option, } #[cfg(test)] @@ -195,4 +199,68 @@ mod tests { }; assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); } + + #[tokio::test] + async fn test_parquet_metadata_eq() { + // create test env + let mut env = crate::test_util::TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + let metadata = Arc::new(sst_region_metadata()); + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 200), + ]); + let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + + // write the sst file and get sst info + // sst info contains the parquet metadata, which is converted from FileMetaData + let mut writer = + ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); + let sst_info = writer + .write_all(&write_opts) + .await + .unwrap() + .expect("write_all should return sst info"); + let writer_metadata = sst_info.file_metadata.unwrap(); + + // read the sst file metadata + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); + let reader = builder.build().await.unwrap(); + let reader_metadata = reader.parquet_metadata(); + + // Because ParquetMetaData doesn't implement PartialEq, + // check all fields manually + macro_rules! assert_metadata { + ( $writer:expr, $reader:expr, $($method:ident,)+ ) => { + $( + assert_eq!($writer.$method(), $reader.$method()); + )+ + } + } + + assert_metadata!( + writer_metadata.file_metadata(), + reader_metadata.file_metadata(), + version, + num_rows, + created_by, + key_value_metadata, + schema_descr, + column_orders, + ); + + assert_metadata!( + writer_metadata, + reader_metadata, + row_groups, + column_index, + offset_index, + ); + } } diff --git a/src/mito2/src/sst/parquet/helper.rs b/src/mito2/src/sst/parquet/helper.rs new file mode 100644 index 000000000000..6e059bd963e5 --- /dev/null +++ b/src/mito2/src/sst/parquet/helper.rs @@ -0,0 +1,86 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use parquet::basic::ColumnOrder; +use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData}; +use parquet::format; +use parquet::schema::types::{from_thrift, SchemaDescriptor}; +use snafu::ResultExt; + +use crate::error; +use crate::error::Result; + +// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90 +/// Convert [format::FileMetaData] to [ParquetMetaData] +pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result { + let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?; + let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema)); + + let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len()); + for rg in t_file_metadata.row_groups { + row_groups.push( + RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg) + .context(error::ConvertMetaDataSnafu)?, + ); + } + let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr); + + let file_metadata = FileMetaData::new( + t_file_metadata.version, + t_file_metadata.num_rows, + t_file_metadata.created_by, + t_file_metadata.key_value_metadata, + schema_desc_ptr, + column_orders, + ); + // There may be a problem owing to lacking of column_index and offset_index, + // if we open page index in the future. + Ok(ParquetMetaData::new(file_metadata, row_groups)) +} + +// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137 +/// Parses column orders from Thrift definition. +/// If no column orders are defined, returns `None`. +fn parse_column_orders( + t_column_orders: Option>, + schema_descr: &SchemaDescriptor, +) -> Option> { + match t_column_orders { + Some(orders) => { + // Should always be the case + assert_eq!( + orders.len(), + schema_descr.num_columns(), + "Column order length mismatch" + ); + let mut res = Vec::with_capacity(schema_descr.num_columns()); + for (i, column) in schema_descr.columns().iter().enumerate() { + match orders[i] { + format::ColumnOrder::TYPEORDER(_) => { + let sort_order = ColumnOrder::get_sort_order( + column.logical_type(), + column.converted_type(), + column.physical_type(), + ); + res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); + } + } + } + Some(res) + } + None => None, + } +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 0882ef82c7e3..60729c664283 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -452,4 +452,9 @@ impl ParquetReader { Ok(None) } + + #[cfg(test)] + pub fn parquet_metadata(&self) -> Arc { + self.reader_builder.parquet_meta.clone() + } } diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index d776b3ac627d..febec27c0d36 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -26,6 +26,7 @@ use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; +use super::helper::parse_parquet_metadata; use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu}; use crate::read::{Batch, Source}; use crate::sst::parquet::format::WriteFormat; @@ -107,15 +108,20 @@ impl ParquetWriter { return Ok(None); } - let (_file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?; + let (file_meta, file_size) = buffered_writer.close().await.context(WriteBufferSnafu)?; + // 
Safety: num rows > 0 so we must have min/max. let time_range = stats.time_range.unwrap(); + // convert FileMetaData to ParquetMetaData + let parquet_metadata = parse_parquet_metadata(file_meta)?; + // object_store.write will make sure all bytes are written or an error is raised. Ok(Some(SstInfo { time_range, file_size, num_rows: stats.num_rows, + file_metadata: Some(parquet_metadata), })) } diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index e96f1aaa21fe..52956e8055f9 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -483,6 +483,12 @@ pub enum Error { location: Location, source: query::error::Error, }, + + #[snafu(display("Invalid table name: {}", table_name))] + InvalidTableName { + table_name: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -507,7 +513,8 @@ impl ErrorExt for Error { | Error::InvalidPartitionColumns { .. } | Error::PrepareFileTable { .. } | Error::InferFileTableSchema { .. } - | Error::SchemaIncompatible { .. } => StatusCode::InvalidArguments, + | Error::SchemaIncompatible { .. } + | Error::InvalidTableName { .. } => StatusCode::InvalidArguments, Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 620e3de6445d..43fdf23a4f5b 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -50,8 +50,8 @@ use table::TableRef; use super::StatementExecutor; use crate::error::{ self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, - DeserializePartitionSnafu, InvalidPartitionColumnsSnafu, ParseSqlSnafu, Result, - SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, + DeserializePartitionSnafu, InvalidPartitionColumnsSnafu, InvalidTableNameSnafu, ParseSqlSnafu, + Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, }; use crate::expr_factory; @@ -131,8 +131,8 @@ impl StatementExecutor { ensure!( NAME_PATTERN_REG.is_match(&create_table.table_name), - error::UnexpectedSnafu { - violated: format!("Invalid table name: {}", create_table.table_name) + InvalidTableNameSnafu { + table_name: create_table.table_name.clone(), } ); diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index d73fcbe91397..70b4401c9a73 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -24,7 +24,7 @@ use catalog; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; -use common_telemetry::logging; +use common_telemetry::{debug, error}; use datatypes::prelude::ConcreteDataType; use query::parser::PromQuery; use serde_json::json; @@ -620,7 +620,11 @@ impl IntoResponse for Error { | Error::InvalidQuery { .. } | Error::TimePrecision { .. 
} => HttpStatusCode::BAD_REQUEST, _ => { - logging::error!(self; "Failed to handle HTTP request"); + if self.status_code().should_log_error() { + error!(self; "Failed to handle HTTP request: "); + } else { + debug!("Failed to handle HTTP request: {self}"); + } HttpStatusCode::INTERNAL_SERVER_ERROR } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 0decb3821951..9341ba5f09ce 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -747,6 +747,7 @@ enable = true [frontend.logging] enable_otlp_tracing = false +append_stdout = true [frontend.datanode.client] timeout = "10s" @@ -815,6 +816,7 @@ parallel_scan_channel_size = 32 [datanode.logging] enable_otlp_tracing = false +append_stdout = true [datanode.export_metrics] enable = false @@ -825,6 +827,7 @@ write_interval = "30s" [logging] enable_otlp_tracing = false +append_stdout = true [wal_meta] provider = "raft_engine""#, diff --git a/tests/cases/standalone/common/create/create.result b/tests/cases/standalone/common/create/create.result index 08e4b658de2b..436cbfb393db 100644 --- a/tests/cases/standalone/common/create/create.result +++ b/tests/cases/standalone/common/create/create.result @@ -52,7 +52,7 @@ Error: 4000(TableAlreadyExists), Table already exists: `greptime.public.test2` CREATE TABLE 'N.~' (i TIMESTAMP TIME INDEX); -Error: 1002(Unexpected), Unexpected, violated: Invalid table name: N.~ +Error: 1004(InvalidArguments), Invalid table name: N.~ DESC TABLE integers; diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index b5218979821a..1bd7ad36496a 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -190,7 +190,7 @@ impl Env { "start".to_string(), "-c".to_string(), self.generate_config_file(subcommand, db_ctx), - "--http-addr=127.0.0.1:5001".to_string(), + "--http-addr=127.0.0.1:5002".to_string(), ]; (args, SERVER_ADDR.to_string()) } @@ -213,7 +213,7 @@ impl Env { "true".to_string(), "--enable-region-failover".to_string(), "false".to_string(), - "--http-addr=127.0.0.1:5001".to_string(), + "--http-addr=127.0.0.1:5002".to_string(), ]; (args, METASRV_ADDR.to_string()) }
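The metric engine now derives the `__tsid` column with a fixed-seed Murmur3 hash (`mur3::Hasher128`) instead of the seeded `ahash::RandomState`, which is why the expected `__tsid` values in the engine test changed. A small standalone sketch of the hashing scheme follows, assuming the `mur3 = "0.1"` dependency added in `Cargo.toml`; the `compute_tsid` helper is illustrative, not a GreptimeDB API.

```rust
use std::hash::{Hash, Hasher};

// Same fixed seed as the patch uses; any constant works as long as it never changes.
const TSID_HASH_SEED: u32 = 846793005;

/// Hashes ordered (tag name, tag value) pairs into a 64-bit tsid.
fn compute_tsid(tags: &[(&str, &str)]) -> u64 {
    let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
    for (name, value) in tags {
        name.hash(&mut hasher);
        value.hash(&mut hasher);
    }
    // Murmur3 yields 128 bits; the tsid simply truncates to the first 64.
    let (tsid, _) = hasher.finish128();
    tsid
}

fn main() {
    // Identical tag sets always map to the same tsid, as the engine test expects.
    assert_eq!(compute_tsid(&[("job", "tag_0")]), compute_tsid(&[("job", "tag_0")]));
    println!("{}", compute_tsid(&[("job", "tag_0")]));
}
```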
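The logging change gates the stdout layer behind the new `append_stdout` option by wrapping it in `Option`; since `tracing-subscriber` implements `Layer` for `Option<L>`, a `None` layer is simply a no-op in the subscriber stack. A minimal sketch of that pattern is below, with the file appender, OTLP tracing, and env filters from the real `init_global_logging` omitted.

```rust
use tracing_subscriber::fmt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

fn init_logging(append_stdout: bool) {
    // `Option<L>: Layer<S>` whenever `L: Layer<S>`, so disabling stdout is just `None`.
    let stdout_layer = if append_stdout {
        Some(fmt::layer().with_writer(std::io::stdout))
    } else {
        None
    };

    tracing_subscriber::registry().with(stdout_layer).init();
}

fn main() {
    init_logging(true);
    tracing::info!("stdout logging enabled");
}
```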