Skip to content

Commit

Permalink
Merge branch 'main' into show_views
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 authored Jul 14, 2024
2 parents a036897 + 5a17322 commit 031cedd
Show file tree
Hide file tree
Showing 64 changed files with 1,547 additions and 702 deletions.
315 changes: 184 additions & 131 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = { version = "0.13" }
Expand Down
1 change: 1 addition & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
Expand Down
6 changes: 6 additions & 0 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ enable_telemetry = true
## If it's not empty, the metasrv will store all data with this key prefix.
store_key_prefix = ""

## Whether to enable region failover.
## This feature is only available on GreptimeDB running on cluster mode and
## - Using Remote WAL
## - Using shared storage (e.g., s3).
enable_region_failover = false

## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/information_schema/memory_table/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>) {
"GIT_BRANCH",
"GIT_COMMIT",
"GIT_COMMIT_SHORT",
"GIT_DIRTY",
"GIT_CLEAN",
"PKG_VERSION",
]),
vec![
Expand All @@ -89,7 +89,7 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>) {
Arc::new(StringVector::from(vec![build_info
.commit_short
.to_string()])),
Arc::new(StringVector::from(vec![build_info.dirty.to_string()])),
Arc::new(StringVector::from(vec![build_info.clean.to_string()])),
Arc::new(StringVector::from(vec![build_info.version.to_string()])),
],
)
Expand Down
2 changes: 1 addition & 1 deletion src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[dev-dependencies.substrait_proto]
package = "substrait"
version = "0.17"
version = "0.37"
4 changes: 1 addition & 3 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,10 @@ tikv-jemallocator = "0.5"
[dev-dependencies]
client = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-version.workspace = true
serde.workspace = true
temp-env = "0.3"
tempfile.workspace = true

[target.'cfg(not(windows))'.dev-dependencies]
rexpect = "0.5"

[build-dependencies]
common-version.workspace = true
2 changes: 1 addition & 1 deletion src/cmd/src/bin/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_version::version;

