Commit 27bbf24

fix: resolve conflicts

niebayes committed Dec 26, 2023
2 parents c01da6c + bf635a6

Showing 28 changed files with 435 additions and 135 deletions.
4 changes: 4 additions & 0 deletions .github/doc-label-config.yml
@@ -0,0 +1,4 @@
Doc not needed:
- '- \[x\] This PR does not require documentation updates.'
Doc update required:
- '- \[ \] This PR does not require documentation updates.'
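These two regexes key off the documentation checkbox added to the PR template below: a PR whose body contains the checked box (`- [x] This PR does not require documentation updates.`) is labeled `Doc not needed`, while one with the box left unchecked is labeled `Doc update required`.
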
1 change: 1 addition & 0 deletions .github/pull_request_template.md
@@ -15,5 +15,6 @@ Please explain IN DETAIL what the changes are in this PR and why they are needed

- [ ] I have written the necessary rustdoc comments.
- [ ] I have added the necessary unit tests and integration tests.
- [ ] This PR does not require documentation updates.

## Refer to a related PR or issue link (optional)
20 changes: 20 additions & 0 deletions .github/workflows/doc-label.yml
@@ -0,0 +1,20 @@
name: "PR Doc Labeler"
on:
  pull_request:
    types: [opened, edited, synchronize, ready_for_review, auto_merge_enabled, labeled, unlabeled]

permissions:
  pull-requests: write
  contents: read

jobs:
  triage:
    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    runs-on: ubuntu-latest
    steps:
      - uses: github/issue-labeler@v3.3
        with:
          configuration-path: .github/doc-label-config.yml
          enable-versioned-regex: false
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          sync-labels: 1
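On each of the PR events listed under `types`, the `github/issue-labeler` action (the pinned version shown above is assumed) matches the PR body against `doc-label-config.yml`. Because `sync-labels` is enabled, a label is removed again as soon as its regex stops matching, so editing the checkbox flips the PR between the two labels.
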
8 changes: 7 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

23 changes: 10 additions & 13 deletions src/cmd/src/standalone.rs
@@ -18,7 +18,8 @@ use std::{fs, path};
use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig as DatanodeWalConfig};
use common_config::wal::StandaloneWalConfig;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
@@ -27,9 +28,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::{
    WalConfig as MetaSrvWalConfig, WalOptionsAllocator, WalOptionsAllocatorRef,
};
use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
@@ -106,8 +105,7 @@ pub struct StandaloneOptions {
    pub opentsdb: OpentsdbOptions,
    pub influxdb: InfluxdbOptions,
    pub prom_store: PromStoreOptions,
    pub wal_meta: MetaSrvWalConfig,
    pub wal_datanode: DatanodeWalConfig,
    pub wal: StandaloneWalConfig,
    pub storage: StorageConfig,
    pub metadata_store: KvBackendConfig,
    pub procedure: ProcedureConfig,
@@ -130,8 +128,7 @@ impl Default for StandaloneOptions {
            opentsdb: OpentsdbOptions::default(),
            influxdb: InfluxdbOptions::default(),
            prom_store: PromStoreOptions::default(),
            wal_meta: MetaSrvWalConfig::default(),
            wal_datanode: DatanodeWalConfig::default(),
            wal: StandaloneWalConfig::default(),
            storage: StorageConfig::default(),
            metadata_store: KvBackendConfig::default(),
            procedure: ProcedureConfig::default(),
@@ -170,7 +167,7 @@ impl StandaloneOptions {
        DatanodeOptions {
            node_id: Some(0),
            enable_telemetry: self.enable_telemetry,
            wal: self.wal_datanode,
            wal: self.wal.into(),
            storage: self.storage,
            region_engine: self.region_engine,
            rpc_addr: self.grpc.addr,
@@ -342,7 +339,7 @@ impl StartCommand {
        let procedure = opts.procedure.clone();
        let frontend = opts.clone().frontend_options();
        let logging = opts.logging.clone();
        let wal_meta = opts.wal_meta.clone();
        let wal_meta = opts.wal.clone().into();
        let datanode = opts.datanode_options().clone();

        Ok(Options::Standalone(Box::new(MixOptions {
@@ -484,6 +481,7 @@ mod tests {

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::WalConfig;
    use common_test_util::temp_dir::create_named_temp_file;
    use datanode::config::{FileConfig, GcsConfig};
    use servers::Mode;
@@ -594,7 +592,7 @@
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
        let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
@@ -740,8 +738,7 @@
        assert_eq!(options.opentsdb, default_options.opentsdb);
        assert_eq!(options.influxdb, default_options.influxdb);
        assert_eq!(options.prom_store, default_options.prom_store);
        assert_eq!(options.wal_meta, default_options.wal_meta);
        assert_eq!(options.wal_datanode, default_options.wal_datanode);
        assert_eq!(options.wal, default_options.wal);
        assert_eq!(options.metadata_store, default_options.metadata_store);
        assert_eq!(options.procedure, default_options.procedure);
        assert_eq!(options.logging, default_options.logging);
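The net effect in standalone.rs: the separate `wal_meta` and `wal_datanode` sections collapse into a single `wal: StandaloneWalConfig`, which is converted at the two hand-off points (`self.wal.into()` for the datanode options, `opts.wal.clone().into()` for the metasrv side). Below is a minimal, self-contained sketch of that fan-out pattern using toy types; the real conversions live in the `common_config` and `common_meta` crates, and the metasrv-side `From` impl is implied by the diff rather than shown in it.

// A sketch, not GreptimeDB's actual API: one standalone WAL section fanning
// out to two consumers via `From`/`Into`, mirroring `self.wal.into()` and
// `opts.wal.clone().into()` above. Type and field names are illustrative.

#[derive(Clone, Debug, Default)]
struct RaftEngineConfig {
    dir: Option<String>,
}

#[derive(Clone, Debug)]
enum StandaloneWalConfig {
    RaftEngine(RaftEngineConfig),
}

#[derive(Debug)]
enum DatanodeWalConfig {
    RaftEngine(RaftEngineConfig),
}

#[derive(Debug)]
enum MetaSrvWalConfig {
    RaftEngine,
}

impl From<StandaloneWalConfig> for DatanodeWalConfig {
    fn from(value: StandaloneWalConfig) -> Self {
        match value {
            // The datanode keeps the full raft-engine settings.
            StandaloneWalConfig::RaftEngine(config) => DatanodeWalConfig::RaftEngine(config),
        }
    }
}

impl From<StandaloneWalConfig> for MetaSrvWalConfig {
    fn from(value: StandaloneWalConfig) -> Self {
        match value {
            // The metasrv side only needs to know which provider is in use.
            StandaloneWalConfig::RaftEngine(_) => MetaSrvWalConfig::RaftEngine,
        }
    }
}

fn main() {
    // One source of truth, two derived views.
    let wal = StandaloneWalConfig::RaftEngine(RaftEngineConfig::default());
    let datanode: DatanodeWalConfig = wal.clone().into();
    let metasrv: MetaSrvWalConfig = wal.into();
    println!("datanode: {datanode:?}, metasrv: {metasrv:?}");
}
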
56 changes: 40 additions & 16 deletions src/common/config/src/wal.rs
@@ -18,7 +18,9 @@ pub mod raft_engine;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
pub use crate::wal::kafka::{
    KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig, Topic as KafkaWalTopic,
};
pub use crate::wal::raft_engine::RaftEngineConfig;

/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
@@ -27,30 +29,49 @@ pub const WAL_OPTIONS_KEY: &str = "wal_options";

/// Wal config for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider")]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum WalConfig {
    #[serde(rename = "raft_engine")]
    RaftEngine(RaftEngineConfig),
    #[serde(rename = "kafka")]
    Kafka(KafkaConfig),
}

impl From<StandaloneWalConfig> for WalConfig {
    fn from(value: StandaloneWalConfig) -> Self {
        match value {
            StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine(config),
            StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(config.base),
        }
    }
}

impl Default for WalConfig {
    fn default() -> Self {
        WalConfig::Kafka(KafkaConfig::default())
        WalConfig::RaftEngine(RaftEngineConfig::default())
    }
}

/// Wal config for standalone.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum StandaloneWalConfig {
    RaftEngine(RaftEngineConfig),
    Kafka(StandaloneKafkaConfig),
}

impl Default for StandaloneWalConfig {
    fn default() -> Self {
        StandaloneWalConfig::RaftEngine(RaftEngineConfig::default())
    }
}

/// Wal options allocated to a region.
/// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded
/// by datanode with `serde_json::from_str`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "wal.provider")]
#[serde(tag = "wal.provider", rename_all = "snake_case")]
pub enum WalOptions {
    #[default]
    #[serde(rename = "raft_engine")]
    RaftEngine,
    #[serde(rename = "kafka")]
    #[serde(with = "prefix_wal_kafka")]
    Kafka(KafkaWalOptions),
}
@@ -64,31 +85,34 @@ mod tests {
    use common_base::readable_size::ReadableSize;
    use rskafka::client::partition::Compression as RsKafkaCompression;

    use crate::wal::kafka::KafkaBackoffConfig;
    use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions};

    #[test]
    fn test_serde_kafka_config() {
        let toml_str = r#"
            broker_endpoints = ["127.0.0.1:9090"]
            broker_endpoints = ["127.0.0.1:9092"]
            max_batch_size = "4MB"
            linger = "200ms"
            max_wait_time = "100ms"
            produce_record_timeout = "100ms"
            backoff_init = "500ms"
            backoff_max = "10s"
            backoff_base = 2
            backoff_deadline = "5mins"
        "#;
        let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
        let expected = KafkaConfig {
            broker_endpoints: vec!["127.0.0.1:9090".to_string()],
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            compression: RsKafkaCompression::default(),
            max_batch_size: ReadableSize::mb(4),
            linger: Duration::from_millis(200),
            max_wait_time: Duration::from_millis(100),
            backoff_init: Duration::from_millis(500),
            backoff_max: Duration::from_secs(10),
            backoff_base: 2,
            backoff_deadline: Some(Duration::from_secs(60 * 5)),
            produce_record_timeout: Duration::from_millis(100),
            backoff: KafkaBackoffConfig {
                init: Duration::from_millis(500),
                max: Duration::from_secs(10),
                base: 2,
                deadline: Some(Duration::from_secs(60 * 5)),
            },
        };
        assert_eq!(decoded, expected);
    }
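For the region-level `WalOptions`, the internal `wal.provider` tag plus the `wal.kafka.` field prefix keep the JSON that metasrv hands to datanodes completely flat. A self-contained sketch of that round trip follows; the single `topic` field on `KafkaWalOptions` is an assumption here, since the real struct lives in `common_config::wal::kafka` and is not part of this diff.

use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

// Assumed stand-in for the real `KafkaWalOptions`.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct KafkaWalOptions {
    topic: String,
}

// Prepends "wal.kafka." to every field key of the wrapped value.
with_prefix!(prefix_wal_kafka "wal.kafka.");

#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(tag = "wal.provider", rename_all = "snake_case")]
enum WalOptions {
    #[default]
    RaftEngine,
    #[serde(with = "prefix_wal_kafka")]
    Kafka(KafkaWalOptions),
}

fn main() {
    let options = WalOptions::Kafka(KafkaWalOptions {
        topic: "greptimedb_wal_topic_0".to_string(),
    });

    // Metasrv side: encode with serde_json::to_string.
    let encoded = serde_json::to_string(&options).unwrap();
    assert_eq!(
        encoded,
        r#"{"wal.provider":"kafka","wal.kafka.topic":"greptimedb_wal_topic_0"}"#
    );

    // Datanode side: decode with serde_json::from_str.
    let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded, options);
}
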
91 changes: 76 additions & 15 deletions src/common/config/src/wal/kafka.rs
@@ -17,12 +17,21 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression as RsKafkaCompression;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

/// Topic name prefix.
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
/// Kafka wal topic.
pub type Topic = String;

/// The type of the topic selector, i.e. with which strategy to select a topic.
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TopicSelectorType {
    #[default]
    RoundRobin,
}

/// Configurations for kafka wal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
@@ -40,34 +49,86 @@ pub struct KafkaConfig {
    pub linger: Duration,
    /// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
    #[serde(with = "humantime_serde")]
    pub max_wait_time: Duration,
    pub produce_record_timeout: Duration,
    /// The backoff config.
    #[serde(flatten, with = "kafka_backoff")]
    pub backoff: KafkaBackoffConfig,
}

impl Default for KafkaConfig {
    fn default() -> Self {
        Self {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            compression: RsKafkaCompression::NoCompression,
            max_batch_size: ReadableSize::mb(4),
            linger: Duration::from_millis(200),
            produce_record_timeout: Duration::from_millis(100),
            backoff: KafkaBackoffConfig::default(),
        }
    }
}

with_prefix!(pub kafka_backoff "backoff_");

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaBackoffConfig {
    /// The initial backoff for kafka clients.
    #[serde(with = "humantime_serde")]
    pub backoff_init: Duration,
    pub init: Duration,
    /// The maximum backoff for kafka clients.
    #[serde(with = "humantime_serde")]
    pub backoff_max: Duration,
    pub max: Duration,
    /// Exponential backoff rate, i.e. next backoff = base * current backoff.
    // Sets to u32 type since some structs containing the KafkaConfig need to derive the Eq trait.
    pub backoff_base: u32,
    pub base: u32,
    /// Stop reconnecting if the total wait time reaches the deadline.
    /// If it's None, the reconnecting won't terminate.
    #[serde(with = "humantime_serde")]
    pub backoff_deadline: Option<Duration>,
    pub deadline: Option<Duration>,
}

impl Default for KafkaConfig {
impl Default for KafkaBackoffConfig {
    fn default() -> Self {
        Self {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            compression: RsKafkaCompression::NoCompression,
            max_batch_size: ReadableSize::mb(4),
            linger: Duration::from_millis(200),
            max_wait_time: Duration::from_millis(100),
            backoff_init: Duration::from_millis(500),
            backoff_max: Duration::from_secs(10),
            backoff_base: 2,
            backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
            init: Duration::from_millis(500),
            max: Duration::from_secs(10),
            base: 2,
            deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct StandaloneKafkaConfig {
    #[serde(flatten)]
    pub base: KafkaConfig,
    /// Number of topics to be created upon start.
    pub num_topics: usize,
    /// The type of the topic selector with which to select a topic for a region.
    pub selector_type: TopicSelectorType,
    /// Topic name prefix.
    pub topic_name_prefix: String,
    /// Number of partitions per topic.
    pub num_partitions: i32,
    /// The replication factor of each topic.
    pub replication_factor: i16,
    /// The timeout above which a topic creation operation will be cancelled.
    #[serde(with = "humantime_serde")]
    pub create_topic_timeout: Duration,
}

impl Default for StandaloneKafkaConfig {
    fn default() -> Self {
        Self {
            base: KafkaConfig::default(),
            num_topics: 64,
            selector_type: TopicSelectorType::RoundRobin,
            topic_name_prefix: "greptimedb_wal_topic".to_string(),
            num_partitions: 1,
            replication_factor: 3,
            create_topic_timeout: Duration::from_secs(30),
        }
    }
}
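Two serde mechanisms in this file are worth spelling out: `with_prefix!` plus `#[serde(flatten)]` lets the `backoff_*` keys stay flat in TOML while the Rust side groups them into `KafkaBackoffConfig`, and `StandaloneKafkaConfig` flattens the shared `KafkaConfig` into itself so a standalone `[wal]` section can mix client options with topic-creation options. A reduced sketch with toy structs (assumes the serde, serde_with, humantime-serde, and toml crates):

use std::time::Duration;

use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

// Maps the nested struct's fields to "backoff_"-prefixed keys.
with_prefix!(kafka_backoff "backoff_");

#[derive(Debug, Serialize, Deserialize)]
#[serde(default)]
struct BackoffConfig {
    #[serde(with = "humantime_serde")]
    init: Duration,
    base: u32,
}

impl Default for BackoffConfig {
    fn default() -> Self {
        Self {
            init: Duration::from_millis(500),
            base: 2,
        }
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
struct KafkaConfig {
    broker_endpoints: Vec<String>,
    #[serde(flatten, with = "kafka_backoff")]
    backoff: BackoffConfig,
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
struct StandaloneKafkaConfig {
    // Shared client options appear at the same TOML level...
    #[serde(flatten)]
    base: KafkaConfig,
    // ...as the standalone-only topic-creation options.
    num_topics: usize,
}

fn main() {
    let config: StandaloneKafkaConfig = toml::from_str(
        r#"
        broker_endpoints = ["127.0.0.1:9092"]
        backoff_init = "250ms"
        num_topics = 64
        "#,
    )
    .unwrap();
    assert_eq!(config.base.backoff.init, Duration::from_millis(250));
    assert_eq!(config.base.backoff.base, 2); // falls back to the default
    assert_eq!(config.num_topics, 64);
}

This is why the test in wal.rs above can keep writing `backoff_init = "500ms"` at the top level of the TOML while asserting against the nested `KafkaBackoffConfig` value.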