#[derive(Parser)]
#[command(name = "greptime", author, version, long_version = version!(), about)]
#[command(name = "greptime", author, version, long_version = version(), about)]
#[command(propagate_version = true)]
pub(crate) struct Command {
#[clap(subcommand)]
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl StartCommand {
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version!(), short_version!());
log_versions(version(), short_version());

info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);
Expand Down Expand Up @@ -338,7 +338,7 @@ mod tests {
mode = "distributed"
enable_memory_catalog = false
node_id = 42
rpc_addr = "127.0.0.1:4001"
rpc_hostname = "192.168.0.1"
[grpc]
Expand All @@ -365,7 +365,7 @@ mod tests {
mode = "distributed"
enable_memory_catalog = false
node_id = 42
[grpc]
addr = "127.0.0.1:3001"
hostname = "127.0.0.1"
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl StartCommand {
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version!(), short_version!());
log_versions(version(), short_version());

info!("Flownode start command: {:#?}", self);
info!("Flownode options: {:#?}", opts);
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl StartCommand {
&opts.component.tracing,
opts.component.node_id.clone(),
);
log_versions(version!(), short_version!());
log_versions(version(), short_version());

info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl StartCommand {
&opts.component.tracing,
None,
);
log_versions(version!(), short_version!());
log_versions(version(), short_version());

info!("Metasrv start command: {:#?}", self);
info!("Metasrv options: {:#?}", opts);
Expand Down Expand Up @@ -296,7 +296,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
[failure_detector]
threshold = 8.0
min_std_deviation = "100ms"
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use common_telemetry::info;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_wal::config::StandaloneWalConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
Expand Down Expand Up @@ -130,7 +130,7 @@ pub struct StandaloneOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
pub wal: StandaloneWalConfig,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
Expand All @@ -155,7 +155,7 @@ impl Default for StandaloneOptions {
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
prom_store: PromStoreOptions::default(),
wal: StandaloneWalConfig::default(),
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
Expand Down Expand Up @@ -204,7 +204,7 @@ impl StandaloneOptions {
DatanodeOptions {
node_id: Some(0),
enable_telemetry: cloned_opts.enable_telemetry,
wal: cloned_opts.wal.into(),
wal: cloned_opts.wal,
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
grpc: cloned_opts.grpc,
Expand Down Expand Up @@ -413,7 +413,7 @@ impl StartCommand {
&opts.component.tracing,
None,
);
log_versions(version!(), short_version!());
log_versions(version(), short_version());

info!("Standalone start command: {:#?}", self);
info!("Standalone options: {opts:#?}");
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/tests/load_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_grpc::channel_manager::{
use common_runtime::global::RuntimeOptions;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use file_engine::config::EngineConfig;
use frontend::frontend::FrontendOptions;
Expand Down Expand Up @@ -206,7 +206,7 @@ fn test_load_standalone_example_config() {
},
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig {
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
..Default::default()
Expand Down
4 changes: 1 addition & 3 deletions src/common/greptimedb-telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true
async-trait.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-version.workspace = true
reqwest.workspace = true
serde.workspace = true
tokio.workspace = true
Expand All @@ -20,6 +21,3 @@ uuid.workspace = true
common-test-util.workspace = true
hyper = { version = "0.14", features = ["full"] }
tempfile.workspace = true

[build-dependencies]
common-version.workspace = true
17 changes: 0 additions & 17 deletions src/common/greptimedb-telemetry/build.rs

This file was deleted.

10 changes: 6 additions & 4 deletions src/common/greptimedb-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::time::Duration;
use common_runtime::error::{Error, Result};
use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction};
use common_telemetry::{debug, info};
use common_version::build_info;
use reqwest::{Client, Response};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -114,11 +115,11 @@ pub enum Mode {
#[async_trait::async_trait]
pub trait Collector {
fn get_version(&self) -> String {
env!("CARGO_PKG_VERSION").to_string()
build_info().version.to_string()
}

fn get_git_hash(&self) -> String {
env!("GIT_COMMIT").to_string()
build_info().commit.to_string()
}

fn get_os(&self) -> String {
Expand Down Expand Up @@ -286,6 +287,7 @@ mod tests {
use std::time::Duration;

use common_test_util::ports;
use common_version::build_info;
use hyper::service::{make_service_fn, service_fn};
use hyper::Server;
use reqwest::{Client, Response};
Expand Down Expand Up @@ -431,8 +433,8 @@ mod tests {
let body = response.json::<StatisticData>().await.unwrap();
assert_eq!(env::consts::ARCH, body.arch);
assert_eq!(env::consts::OS, body.os);
assert_eq!(env!("CARGO_PKG_VERSION"), body.version);
assert_eq!(env!("GIT_COMMIT"), body.git_commit);
assert_eq!(build_info().version, body.version);
assert_eq!(build_info().commit, body.git_commit);
assert_eq!(Mode::Standalone, body.mode);
assert_eq!(1, body.nodes.unwrap());

Expand Down
7 changes: 6 additions & 1 deletion src/common/meta/src/wal_options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub fn prepare_wal_options(

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;

Expand Down Expand Up @@ -160,9 +161,13 @@ mod tests {
.collect::<Vec<_>>();

// Creates a topic manager.
let config = MetasrvKafkaConfig {
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
Expand Down
21 changes: 13 additions & 8 deletions src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ impl TopicManager {
/// Creates a new topic manager.
pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
// Topics should be created.
let topics = (0..config.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
let topics = (0..config.kafka_topic.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix))
.collect::<Vec<_>>();

let selector = match config.selector_type {
let selector = match config.kafka_topic.selector_type {
TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
};

Expand All @@ -76,7 +76,7 @@ impl TopicManager {
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
pub async fn start(&self) -> Result<()> {
let num_topics = self.config.num_topics;
let num_topics = self.config.kafka_topic.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });

// Topics should be created.
Expand Down Expand Up @@ -185,9 +185,9 @@ impl TopicManager {
match client
.create_topic(
topic.clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
self.config.kafka_topic.num_partitions,
self.config.kafka_topic.replication_factor,
self.config.kafka_topic.create_topic_timeout.as_millis() as i32,
)
.await
{
Expand Down Expand Up @@ -242,6 +242,7 @@ impl TopicManager {

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::test_util::run_test_with_kafka_wal;

use super::*;
Expand Down Expand Up @@ -283,9 +284,13 @@ mod tests {
.collect::<Vec<_>>();

// Creates a topic manager.
let config = MetasrvKafkaConfig {
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
Expand Down
2 changes: 1 addition & 1 deletion src/common/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ snafu.workspace = true

[dependencies.substrait_proto]
package = "substrait"
version = "0.17"
version = "0.37"

[dev-dependencies]
datatypes.workspace = true
Expand Down
7 changes: 6 additions & 1 deletion src/common/version/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ workspace = true
codec = ["dep:serde", "dep:schemars"]

[dependencies]
build-data = "0.1.4"
const_format = "0.2"
schemars = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
shadow-rs = "0.29"

[build-dependencies]
build-data = "0.2"
shadow-rs = "0.29"
Loading

0 comments on commit 031cedd

Please sign in to comment.