From 285ffc5850df6191d19999989b14d0560412910c Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 11 Jul 2024 20:08:30 -0700 Subject: [PATCH 1/7] fix: build info should use build time env var (#4343) * fix: build info should use build time env var Signed-off-by: tison * catch up Signed-off-by: tison * fixup lifetime Signed-off-by: tison * fixup Signed-off-by: tison * fix more Signed-off-by: tison --------- Signed-off-by: tison --- Cargo.lock | 264 +++++++++++------- .../information_schema/memory_table/tables.rs | 4 +- src/client/Cargo.toml | 2 +- src/cmd/Cargo.toml | 4 +- src/cmd/src/bin/greptime.rs | 2 +- src/cmd/src/datanode.rs | 6 +- src/cmd/src/flownode.rs | 2 +- src/cmd/src/frontend.rs | 2 +- src/cmd/src/metasrv.rs | 4 +- src/cmd/src/standalone.rs | 2 +- src/common/greptimedb-telemetry/Cargo.toml | 4 +- src/common/greptimedb-telemetry/build.rs | 17 -- src/common/greptimedb-telemetry/src/lib.rs | 10 +- src/common/substrait/Cargo.toml | 2 +- src/common/version/Cargo.toml | 7 +- src/{cmd => common/version}/build.rs | 17 +- src/common/version/src/lib.rs | 167 ++++++----- src/servers/build.rs | 3 +- src/servers/src/http/handler.rs | 12 +- src/servers/src/http/prometheus.rs | 6 +- src/servers/tests/http/http_handler_test.rs | 11 +- .../information_schema/cluster_info.result | 10 +- .../information_schema/cluster_info.sql | 10 +- .../standalone/common/function/system.result | 4 +- .../standalone/common/function/system.sql | 2 +- .../common/system/information_schema.result | 4 +- .../information_schema/cluster_info.result | 6 +- .../information_schema/cluster_info.sql | 6 +- 28 files changed, 320 insertions(+), 270 deletions(-) delete mode 100644 src/common/greptimedb-telemetry/build.rs rename src/{cmd => common/version}/build.rs (65%) diff --git a/Cargo.lock b/Cargo.lock index 22db8c1d9365..0acae30c2cab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1180,12 +1180,11 @@ dependencies = [ [[package]] name = "build-data" -version = "0.1.5" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aed3884e2cab7c973c8fd2d150314b6a932df7fdc830edcaf1e8e7c4ae9db3c0" +checksum = "eda20fcece9c23f3c3f4c2751a8a5ca9491c05fa7a69920af65953c3b39b7ce4" dependencies = [ "chrono", - "safe-lock", "safe-regex", ] @@ -1670,7 +1669,7 @@ dependencies = [ "rand", "serde_json", "snafu 0.8.3", - "substrait 0.17.1", + "substrait 0.37.3", "substrait 0.8.2", "tokio", "tokio-stream", @@ -2265,8 +2264,10 @@ name = "common-version" version = "0.8.2" dependencies = [ "build-data", + "const_format", "schemars", "serde", + "shadow-rs", ] [[package]] @@ -2397,6 +2398,32 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "const_fn" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "373e9fafaa20882876db20562275ff58d50e0caa2590077fe7ce7bef90211d0d" + +[[package]] +name = "const_format" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a214c7af3d04997541b18d432afaff4c455e79e2029079647e72fc2bd27673" +dependencies = [ + "const_format_proc_macros", +] + +[[package]] +name = "const_format_proc_macros" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7f6ff08fd20f4f299298a28e2dfa8a8ba1036e6cd2460ac1de7b425d76f2500" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + [[package]] name = "constant_time_eq" version = "0.3.0" @@ -4188,9 +4215,9 @@ checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" [[package]] name 
= "git2" -version = "0.18.3" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "232e6a7bfe35766bf715e55a88b39a700596c0ccfd88cd3680b4cdb40d66ef70" +checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" dependencies = [ "bitflags 2.5.0", "libc", @@ -4264,15 +4291,6 @@ dependencies = [ "ahash 0.7.8", ] -[[package]] -name = "hashbrown" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" -dependencies = [ - "ahash 0.8.11", -] - [[package]] name = "hashbrown" version = "0.14.5" @@ -5163,6 +5181,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "is_debug" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06d198e9919d9822d5f7083ba8530e04de87841eaf21ead9af8f2304efd57c89" + [[package]] name = "is_terminal_polyfill" version = "1.70.0" @@ -5635,9 +5659,9 @@ dependencies = [ [[package]] name = "libgit2-sys" -version = "0.16.2+1.7.2" +version = "0.17.0+1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee4126d8b4ee5c9d9ea891dd875cfdc1e9d0950437179104b183d7d8a74d24e8" +checksum = "10472326a8a6477c3c20a64547b0059e4b0d086869eee31e6d7da728a8eb7224" dependencies = [ "cc", "libc", @@ -7499,7 +7523,7 @@ dependencies = [ "pbjson", "pbjson-build", "prost 0.12.6", - "prost-build", + "prost-build 0.12.6", "serde", ] @@ -7955,7 +7979,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.3", "prost 0.12.6", - "prost-build", + "prost-build 0.12.6", "prost-derive 0.12.6", "protobuf", "sha2", @@ -8200,6 +8224,16 @@ dependencies = [ "prost-derive 0.12.6", ] +[[package]] +name = "prost" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" +dependencies = [ + "bytes", + "prost-derive 0.13.1", +] + [[package]] name = "prost-build" version = "0.12.6" @@ -8221,6 +8255,25 @@ dependencies = [ "tempfile", ] +[[package]] +name = "prost-build" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prost 0.13.1", + "prost-types 0.13.1", + "regex", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.11.9" @@ -8247,6 +8300,19 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "prost-derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "prost-types" version = "0.11.9" @@ -8265,6 +8331,15 @@ dependencies = [ "prost 0.12.6", ] +[[package]] +name = "prost-types" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +dependencies = [ + "prost 0.13.1", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -8780,16 +8855,6 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" -[[package]] -name = "regress" -version = "0.7.1" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ed9969cad8051328011596bf549629f1b800cf1731e7964b1eef8dfc480d2c2" -dependencies = [ - "hashbrown 0.13.2", - "memchr", -] - [[package]] name = "regress" version = "0.9.1" @@ -9728,12 +9793,6 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" -[[package]] -name = "safe-lock" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "077d73db7973cccf63eb4aff1e5a34dc2459baa867512088269ea5f2f4253c90" - [[package]] name = "safe-proc-macro2" version = "1.0.67" @@ -9754,18 +9813,18 @@ dependencies = [ [[package]] name = "safe-regex" -version = "0.2.6" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6ab4bc484ef480a9ce79b381efd7b6767700f514d47bc599036e9d6f7f3c49d" +checksum = "5194fafa3cb9da89e0cab6dffa1f3fdded586bd6396d12be11b4cae0c7ee45c2" dependencies = [ "safe-regex-macro", ] [[package]] name = "safe-regex-compiler" -version = "0.2.6" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d71f8c78bffb07962595e1bfa5ed11d24dd855eedc50b6a735f5ef648ce621b" +checksum = "e822ae1e61251bcfd698317c237cf83f7c57161a5dc24ee609a85697f1ed15b3" dependencies = [ "safe-proc-macro2", "safe-quote", @@ -9773,9 +9832,9 @@ dependencies = [ [[package]] name = "safe-regex-macro" -version = "0.2.6" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0909ab4b77511df24201cd66541d6a028887c77ecc065f277c68a12a663274ef" +checksum = "2768de7e6ef19f59c5fd3c3ac207ef12b68a49f95e3172d67e4a04cfd992ca06" dependencies = [ "safe-proc-macro2", "safe-regex-compiler", @@ -10313,6 +10372,19 @@ dependencies = [ "keccak", ] +[[package]] +name = "shadow-rs" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a600f795d0894cda22235b44eea4b85c2a35b405f65523645ac8e35b306817a" +dependencies = [ + "const_format", + "git2", + "is_debug", + "time", + "tzdb", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -11002,21 +11074,23 @@ dependencies = [ "promql", "prost 0.12.6", "snafu 0.8.3", - "substrait 0.17.1", + "substrait 0.37.3", "tokio", ] [[package]] name = "substrait" -version = "0.17.1" +version = "0.34.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1e8440a1c9b95a7c9a00a19f78b980749e8c945eb880687a5d673cea83729c5" +checksum = "04c77dec9b6c4e48ac828937bbe7cf473b0933168c5d76d51a5816ace7046be9" dependencies = [ - "git2", - "heck 0.4.1", + "heck 0.5.0", + "pbjson", + "pbjson-build", + "pbjson-types", "prettyplease", "prost 0.12.6", - "prost-build", + "prost-build 0.12.6", "prost-types 0.12.6", "schemars", "semver", @@ -11024,31 +11098,28 @@ dependencies = [ "serde_json", "serde_yaml", "syn 2.0.66", - "typify 0.0.14", + "typify", "walkdir", ] [[package]] name = "substrait" -version = "0.34.1" +version = "0.37.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04c77dec9b6c4e48ac828937bbe7cf473b0933168c5d76d51a5816ace7046be9" +checksum = "9ec889155c56a34200d2c5aee147b8d29545fa7cce7f68b38d927f5d24ced8ef" dependencies = [ "heck 0.5.0", - "pbjson", - "pbjson-build", - "pbjson-types", "prettyplease", - "prost 0.12.6", - "prost-build", - "prost-types 0.12.6", + "prost 0.13.1", + "prost-build 0.13.1", + "prost-types 0.13.1", "schemars", "semver", "serde", "serde_json", "serde_yaml", 
"syn 2.0.66", - "typify 0.1.0", + "typify", "walkdir", ] @@ -12090,7 +12161,7 @@ checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" dependencies = [ "prettyplease", "proc-macro2", - "prost-build", + "prost-build 0.12.6", "quote", "syn 2.0.66", ] @@ -12469,42 +12540,14 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "typify" -version = "0.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2e3b707a653e2915a2fc2c4ee96a3d30b9554b9435eb4cc8b5c6c74bbdd3044" -dependencies = [ - "typify-impl 0.0.14", - "typify-macro 0.0.14", -] - [[package]] name = "typify" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb6beec125971dda80a086f90b4a70f60f222990ce4d63ad0fc140492f53444" dependencies = [ - "typify-impl 0.1.0", - "typify-macro 0.1.0", -] - -[[package]] -name = "typify-impl" -version = "0.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d9c752192779f666e4c868672dee56a652b82c08032c7e9d23f6a845b282298" -dependencies = [ - "heck 0.4.1", - "log", - "proc-macro2", - "quote", - "regress 0.7.1", - "schemars", - "serde_json", - "syn 2.0.66", - "thiserror", - "unicode-ident", + "typify-impl", + "typify-macro", ] [[package]] @@ -12517,7 +12560,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "regress 0.9.1", + "regress", "schemars", "semver", "serde", @@ -12529,35 +12572,48 @@ dependencies = [ [[package]] name = "typify-macro" -version = "0.0.14" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a14defd554507e72a2bb93cd081c8b374cfed43b3d986b141ad3839d9fd6986b" +checksum = "f8e6491896e955692d68361c68db2b263e3bec317ec0b684e0e2fa882fb6e31e" dependencies = [ "proc-macro2", "quote", "schemars", + "semver", "serde", "serde_json", "serde_tokenstream", "syn 2.0.66", - "typify-impl 0.0.14", + "typify-impl", ] [[package]] -name = "typify-macro" -version = "0.1.0" +name = "tz-rs" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8e6491896e955692d68361c68db2b263e3bec317ec0b684e0e2fa882fb6e31e" +checksum = "33851b15c848fad2cf4b105c6bb66eb9512b6f6c44a4b13f57c53c73c707e2b4" dependencies = [ - "proc-macro2", - "quote", - "schemars", - "semver", - "serde", - "serde_json", - "serde_tokenstream", - "syn 2.0.66", - "typify-impl 0.1.0", + "const_fn", +] + +[[package]] +name = "tzdb" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b580f6b365fa89f5767cdb619a55d534d04a4e14c2d7e5b9a31e94598687fb1" +dependencies = [ + "iana-time-zone", + "tz-rs", + "tzdb_data", +] + +[[package]] +name = "tzdb_data" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1889fdffac09d65c1d95c42d5202e9b21ad8c758f426e9fe09088817ea998d6" +dependencies = [ + "tz-rs", ] [[package]] diff --git a/src/catalog/src/information_schema/memory_table/tables.rs b/src/catalog/src/information_schema/memory_table/tables.rs index e1696ab8e106..ecbab36cf8e4 100644 --- a/src/catalog/src/information_schema/memory_table/tables.rs +++ b/src/catalog/src/information_schema/memory_table/tables.rs @@ -80,7 +80,7 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { "GIT_BRANCH", "GIT_COMMIT", "GIT_COMMIT_SHORT", - "GIT_DIRTY", + "GIT_CLEAN", "PKG_VERSION", ]), vec![ @@ -89,7 +89,7 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { Arc::new(StringVector::from(vec![build_info .commit_short .to_string()])), 
- Arc::new(StringVector::from(vec![build_info.dirty.to_string()])), + Arc::new(StringVector::from(vec![build_info.clean.to_string()])), Arc::new(StringVector::from(vec![build_info.version.to_string()])), ], ) diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 17ef3ac1b721..469d7d1a7e2b 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -49,4 +49,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } [dev-dependencies.substrait_proto] package = "substrait" -version = "0.17" +version = "0.37" diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 6a0ef4a2f23c..ded9025709d3 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -83,12 +83,10 @@ tikv-jemallocator = "0.5" [dev-dependencies] client = { workspace = true, features = ["testing"] } common-test-util.workspace = true +common-version.workspace = true serde.workspace = true temp-env = "0.3" tempfile.workspace = true [target.'cfg(not(windows))'.dev-dependencies] rexpect = "0.5" - -[build-dependencies] -common-version.workspace = true diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index d5a35c6837e8..707d2daa05b5 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -21,7 +21,7 @@ use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App}; use common_version::version; #[derive(Parser)] -#[command(name = "greptime", author, version, long_version = version!(), about)] +#[command(name = "greptime", author, version, long_version = version(), about)] #[command(propagate_version = true)] pub(crate) struct Command { #[clap(subcommand)] diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index b8e929c90af6..eb1281e188f0 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -266,7 +266,7 @@ impl StartCommand { &opts.component.tracing, opts.component.node_id.map(|x| x.to_string()), ); - log_versions(version!(), short_version!()); + log_versions(version(), short_version()); info!("Datanode start command: {:#?}", self); info!("Datanode options: {:#?}", opts); @@ -338,7 +338,7 @@ mod tests { mode = "distributed" enable_memory_catalog = false node_id = 42 - + rpc_addr = "127.0.0.1:4001" rpc_hostname = "192.168.0.1" [grpc] @@ -365,7 +365,7 @@ mod tests { mode = "distributed" enable_memory_catalog = false node_id = 42 - + [grpc] addr = "127.0.0.1:3001" hostname = "127.0.0.1" diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 554c5d46c0d2..328693f326fc 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -214,7 +214,7 @@ impl StartCommand { &opts.component.tracing, opts.component.node_id.map(|x| x.to_string()), ); - log_versions(version!(), short_version!()); + log_versions(version(), short_version()); info!("Flownode start command: {:#?}", self); info!("Flownode options: {:#?}", opts); diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index c94950adf727..5789a0321686 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -259,7 +259,7 @@ impl StartCommand { &opts.component.tracing, opts.component.node_id.clone(), ); - log_versions(version!(), short_version!()); + log_versions(version(), short_version()); info!("Frontend start command: {:#?}", self); info!("Frontend options: {:#?}", opts); diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 3b89fdce112e..55845936bb74 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -233,7 +233,7 @@ impl StartCommand { &opts.component.tracing, None, ); - log_versions(version!(), 
short_version!()); + log_versions(version(), short_version()); info!("Metasrv start command: {:#?}", self); info!("Metasrv options: {:#?}", opts); @@ -296,7 +296,7 @@ mod tests { [logging] level = "debug" dir = "/tmp/greptimedb/test/logs" - + [failure_detector] threshold = 8.0 min_std_deviation = "100ms" diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index bb531a79a3f9..cf025ccf4e2e 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -413,7 +413,7 @@ impl StartCommand { &opts.component.tracing, None, ); - log_versions(version!(), short_version!()); + log_versions(version(), short_version()); info!("Standalone start command: {:#?}", self); info!("Standalone options: {opts:#?}"); diff --git a/src/common/greptimedb-telemetry/Cargo.toml b/src/common/greptimedb-telemetry/Cargo.toml index b46f1e5aa902..90bca28052dd 100644 --- a/src/common/greptimedb-telemetry/Cargo.toml +++ b/src/common/greptimedb-telemetry/Cargo.toml @@ -11,6 +11,7 @@ workspace = true async-trait.workspace = true common-runtime.workspace = true common-telemetry.workspace = true +common-version.workspace = true reqwest.workspace = true serde.workspace = true tokio.workspace = true @@ -20,6 +21,3 @@ uuid.workspace = true common-test-util.workspace = true hyper = { version = "0.14", features = ["full"] } tempfile.workspace = true - -[build-dependencies] -common-version.workspace = true diff --git a/src/common/greptimedb-telemetry/build.rs b/src/common/greptimedb-telemetry/build.rs deleted file mode 100644 index 5b7f1458843c..000000000000 --- a/src/common/greptimedb-telemetry/build.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -fn main() { - common_version::setup_build_info(); -} diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs index 6f036f34e7b0..1f02c524e7c5 100644 --- a/src/common/greptimedb-telemetry/src/lib.rs +++ b/src/common/greptimedb-telemetry/src/lib.rs @@ -22,6 +22,7 @@ use std::time::Duration; use common_runtime::error::{Error, Result}; use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction}; use common_telemetry::{debug, info}; +use common_version::build_info; use reqwest::{Client, Response}; use serde::{Deserialize, Serialize}; @@ -114,11 +115,11 @@ pub enum Mode { #[async_trait::async_trait] pub trait Collector { fn get_version(&self) -> String { - env!("CARGO_PKG_VERSION").to_string() + build_info().version.to_string() } fn get_git_hash(&self) -> String { - env!("GIT_COMMIT").to_string() + build_info().commit.to_string() } fn get_os(&self) -> String { @@ -286,6 +287,7 @@ mod tests { use std::time::Duration; use common_test_util::ports; + use common_version::build_info; use hyper::service::{make_service_fn, service_fn}; use hyper::Server; use reqwest::{Client, Response}; @@ -431,8 +433,8 @@ mod tests { let body = response.json::().await.unwrap(); assert_eq!(env::consts::ARCH, body.arch); assert_eq!(env::consts::OS, body.os); - assert_eq!(env!("CARGO_PKG_VERSION"), body.version); - assert_eq!(env!("GIT_COMMIT"), body.git_commit); + assert_eq!(build_info().version, body.version); + assert_eq!(build_info().commit, body.git_commit); assert_eq!(Mode::Standalone, body.mode); assert_eq!(1, body.nodes.unwrap()); diff --git a/src/common/substrait/Cargo.toml b/src/common/substrait/Cargo.toml index 3da8b6310017..0e4e498cd305 100644 --- a/src/common/substrait/Cargo.toml +++ b/src/common/substrait/Cargo.toml @@ -23,7 +23,7 @@ snafu.workspace = true [dependencies.substrait_proto] package = "substrait" -version = "0.17" +version = "0.37" [dev-dependencies] datatypes.workspace = true diff --git a/src/common/version/Cargo.toml b/src/common/version/Cargo.toml index 1f7444e22b39..6d602cabfe0b 100644 --- a/src/common/version/Cargo.toml +++ b/src/common/version/Cargo.toml @@ -11,6 +11,11 @@ workspace = true codec = ["dep:serde", "dep:schemars"] [dependencies] -build-data = "0.1.4" +const_format = "0.2" schemars = { workspace = true, optional = true } serde = { workspace = true, optional = true } +shadow-rs = "0.29" + +[build-dependencies] +build-data = "0.2" +shadow-rs = "0.29" diff --git a/src/cmd/build.rs b/src/common/version/build.rs similarity index 65% rename from src/cmd/build.rs rename to src/common/version/build.rs index 87615c2c990f..eeb383771864 100644 --- a/src/cmd/build.rs +++ b/src/common/version/build.rs @@ -12,9 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-fn main() { - // Trigger this script if the git branch/commit changes - println!("cargo:rerun-if-changed=.git/refs/heads"); +use build_data::{format_timestamp, get_source_time}; - common_version::setup_build_info(); +fn main() -> shadow_rs::SdResult<()> { + println!("cargo:rerun-if-changed=.git/refs/heads"); + println!( + "cargo:rustc-env=SOURCE_TIMESTAMP={}", + if let Ok(t) = get_source_time() { + format_timestamp(t) + } else { + "".to_string() + } + ); + build_data::set_BUILD_TIMESTAMP(); + shadow_rs::new() } diff --git a/src/common/version/src/lib.rs b/src/common/version/src/lib.rs index fd179769cd2b..a5a350c1ac72 100644 --- a/src/common/version/src/lib.rs +++ b/src/common/version/src/lib.rs @@ -12,28 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::Cow; +#![allow(clippy::print_stdout)] + use std::fmt::Display; -use std::sync::OnceLock; -const UNKNOWN: &str = "unknown"; +shadow_rs::shadow!(build); #[derive(Clone, Debug, PartialEq)] -#[cfg_attr( - feature = "codec", - derive(serde::Serialize, serde::Deserialize, schemars::JsonSchema) -)] pub struct BuildInfo { - pub branch: Cow<'static, str>, - pub commit: Cow<'static, str>, - pub commit_short: Cow<'static, str>, - pub dirty: Cow<'static, str>, - pub timestamp: Cow<'static, str>, - - /// Rustc Version - pub rustc: Cow<'static, str>, - /// GreptimeDB Version - pub version: Cow<'static, str>, + pub branch: &'static str, + pub commit: &'static str, + pub commit_short: &'static str, + pub clean: bool, + pub source_time: &'static str, + pub build_time: &'static str, + pub rustc: &'static str, + pub target: &'static str, + pub version: &'static str, } impl Display for BuildInfo { @@ -45,7 +40,7 @@ impl Display for BuildInfo { format!("branch: {}", self.branch), format!("commit: {}", self.commit), format!("commit_short: {}", self.commit_short), - format!("dirty: {}", self.dirty), + format!("clean: {}", self.clean), format!("version: {}", self.version), ] .join("\n") @@ -53,78 +48,82 @@ impl Display for BuildInfo { } } -static BUILD: OnceLock = OnceLock::new(); - -pub fn build_info() -> &'static BuildInfo { - BUILD.get_or_init(|| { - let branch = build_data::get_git_branch() - .map(Cow::Owned) - .unwrap_or(Cow::Borrowed(UNKNOWN)); - let commit = build_data::get_git_commit() - .map(Cow::Owned) - .unwrap_or(Cow::Borrowed(UNKNOWN)); - let commit_short = build_data::get_git_commit_short() - .map(Cow::Owned) - .unwrap_or(Cow::Borrowed(UNKNOWN)); - let dirty = build_data::get_git_dirty() - .map(|b| Cow::Owned(b.to_string())) - .unwrap_or(Cow::Borrowed(UNKNOWN)); - let timestamp = build_data::get_source_time() - .map(|ts| Cow::Owned(build_data::format_timestamp(ts))) - .unwrap_or(Cow::Borrowed(UNKNOWN)); - let rustc = build_data::get_rustc_version() - .map(Cow::Owned) - .unwrap_or(Cow::Borrowed(UNKNOWN)); - let version = Cow::Borrowed(env!("CARGO_PKG_VERSION")); +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr( + feature = "codec", + derive(serde::Serialize, serde::Deserialize, schemars::JsonSchema) +)] +pub struct OwnedBuildInfo { + pub branch: String, + pub commit: String, + pub commit_short: String, + pub clean: bool, + pub source_time: String, + pub build_time: String, + pub rustc: String, + pub target: String, + pub version: String, +} - BuildInfo { - branch, - commit, - commit_short, - dirty, - timestamp, - rustc, - version, +impl From for OwnedBuildInfo { + fn from(info: BuildInfo) -> Self { + OwnedBuildInfo { + branch: info.branch.to_string(), + commit: 
info.commit.to_string(), + commit_short: info.commit_short.to_string(), + clean: info.clean, + source_time: info.source_time.to_string(), + build_time: info.build_time.to_string(), + rustc: info.rustc.to_string(), + target: info.target.to_string(), + version: info.version.to_string(), } - }) + } +} + +impl Display for OwnedBuildInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + [ + format!("branch: {}", self.branch), + format!("commit: {}", self.commit), + format!("commit_short: {}", self.commit_short), + format!("clean: {}", self.clean), + format!("version: {}", self.version), + ] + .join("\n") + ) + } } -#[allow(clippy::print_stdout)] -pub fn setup_build_info() { - let build_info = build_info(); - println!("cargo:rustc-env=GIT_COMMIT={}", build_info.commit); - println!( - "cargo:rustc-env=GIT_COMMIT_SHORT={}", - build_info.commit_short - ); - println!("cargo:rustc-env=GIT_BRANCH={}", build_info.branch); - println!("cargo:rustc-env=GIT_DIRTY={}", build_info.dirty); - println!("cargo:rustc-env=GIT_DIRTY={}", build_info.dirty); - println!("cargo:rustc-env=RUSTC_VERSION={}", build_info.rustc); - println!("cargo:rustc-env=SOURCE_TIMESTAMP={}", build_info.timestamp); +pub const fn build_info() -> BuildInfo { + BuildInfo { + branch: build::BRANCH, + commit: build::COMMIT_HASH, + commit_short: build::SHORT_COMMIT, + clean: build::GIT_CLEAN, + source_time: env!("SOURCE_TIMESTAMP"), + build_time: env!("BUILD_TIMESTAMP"), + rustc: build::RUST_VERSION, + target: build::BUILD_TARGET, + version: build::PKG_VERSION, + } } -/// Get the string for the output of cli "--version". -#[macro_export] -macro_rules! version { - () => { - concat!( - "\nbranch: ", - env!("GIT_BRANCH"), - "\ncommit: ", - env!("GIT_COMMIT"), - "\ndirty: ", - env!("GIT_DIRTY"), - "\nversion: ", - env!("CARGO_PKG_VERSION") - ) - }; +const BUILD_INFO: BuildInfo = build_info(); + +pub const fn version() -> &'static str { + const_format::formatcp!( + "\nbranch: {}\ncommit: {}\nclean: {}\nversion: {}", + BUILD_INFO.branch, + BUILD_INFO.commit, + BUILD_INFO.clean, + BUILD_INFO.version, + ) } -/// Short version for reporting metrics. -#[macro_export] -macro_rules! short_version { - () => { - concat!(env!("GIT_BRANCH"), "-", env!("GIT_COMMIT_SHORT")) - }; +pub const fn short_version() -> &'static str { + const_format::formatcp!("{}-{}", BUILD_INFO.branch, BUILD_INFO.commit_short,) } diff --git a/src/servers/build.rs b/src/servers/build.rs index 0b735a6800fb..5f7081d02423 100644 --- a/src/servers/build.rs +++ b/src/servers/build.rs @@ -13,7 +13,6 @@ // limitations under the License. fn main() { - common_version::setup_build_info(); #[cfg(feature = "dashboard")] fetch_dashboard_assets(); } @@ -24,7 +23,7 @@ fn fetch_dashboard_assets() { let message = "Failed to fetch dashboard assets"; let help = r#" -You can manually execute "fetch-dashboard-assets.sh" to see why, +You can manually execute "fetch-dashboard-assets.sh" to see why, or it's a network error, just try again or enable/disable some proxy."#; // It's very unlikely to be failed to get the current dir here, see `current_dir`'s docs. diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index cfdd7d5d8922..c5e599dfe921 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::collections::HashMap; -use std::env; use std::time::Instant; use aide::transform::TransformOperation; @@ -311,13 +310,14 @@ pub async fn status() -> Json> { let hostname = hostname::get() .map(|s| s.to_string_lossy().to_string()) .unwrap_or_else(|_| "unknown".to_string()); + let build_info = common_version::build_info(); Json(StatusResponse { - source_time: env!("SOURCE_TIMESTAMP"), - commit: env!("GIT_COMMIT"), - branch: env!("GIT_BRANCH"), - rustc_version: env!("RUSTC_VERSION"), + source_time: build_info.source_time, + commit: build_info.commit, + branch: build_info.branch, + rustc_version: build_info.rustc, hostname, - version: env!("CARGO_PKG_VERSION"), + version: build_info.version, }) } diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 8f24dc11f721..33a702d09d75 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -26,7 +26,7 @@ use common_query::{Output, OutputData}; use common_recordbatch::RecordBatches; use common_telemetry::tracing; use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; -use common_version::BuildInfo; +use common_version::OwnedBuildInfo; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; use datatypes::vectors::{Float64Vector, StringVector}; @@ -100,7 +100,7 @@ pub enum PrometheusResponse { Series(Vec>), LabelValues(Vec), FormatQuery(String), - BuildInfo(BuildInfo), + BuildInfo(OwnedBuildInfo), } impl Default for PrometheusResponse { @@ -148,7 +148,7 @@ pub struct BuildInfoQuery {} )] pub async fn build_info_query() -> PrometheusJsonResponse { let build_info = common_version::build_info().clone(); - PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info)) + PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into())) } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 97ae34afd97e..9ff41ec43426 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -568,13 +568,14 @@ async fn test_status() { let hostname = hostname::get() .map(|s| s.to_string_lossy().to_string()) .unwrap_or_else(|_| "unknown".to_string()); + let build_info = common_version::build_info(); let expected_json = http_handler::StatusResponse { - source_time: env!("SOURCE_TIMESTAMP"), - commit: env!("GIT_COMMIT"), - branch: env!("GIT_BRANCH"), - rustc_version: env!("RUSTC_VERSION"), + source_time: build_info.source_time, + commit: build_info.commit, + branch: build_info.branch, + rustc_version: build_info.rustc, hostname, - version: env!("CARGO_PKG_VERSION"), + version: build_info.version, }; let Json(json) = http_handler::status().await; diff --git a/tests/cases/distributed/information_schema/cluster_info.result b/tests/cases/distributed/information_schema/cluster_info.result index c2c29a6f2d76..a59d59ee6082 100644 --- a/tests/cases/distributed/information_schema/cluster_info.result +++ b/tests/cases/distributed/information_schema/cluster_info.result @@ -19,7 +19,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -29,7 +29,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type; -- SQLNESS 
REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -39,7 +39,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -49,7 +49,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -59,7 +59,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ diff --git a/tests/cases/distributed/information_schema/cluster_info.sql b/tests/cases/distributed/information_schema/cluster_info.sql index fcdd4eb5106a..f017e2f7fdf3 100644 --- a/tests/cases/distributed/information_schema/cluster_info.sql +++ b/tests/cases/distributed/information_schema/cluster_info.sql @@ -4,7 +4,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -12,7 +12,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -20,7 +20,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -28,7 +28,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -36,7 +36,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) 
Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ diff --git a/tests/cases/standalone/common/function/system.result b/tests/cases/standalone/common/function/system.result index 5af6bbfcd006..ba03e47f1385 100644 --- a/tests/cases/standalone/common/function/system.result +++ b/tests/cases/standalone/common/function/system.result @@ -1,12 +1,12 @@ -- SQLNESS REPLACE branch:\s+.+ branch: BRANCH -- SQLNESS REPLACE commit:\s+.+ commit: COMMIT -- SQLNESS REPLACE commit_short:\s+.+ commit_short: COMMIT_SHORT --- SQLNESS REPLACE dirty:\s+.+ dirty: DIRTY +-- SQLNESS REPLACE clean:\s+.+ clean: CLEAN -- SQLNESS REPLACE version:\s+.+ version: VERSION -- SQLNESS REPLACE [\s\-]+ SELECT build(); -++|build()|++|branch:BRANCH|commit:COMMIT|commit_short:COMMIT_SHORT|dirty:DIRTY|version:VERSION++ +++|build()|++|branch:BRANCH|commit:COMMIT|commit_short:COMMIT_SHORT|clean:CLEAN|version:VERSION++ -- SQLNESS REPLACE greptimedb-[\d\.]+ greptimedb-VERSION SELECT version(); diff --git a/tests/cases/standalone/common/function/system.sql b/tests/cases/standalone/common/function/system.sql index 9a4e25487cf4..d945c8baf800 100644 --- a/tests/cases/standalone/common/function/system.sql +++ b/tests/cases/standalone/common/function/system.sql @@ -1,7 +1,7 @@ -- SQLNESS REPLACE branch:\s+.+ branch: BRANCH -- SQLNESS REPLACE commit:\s+.+ commit: COMMIT -- SQLNESS REPLACE commit_short:\s+.+ commit_short: COMMIT_SHORT --- SQLNESS REPLACE dirty:\s+.+ dirty: DIRTY +-- SQLNESS REPLACE clean:\s+.+ clean: CLEAN -- SQLNESS REPLACE version:\s+.+ version: VERSION -- SQLNESS REPLACE [\s\-]+ SELECT build(); diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 3f38df1c43ed..ffcff633eb40 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -51,9 +51,9 @@ select * from information_schema.columns order by table_schema, table_name, colu | table_catalog | table_schema | table_name | column_name | ordinal_position | character_maximum_length | character_octet_length | numeric_precision | numeric_scale | datetime_precision | character_set_name | collation_name | column_key | extra | privileges | generation_expression | greptime_data_type | data_type | semantic_type | column_default | is_nullable | column_type | column_comment | srs_id | +---------------+--------------------+---------------------------------------+-----------------------------------+------------------+--------------------------+------------------------+-------------------+---------------+--------------------+--------------------+----------------+------------+-------+---------------+-----------------------+----------------------+-----------------+---------------+----------------+-------------+-----------------+----------------+--------+ | greptime | information_schema | build_info | git_branch | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | build_info | git_clean | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | build_info | git_commit | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | build_info | git_commit_short | 3 | 2147483647 | 
2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | build_info | git_dirty | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | build_info | pkg_version | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | character_sets | character_set_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | character_sets | default_collate_name | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | @@ -578,7 +578,7 @@ desc table build_info; | git_branch | String | | NO | | FIELD | | git_commit | String | | NO | | FIELD | | git_commit_short | String | | NO | | FIELD | -| git_dirty | String | | NO | | FIELD | +| git_clean | String | | NO | | FIELD | | pkg_version | String | | NO | | FIELD | +------------------+--------+-----+------+---------+---------------+ diff --git a/tests/cases/standalone/information_schema/cluster_info.result b/tests/cases/standalone/information_schema/cluster_info.result index df549b652fba..629e3865c9ca 100644 --- a/tests/cases/standalone/information_schema/cluster_info.result +++ b/tests/cases/standalone/information_schema/cluster_info.result @@ -19,7 +19,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d\.\d\.\d) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -29,7 +29,7 @@ SELECT * FROM CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d\.\d\.\d) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -44,7 +44,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE'; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d\.\d\.\d) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ diff --git a/tests/cases/standalone/information_schema/cluster_info.sql b/tests/cases/standalone/information_schema/cluster_info.sql index 4905f6d1fd23..e0a1fce07e71 100644 --- a/tests/cases/standalone/information_schema/cluster_info.sql +++ b/tests/cases/standalone/information_schema/cluster_info.sql @@ -4,7 +4,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d\.\d\.\d) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -12,7 +12,7 @@ SELECT * FROM CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d\.\d\.\d) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE 
[\s\-]+ @@ -22,7 +22,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE'; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d\.\d\.\d) Version --- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ From 63acc30ce744283b105db2274d2e110a5d71a63e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 12 Jul 2024 14:56:13 +0800 Subject: [PATCH 2/7] =?UTF-8?q?perf:=20fine=E2=80=93tuned=20plan=20steps?= =?UTF-8?q?=20(#4258)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * perf: fine–tuned plan steps Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * handle explain plan Signed-off-by: Ruihang Xia * handle explain plan again Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 47 ++++++++++++++---------------- Cargo.toml | 18 ++++++------ src/query/src/datafusion.rs | 36 ++++++++++++++++++++++- src/query/src/dist_plan/planner.rs | 16 +++++----- 4 files changed, 73 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0acae30c2cab..fdf1952380d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -739,12 +739,9 @@ dependencies = [ [[package]] name = "atomic" -version = "0.6.0" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" -dependencies = [ - "bytemuck", -] +checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" [[package]] name = "atomic-waker" @@ -2766,7 +2763,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "datafusion" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2818,7 +2815,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2839,7 +2836,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "tokio", ] @@ -2847,7 +2844,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "chrono", @@ -2867,7 +2864,7 @@ dependencies = [ [[package]] name = 
"datafusion-expr" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2884,7 +2881,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "base64 0.22.1", @@ -2910,7 +2907,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2927,7 +2924,7 @@ dependencies = [ [[package]] name = "datafusion-functions-array" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "arrow-array", @@ -2946,7 +2943,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "async-trait", @@ -2964,7 +2961,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2994,7 +2991,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "datafusion-common", @@ -3005,7 +3002,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -3038,7 +3035,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = 
"38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "arrow-array", @@ -3054,7 +3051,7 @@ dependencies = [ [[package]] name = "datafusion-substrait" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "async-recursion", "chrono", @@ -6410,9 +6407,9 @@ dependencies = [ [[package]] name = "multimap" -version = "0.8.3" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" [[package]] name = "mur3" @@ -12900,9 +12897,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.9.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ "atomic", "getrandom", @@ -12913,9 +12910,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.9.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3ff64d5cde1e2cb5268bdb497235b6bd255ba8244f910dbc3574e59593de68c" +checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 26d2baf2b7cd..3985bb525534 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,15 +104,15 @@ clap = { version = "4.4", features = ["derive"] } config = "0.13.0" crossbeam-utils = "0.8" dashmap = "5.4" -datafusion = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } +datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = 
"d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } derive_builder = "0.12" dotenv = "0.15" etcd-client = { version = "0.13" } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 14c01b05d389..fe57987a5c7c 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -48,6 +48,7 @@ use table::TableRef; use crate::analyze::DistAnalyzeExec; use crate::dataframe::DataFrame; pub use crate::datafusion::planner::DfContextProviderAdapter; +use crate::dist_plan::MergeScanLogicalPlan; use crate::error::{ CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu, @@ -373,8 +374,41 @@ impl PhysicalPlanner for DatafusionQueryEngine { match logical_plan { LogicalPlan::DfPlan(df_plan) => { let state = ctx.state(); + + // special handle EXPLAIN plan + if matches!(df_plan, DfLogicalPlan::Explain(_)) { + return state + .create_physical_plan(df_plan) + .await + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu); + } + + // analyze first + let analyzed_plan = state + .analyzer() + .execute_and_check(df_plan.clone(), state.config_options(), |_, _| {}) + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; + // skip optimize for MergeScan + let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan + && ext.node.name() == MergeScanLogicalPlan::name() + { + analyzed_plan.clone() + } else { + state + .optimizer() + .optimize(analyzed_plan, state, |_, _| {}) + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)? 
+ }; + let physical_plan = state - .create_physical_plan(df_plan) + .query_planner() + .create_physical_plan(&optimized_plan, state) .await .context(error::DatafusionSnafu) .map_err(BoxedError::new) diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 41227e868730..73168ff1bda8 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -27,7 +27,6 @@ use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::TableReference; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; -use datafusion_optimizer::analyzer::Analyzer; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -72,8 +71,9 @@ impl ExtensionPlanner for DistExtensionPlanner { let input_plan = merge_scan.input(); let fallback = |logical_plan| async move { + let optimized_plan = self.optimize_input_logical_plan(session_state, logical_plan)?; planner - .create_physical_plan(logical_plan, session_state) + .create_physical_plan(&optimized_plan, session_state) .await .map(Some) }; @@ -83,15 +83,15 @@ impl ExtensionPlanner for DistExtensionPlanner { return fallback(input_plan).await; } - let optimized_plan = self.optimize_input_logical_plan(session_state, input_plan)?; + let optimized_plan = input_plan; let Some(table_name) = Self::extract_full_table_name(input_plan)? else { // no relation found in input plan, going to execute them locally - return fallback(&optimized_plan).await; + return fallback(optimized_plan).await; }; let Ok(regions) = self.get_regions(&table_name).await else { // no peers found, going to execute them locally - return fallback(&optimized_plan).await; + return fallback(optimized_plan).await; }; // TODO(ruihang): generate different execution plans for different variant merge operation @@ -137,16 +137,14 @@ impl DistExtensionPlanner { Ok(table.table_info().region_ids()) } - // TODO(ruihang): find a more elegant way to optimize input logical plan + /// Input logical plan is analyzed. Thus only call logical optimizer to optimize it. fn optimize_input_logical_plan( &self, session_state: &SessionState, plan: &LogicalPlan, ) -> Result { let state = session_state.clone(); - let analyzer = Analyzer::default(); - let state = state.with_analyzer_rules(analyzer.rules); - state.optimize(plan) + state.optimizer().optimize(plan.clone(), &state, |_, _| {}) } } From 05c7d3eb42adc1f7d14b93c4362d8a7dc2384909 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 12 Jul 2024 16:09:25 +0800 Subject: [PATCH 3/7] docs(config): add enable_region_failover option to configuration (#4355) docs(config): Add enable_region_failover option to configuration --- config/config.md | 1 + config/metasrv.example.toml | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/config/config.md b/config/config.md index cb90eac70f48..fb322e3e604d 100644 --- a/config/config.md +++ b/config/config.md @@ -259,6 +259,7 @@ | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. | | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | +| `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running in cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). | | `runtime` | -- | -- | The runtime options. | | `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 1128d274cef2..e341479ca8f1 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -25,6 +25,12 @@ enable_telemetry = true ## If it's not empty, the metasrv will store all data with this key prefix. store_key_prefix = "" +## Whether to enable region failover. +## This feature is only available on GreptimeDB running on cluster mode and +## - Using Remote WAL +## - Using shared storage (e.g., s3). +enable_region_failover = false + ## The runtime options. [runtime] ## The number of threads to execute the runtime for global read operations. From 9f2d53c3df4575f361e666dc3b9858beb6718e38 Mon Sep 17 00:00:00 2001 From: irenjj Date: Fri, 12 Jul 2024 16:17:18 +0800 Subject: [PATCH 4/7] refactor: Remove the StandaloneKafkaConfig struct (#4253) * refactor: Remove the StandaloneKafkaConfig struct * remove the redundant assignment * remove rudundant struct * simplify replication_factor * add KafkaTopicConfig * fix check * fix check * fix check * add flatten with * revert config.md * fix test params * fix test param * fix missing params when provider is kafka * remove unsed files * remove with prefix * fix doc * fix test * fix clippy --- src/cmd/src/standalone.rs | 8 +- src/cmd/tests/load_config_test.rs | 4 +- src/common/meta/src/wal_options_allocator.rs | 7 +- .../kafka/topic_manager.rs | 21 ++-- src/common/wal/src/config.rs | 113 +++++------------- src/common/wal/src/config/kafka.rs | 2 - src/common/wal/src/config/kafka/common.rs | 34 ++++++ src/common/wal/src/config/kafka/datanode.rs | 6 +- src/common/wal/src/config/kafka/metasrv.rs | 30 +---- tests-integration/src/tests/test_util.rs | 15 ++- tests-integration/tests/region_migration.rs | 50 +++++--- 11 files changed, 150 insertions(+), 140 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index cf025ccf4e2e..47e503c1874c 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -40,7 +40,7 @@ use common_telemetry::info; use common_telemetry::logging::{LoggingOptions, TracingOptions}; use common_time::timezone::set_default_timezone; use common_version::{short_version, version}; -use common_wal::config::StandaloneWalConfig; +use common_wal::config::DatanodeWalConfig; use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig}; use datanode::datanode::{Datanode, DatanodeBuilder}; use file_engine::config::EngineConfig as FileEngineConfig; @@ -130,7 +130,7 @@ pub struct StandaloneOptions { pub opentsdb: OpentsdbOptions, pub influxdb: InfluxdbOptions, pub prom_store: PromStoreOptions, - pub wal: StandaloneWalConfig, + pub wal: DatanodeWalConfig, pub storage: StorageConfig, pub metadata_store: KvBackendConfig, pub procedure: ProcedureConfig, @@ -155,7 +155,7 @@ impl Default for StandaloneOptions { opentsdb: OpentsdbOptions::default(), influxdb: InfluxdbOptions::default(), prom_store: PromStoreOptions::default(), - wal: StandaloneWalConfig::default(), + wal: DatanodeWalConfig::default(), storage: StorageConfig::default(), metadata_store: KvBackendConfig::default(), procedure: ProcedureConfig::default(), @@ -204,7 +204,7 @@ impl StandaloneOptions { DatanodeOptions { 
node_id: Some(0), enable_telemetry: cloned_opts.enable_telemetry, - wal: cloned_opts.wal.into(), + wal: cloned_opts.wal, storage: cloned_opts.storage, region_engine: cloned_opts.region_engine, grpc: cloned_opts.grpc, diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index cf1cf6bc5048..fff3f3e8c9a6 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -24,7 +24,7 @@ use common_grpc::channel_manager::{ use common_runtime::global::RuntimeOptions; use common_telemetry::logging::LoggingOptions; use common_wal::config::raft_engine::RaftEngineConfig; -use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig}; +use common_wal::config::DatanodeWalConfig; use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; use file_engine::config::EngineConfig; use frontend::frontend::FrontendOptions; @@ -206,7 +206,7 @@ fn test_load_standalone_example_config() { }, component: StandaloneOptions { default_timezone: Some("UTC".to_string()), - wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig { + wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { dir: Some("/tmp/greptimedb/wal".to_string()), sync_period: Some(Duration::from_secs(10)), ..Default::default() diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index 09b03c5b7dca..5fb3db6e20eb 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -123,6 +123,7 @@ pub fn prepare_wal_options( #[cfg(test)] mod tests { + use common_wal::config::kafka::common::KafkaTopicConfig; use common_wal::config::kafka::MetasrvKafkaConfig; use common_wal::test_util::run_test_with_kafka_wal; @@ -160,9 +161,13 @@ mod tests { .collect::>(); // Creates a topic manager. - let config = MetasrvKafkaConfig { + let kafka_topic = KafkaTopicConfig { replication_factor: broker_endpoints.len() as i16, + ..Default::default() + }; + let config = MetasrvKafkaConfig { broker_endpoints, + kafka_topic, ..Default::default() }; let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index fb0130d0dfc7..ec88e37cd14d 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -56,11 +56,11 @@ impl TopicManager { /// Creates a new topic manager. pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self { // Topics should be created. - let topics = (0..config.num_topics) - .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix)) + let topics = (0..config.kafka_topic.num_topics) + .map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix)) .collect::>(); - let selector = match config.selector_type { + let selector = match config.kafka_topic.selector_type { TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), }; @@ -76,7 +76,7 @@ impl TopicManager { /// The initializer first tries to restore persisted topics from the kv backend. /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. pub async fn start(&self) -> Result<()> { - let num_topics = self.config.num_topics; + let num_topics = self.config.kafka_topic.num_topics; ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); // Topics should be created. 
@@ -185,9 +185,9 @@ impl TopicManager { match client .create_topic( topic.clone(), - self.config.num_partitions, - self.config.replication_factor, - self.config.create_topic_timeout.as_millis() as i32, + self.config.kafka_topic.num_partitions, + self.config.kafka_topic.replication_factor, + self.config.kafka_topic.create_topic_timeout.as_millis() as i32, ) .await { @@ -242,6 +242,7 @@ impl TopicManager { #[cfg(test)] mod tests { + use common_wal::config::kafka::common::KafkaTopicConfig; use common_wal::test_util::run_test_with_kafka_wal; use super::*; @@ -283,9 +284,13 @@ mod tests { .collect::>(); // Creates a topic manager. - let config = MetasrvKafkaConfig { + let kafka_topic = KafkaTopicConfig { replication_factor: broker_endpoints.len() as i16, + ..Default::default() + }; + let config = MetasrvKafkaConfig { broker_endpoints, + kafka_topic, ..Default::default() }; let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 0b47c32ee21d..6edee1703c81 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -17,7 +17,7 @@ pub mod raft_engine; use serde::{Deserialize, Serialize}; -use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig}; +use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use crate::config::raft_engine::RaftEngineConfig; /// Wal configurations for metasrv. @@ -43,80 +43,43 @@ impl Default for DatanodeWalConfig { } } -/// Wal configurations for standalone. -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] -#[serde(tag = "provider", rename_all = "snake_case")] -pub enum StandaloneWalConfig { - RaftEngine(RaftEngineConfig), - Kafka(StandaloneKafkaConfig), -} - -impl Default for StandaloneWalConfig { - fn default() -> Self { - Self::RaftEngine(RaftEngineConfig::default()) - } -} - -impl From for MetasrvWalConfig { - fn from(config: StandaloneWalConfig) -> Self { +impl From for MetasrvWalConfig { + fn from(config: DatanodeWalConfig) -> Self { match config { - StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine, - StandaloneWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig { + DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine, + DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig { broker_endpoints: config.broker_endpoints, - num_topics: config.num_topics, - selector_type: config.selector_type, - topic_name_prefix: config.topic_name_prefix, - num_partitions: config.num_partitions, - replication_factor: config.replication_factor, - create_topic_timeout: config.create_topic_timeout, backoff: config.backoff, + kafka_topic: config.kafka_topic, }), } } } -impl From for StandaloneWalConfig { +impl From for DatanodeWalConfig { fn from(config: MetasrvWalConfig) -> Self { match config { MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()), - MetasrvWalConfig::Kafka(config) => Self::Kafka(StandaloneKafkaConfig { + MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig { broker_endpoints: config.broker_endpoints, - num_topics: config.num_topics, - selector_type: config.selector_type, - topic_name_prefix: config.topic_name_prefix, - num_partitions: config.num_partitions, - replication_factor: config.replication_factor, - create_topic_timeout: config.create_topic_timeout, backoff: config.backoff, + kafka_topic: config.kafka_topic, ..Default::default() }), } } } -impl From for DatanodeWalConfig { - fn from(config: StandaloneWalConfig) -> Self { - match 
config { - StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config), - StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig { - broker_endpoints: config.broker_endpoints, - max_batch_bytes: config.max_batch_bytes, - consumer_wait_timeout: config.consumer_wait_timeout, - backoff: config.backoff, - }), - } - } -} - #[cfg(test)] mod tests { use std::time::Duration; use common_base::readable_size::ReadableSize; + use tests::kafka::common::KafkaTopicConfig; use super::*; use crate::config::kafka::common::BackoffConfig; - use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig}; + use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use crate::TopicSelectorType; #[test] @@ -168,11 +131,6 @@ mod tests { let toml_str = r#" provider = "kafka" broker_endpoints = ["127.0.0.1:9092"] - num_topics = 32 - selector_type = "round_robin" - topic_name_prefix = "greptimedb_wal_topic" - replication_factor = 1 - create_topic_timeout = "30s" max_batch_bytes = "1MB" linger = "200ms" consumer_wait_timeout = "100ms" @@ -180,24 +138,32 @@ mod tests { backoff_max = "10s" backoff_base = 2 backoff_deadline = "5mins" + num_topics = 32 + num_partitions = 1 + selector_type = "round_robin" + replication_factor = 1 + create_topic_timeout = "30s" + topic_name_prefix = "greptimedb_wal_topic" "#; // Deserialized to MetasrvWalConfig. let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap(); let expected = MetasrvKafkaConfig { broker_endpoints: vec!["127.0.0.1:9092".to_string()], - num_topics: 32, - selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_topic".to_string(), - num_partitions: 1, - replication_factor: 1, - create_topic_timeout: Duration::from_secs(30), backoff: BackoffConfig { init: Duration::from_millis(500), max: Duration::from_secs(10), base: 2, deadline: Some(Duration::from_secs(60 * 5)), }, + kafka_topic: KafkaTopicConfig { + num_topics: 32, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_wal_topic".to_string(), + num_partitions: 1, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), + }, }; assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); @@ -213,28 +179,15 @@ mod tests { base: 2, deadline: Some(Duration::from_secs(60 * 5)), }, - }; - assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); - - // Deserialized to StandaloneWalConfig. 
- let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap(); - let expected = StandaloneKafkaConfig { - broker_endpoints: vec!["127.0.0.1:9092".to_string()], - num_topics: 32, - selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_topic".to_string(), - num_partitions: 1, - replication_factor: 1, - create_topic_timeout: Duration::from_secs(30), - max_batch_bytes: ReadableSize::mb(1), - consumer_wait_timeout: Duration::from_millis(100), - backoff: BackoffConfig { - init: Duration::from_millis(500), - max: Duration::from_secs(10), - base: 2, - deadline: Some(Duration::from_secs(60 * 5)), + kafka_topic: KafkaTopicConfig { + num_topics: 32, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_wal_topic".to_string(), + num_partitions: 1, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), }, }; - assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected)); + assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); } } diff --git a/src/common/wal/src/config/kafka.rs b/src/common/wal/src/config/kafka.rs index f47e444521f2..27265d00987e 100644 --- a/src/common/wal/src/config/kafka.rs +++ b/src/common/wal/src/config/kafka.rs @@ -15,8 +15,6 @@ pub mod common; pub mod datanode; pub mod metasrv; -pub mod standalone; pub use datanode::DatanodeKafkaConfig; pub use metasrv::MetasrvKafkaConfig; -pub use standalone::StandaloneKafkaConfig; diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index ea708d96159c..e61823938546 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -17,6 +17,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use serde_with::with_prefix; +use crate::{TopicSelectorType, TOPIC_NAME_PREFIX}; + with_prefix!(pub backoff_prefix "backoff_"); /// Backoff configurations for kafka clients. @@ -46,3 +48,35 @@ impl Default for BackoffConfig { } } } + +/// Topic configurations for kafka clients. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct KafkaTopicConfig { + /// Number of topics to be created upon start. + pub num_topics: usize, + /// Number of partitions per topic. + pub num_partitions: i32, + /// The type of the topic selector with which to select a topic for a region. + pub selector_type: TopicSelectorType, + /// The replication factor of each topic. + pub replication_factor: i16, + /// The timeout of topic creation. + #[serde(with = "humantime_serde")] + pub create_topic_timeout: Duration, + /// Topic name prefix. 
+ pub topic_name_prefix: String, +} + +impl Default for KafkaTopicConfig { + fn default() -> Self { + Self { + num_topics: 64, + num_partitions: 1, + selector_type: TopicSelectorType::RoundRobin, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), + topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), + } + } +} diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index ae97c1017cf5..b01e0635f637 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -17,7 +17,7 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{backoff_prefix, BackoffConfig}; +use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; use crate::BROKER_ENDPOINT; /// Kafka wal configurations for datanode. @@ -36,6 +36,9 @@ pub struct DatanodeKafkaConfig { /// The backoff config. #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, + /// The kafka topic config. + #[serde(flatten)] + pub kafka_topic: KafkaTopicConfig, } impl Default for DatanodeKafkaConfig { @@ -46,6 +49,7 @@ impl Default for DatanodeKafkaConfig { max_batch_bytes: ReadableSize::mb(1), consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig::default(), + kafka_topic: KafkaTopicConfig::default(), } } } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index 99efe762fbc0..519992e17579 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; - use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{backoff_prefix, BackoffConfig}; -use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; +use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; +use crate::BROKER_ENDPOINT; /// Kafka wal configurations for metasrv. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -25,37 +23,21 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; pub struct MetasrvKafkaConfig { /// The broker endpoints of the Kafka cluster. pub broker_endpoints: Vec, - /// The number of topics to be created upon start. - pub num_topics: usize, - /// The type of the topic selector with which to select a topic for a region. - pub selector_type: TopicSelectorType, - /// Topic name prefix. - pub topic_name_prefix: String, - /// The number of partitions per topic. - pub num_partitions: i32, - /// The replication factor of each topic. - pub replication_factor: i16, - /// The timeout of topic creation. - #[serde(with = "humantime_serde")] - pub create_topic_timeout: Duration, /// The backoff config. #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, + /// The kafka config. 
+ #[serde(flatten)] + pub kafka_topic: KafkaTopicConfig, } impl Default for MetasrvKafkaConfig { fn default() -> Self { let broker_endpoints = vec![BROKER_ENDPOINT.to_string()]; - let replication_factor = broker_endpoints.len() as i16; Self { broker_endpoints, - num_topics: 64, - selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), - num_partitions: 1, - replication_factor, - create_topic_timeout: Duration::from_secs(30), backoff: BackoffConfig::default(), + kafka_topic: KafkaTopicConfig::default(), } } } diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 7bb29ce3318e..491a93086953 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -21,6 +21,7 @@ use common_query::Output; use common_recordbatch::util; use common_telemetry::warn; use common_test_util::find_workspace_path; +use common_wal::config::kafka::common::KafkaTopicConfig; use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use frontend::instance::Instance; @@ -231,8 +232,11 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option Option Date: Fri, 12 Jul 2024 17:17:15 +0800 Subject: [PATCH 5/7] feat: support `text/plain` format for log ingestion (#4300) * feat: support text/plain format of log input * refactor: pipeline query and delete using dataframe api * chore: minor refactor * refactor: skip jsonify when processing plan/text * refactor: support array(string) as pipeline engine input --- Cargo.lock | 4 +- src/pipeline/src/etl/processor.rs | 13 ++++ src/pipeline/src/manager/error.rs | 8 +++ src/pipeline/src/manager/table.rs | 74 +++++++++++----------- src/pipeline/src/manager/util.rs | 25 ++++---- src/pipeline/tests/pipeline.rs | 55 +++++++++++++++- src/servers/src/http/event.rs | 76 +++++++++++++++++----- tests-integration/tests/http.rs | 101 ++++++++++++++++++++++++++++++ 8 files changed, 291 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fdf1952380d1..a48b6afee2ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6963,9 +6963,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.47.3" +version = "0.47.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cac4826fe3d5482a49b92955b0f6b06ce45b46ec84484176588209bfbf996870" +checksum = "ff159a2da374ef2d64848a6547943cf1af7d2ceada5ae77be175e1389aa07ae3" dependencies = [ "anyhow", "async-trait", diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index 390538098ad4..5d1396067dca 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -93,6 +93,19 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static { Value::Map(map) => { values.push(self.exec_map(map)?); } + Value::String(_) => { + let fields = self.fields(); + if fields.len() != 1 { + return Err(format!( + "{} processor: expected fields length 1 when processing line input, but got {}", + self.kind(), + fields.len() + )); + } + let field = fields.first().unwrap(); + + values.push(self.exec_field(&val, field).map(Value::Map)?); + } _ if self.ignore_processor_array_failure() => { warn!("expected a map, but got {val}") } diff --git a/src/pipeline/src/manager/error.rs b/src/pipeline/src/manager/error.rs index 07332590f1f0..4467b42b51c3 100644 --- a/src/pipeline/src/manager/error.rs +++ 
b/src/pipeline/src/manager/error.rs @@ -81,6 +81,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to create dataframe"))] + DataFrame { + source: query::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("General catalog error"))] Catalog { source: catalog::error::Error, @@ -126,6 +133,7 @@ impl ErrorExt for Error { | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments, BuildDfLogicalPlan { .. } => StatusCode::Internal, ExecuteInternalStatement { source, .. } => source.status_code(), + DataFrame { source, .. } => source.status_code(), Catalog { source, .. } => source.status_code(), CreateTable { source, .. } => source.status_code(), } diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index 58df2bcabb9b..3c69f59f2a05 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -24,33 +24,32 @@ use common_query::OutputData; use common_recordbatch::util as record_util; use common_telemetry::{debug, info}; use common_time::timestamp::{TimeUnit, Timestamp}; -use datafusion::datasource::DefaultTableSource; use datafusion::logical_expr::col; use datafusion_common::{TableReference, ToDFSchema}; -use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlanBuilder}; +use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan}; use datatypes::prelude::ScalarVector; use datatypes::timestamp::TimestampNanosecond; use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector}; use moka::sync::Cache; use operator::insert::InserterRef; use operator::statement::StatementExecutorRef; +use query::dataframe::DataFrame; use query::plan::LogicalPlan; use query::QueryEngineRef; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableInfo; -use table::table::adapter::DfTableProviderAdapter; use table::TableRef; use crate::error::{ BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu, - ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu, - PipelineNotFoundSnafu, Result, + DataFrameSnafu, ExecuteInternalStatementSnafu, InsertPipelineSnafu, + InvalidPipelineVersionSnafu, PipelineNotFoundSnafu, Result, }; use crate::etl::transform::GreptimeTransformer; use crate::etl::{parse, Content, Pipeline}; use crate::manager::{PipelineInfo, PipelineVersion}; -use crate::util::{build_plan_filter, generate_pipeline_cache_key}; +use crate::util::{generate_pipeline_cache_key, prepare_dataframe_conditions}; pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines"; pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name"; @@ -337,15 +336,24 @@ impl PipelineTable { return Ok(None); } - // 2. do delete + // 2. prepare dataframe + let dataframe = self + .query_engine + .read_table(self.table.clone()) + .context(DataFrameSnafu)?; + let DataFrame::DataFusion(dataframe) = dataframe; + + let dataframe = dataframe + .filter(prepare_dataframe_conditions(schema, name, version)) + .context(BuildDfLogicalPlanSnafu)?; + + // 3. 
prepare dml stmt let table_info = self.table.table_info(); let table_name = TableReference::full( table_info.catalog_name.clone(), table_info.schema_name.clone(), table_info.name.clone(), ); - let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone())); - let table_source = Arc::new(DefaultTableSource::new(table_provider)); let df_schema = Arc::new( table_info @@ -357,24 +365,17 @@ impl PipelineTable { .context(BuildDfLogicalPlanSnafu)?, ); - // create scan plan - let logical_plan = LogicalPlanBuilder::scan(table_name.clone(), table_source, None) - .context(BuildDfLogicalPlanSnafu)? - .filter(build_plan_filter(schema, name, version)) - .context(BuildDfLogicalPlanSnafu)? - .build() - .context(BuildDfLogicalPlanSnafu)?; - // create dml stmt let stmt = DmlStatement::new( table_name, df_schema, datafusion_expr::WriteOp::Delete, - Arc::new(logical_plan), + Arc::new(dataframe.into_parts().1), ); let plan = LogicalPlan::DfPlan(DfLogicalPlan::Dml(stmt)); + // 4. execute dml stmt let output = self .query_engine .execute(plan, Self::query_ctx(&table_info)) @@ -404,24 +405,19 @@ impl PipelineTable { name: &str, version: PipelineVersion, ) -> Result> { - let table_info = self.table.table_info(); - - let table_name = TableReference::full( - table_info.catalog_name.clone(), - table_info.schema_name.clone(), - table_info.name.clone(), - ); - - let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone())); - let table_source = Arc::new(DefaultTableSource::new(table_provider)); + // 1. prepare dataframe + let dataframe = self + .query_engine + .read_table(self.table.clone()) + .context(DataFrameSnafu)?; + let DataFrame::DataFusion(dataframe) = dataframe; - let plan = LogicalPlanBuilder::scan(table_name, table_source, None) - .context(BuildDfLogicalPlanSnafu)? - .filter(build_plan_filter(schema, name, version)) + let dataframe = dataframe + .filter(prepare_dataframe_conditions(schema, name, version)) .context(BuildDfLogicalPlanSnafu)? - .project(vec![ - col(PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME), - col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME), + .select_columns(&[ + PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME, + PIPELINE_TABLE_CREATED_AT_COLUMN_NAME, ]) .context(BuildDfLogicalPlanSnafu)? .sort(vec![ @@ -429,15 +425,18 @@ impl PipelineTable { ]) .context(BuildDfLogicalPlanSnafu)? .limit(0, Some(1)) - .context(BuildDfLogicalPlanSnafu)? - .build() .context(BuildDfLogicalPlanSnafu)?; + let plan = LogicalPlan::DfPlan(dataframe.into_parts().1); + + let table_info = self.table.table_info(); + debug!("find_pipeline_by_name: plan: {:?}", plan); + // 2. execute plan let output = self .query_engine - .execute(LogicalPlan::DfPlan(plan), Self::query_ctx(&table_info)) + .execute(plan, Self::query_ctx(&table_info)) .await .context(ExecuteInternalStatementSnafu)?; let stream = match output.data { @@ -446,6 +445,7 @@ impl PipelineTable { _ => unreachable!(), }; + // 3. construct result let records = record_util::collect(stream) .await .context(CollectRecordsSnafu)?; diff --git a/src/pipeline/src/manager/util.rs b/src/pipeline/src/manager/util.rs index 6133c64215d0..a7d968edcf03 100644 --- a/src/pipeline/src/manager/util.rs +++ b/src/pipeline/src/manager/util.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use common_time::Timestamp; -use datafusion_expr::{and, col, lit, Expr}; +use datafusion_expr::{col, lit, Expr}; use datatypes::timestamp::TimestampNanosecond; use crate::error::{InvalidPipelineVersionSnafu, Result}; @@ -34,19 +34,22 @@ pub fn to_pipeline_version(version_str: Option) -> Result Expr { - let schema_and_name_filter = and( - col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)), +pub(crate) fn prepare_dataframe_conditions( + schema: &str, + name: &str, + version: PipelineVersion, +) -> Expr { + let mut conditions = vec![ col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)), - ); + col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)), + ]; + if let Some(v) = version { - and( - schema_and_name_filter, - col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())), - ) - } else { - schema_and_name_filter + conditions + .push(col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string()))); } + + conditions.into_iter().reduce(Expr::and).unwrap() } pub(crate) fn generate_pipeline_cache_key( diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index 08f2ad38116f..af3b5a8c2075 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -14,7 +14,8 @@ use common_telemetry::tracing::info; use greptime_proto::v1::value::ValueData::{ - BoolValue, F64Value, StringValue, TimestampSecondValue, U32Value, U64Value, U8Value, + BoolValue, F64Value, StringValue, TimestampNanosecondValue, TimestampSecondValue, U32Value, + U64Value, U8Value, }; use greptime_proto::v1::Value as GreptimeValue; use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value}; @@ -455,3 +456,55 @@ transform: info!("\n"); } } + +#[test] +fn test_simple_data() { + let input_value_str = r#" +{ + "line": "2024-05-25 20:16:37.217 hello world" +} +"#; + let input_value: Value = serde_json::from_str::(input_value_str) + .unwrap() + .try_into() + .unwrap(); + + let pipeline_yaml = r#" +processors: + - dissect: + fields: + - line + patterns: + - "%{+ts} %{+ts} %{content}" + - date: + fields: + - ts + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + +transform: + - fields: + - content + type: string + - field: ts + type: time + index: timestamp +"#; + + let yaml_content = Content::Yaml(pipeline_yaml.into()); + let pipeline: Pipeline = parse(&yaml_content).unwrap(); + let output = pipeline.exec(input_value).unwrap(); + let r = output + .rows + .into_iter() + .flat_map(|v| v.values) + .map(|v| v.value_data.unwrap()) + .collect::>(); + + let expected = vec![ + StringValue("hello world".into()), + TimestampNanosecondValue(1716668197217000000), + ]; + + assert_eq!(expected, r); +} diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index ea436009b004..53d3b8d1f3ea 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -25,7 +25,6 @@ use axum::http::{Request, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::{async_trait, BoxError, Extension, TypedHeader}; use common_telemetry::{error, warn}; -use mime_guess::mime; use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu}; use pipeline::util::to_pipeline_version; use pipeline::{PipelineVersion, Value as PipelineValue}; @@ -250,15 +249,7 @@ pub async fn log_ingester( let ignore_errors = query_params.ignore_errors.unwrap_or(false); - let m: mime::Mime = content_type.clone().into(); - let value = match m.subtype() { - mime::JSON => transform_ndjson_array_factory( - Deserializer::from_str(&payload).into_iter(), - 
ignore_errors, - )?, - // add more content type support - _ => UnsupportedContentTypeSnafu { content_type }.fail()?, - }; + let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?; ingest_logs_inner( handler, @@ -271,18 +262,43 @@ pub async fn log_ingester( .await } +fn extract_pipeline_value_by_content_type( + content_type: ContentType, + payload: String, + ignore_errors: bool, +) -> Result { + Ok(match content_type { + ct if ct == ContentType::json() => { + let json_value = transform_ndjson_array_factory( + Deserializer::from_str(&payload).into_iter(), + ignore_errors, + )?; + + PipelineValue::try_from(json_value) + .map_err(|reason| CastTypeSnafu { msg: reason }.build()) + .context(PipelineSnafu)? + } + ct if ct == ContentType::text() || ct == ContentType::text_utf8() => { + let arr = payload + .lines() + .filter(|line| !line.is_empty()) + .map(|line| PipelineValue::String(line.to_string())) + .collect::>(); + PipelineValue::Array(arr.into()) + } + _ => UnsupportedContentTypeSnafu { content_type }.fail()?, + }) +} + async fn ingest_logs_inner( state: LogHandlerRef, pipeline_name: String, version: PipelineVersion, table_name: String, - payload: Value, + pipeline_data: PipelineValue, query_ctx: QueryContextRef, ) -> Result { let start = std::time::Instant::now(); - let pipeline_data = PipelineValue::try_from(payload) - .map_err(|reason| CastTypeSnafu { msg: reason }.build()) - .context(PipelineSnafu)?; let pipeline = state .get_pipeline(&pipeline_name, version, query_ctx.clone()) @@ -321,3 +337,35 @@ pub struct LogState { pub log_handler: LogHandlerRef, pub log_validator: Option, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_transform_ndjson() { + let s = "{\"a\": 1}\n{\"b\": 2}"; + let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false) + .unwrap() + .to_string(); + assert_eq!(a, "[{\"a\":1},{\"b\":2}]"); + + let s = "{\"a\": 1}"; + let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false) + .unwrap() + .to_string(); + assert_eq!(a, "[{\"a\":1}]"); + + let s = "[{\"a\": 1}]"; + let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false) + .unwrap() + .to_string(); + assert_eq!(a, "[{\"a\":1}]"); + + let s = "[{\"a\": 1}, {\"b\": 2}]"; + let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false) + .unwrap() + .to_string(); + assert_eq!(a, "[{\"a\":1},{\"b\":2}]"); + } +} diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 9a7d98279034..06ee1a0221f7 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -78,6 +78,7 @@ macro_rules! http_tests { test_vm_proto_remote_write, test_pipeline_api, + test_plain_text_ingestion, ); )* }; @@ -1127,3 +1128,103 @@ transform: guard.remove_all().await; } + +pub async fn test_plain_text_ingestion(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await; + + // handshake + let client = TestClient::new(app); + + let body = r#" +processors: + - dissect: + fields: + - line + patterns: + - "%{+ts} %{+ts} %{content}" + - date: + fields: + - ts + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + +transform: + - fields: + - content + type: string + - field: ts + type: time + index: timestamp +"#; + + // 1. 
create pipeline + let res = client + .post("/v1/events/pipelines/test") + .header("Content-Type", "application/x-yaml") + .body(body) + .send() + .await; + + assert_eq!(res.status(), StatusCode::OK); + + let content = res.text().await; + + let content = serde_json::from_str(&content); + assert!(content.is_ok()); + // {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]} + let content: Value = content.unwrap(); + + let version_str = content + .get("pipelines") + .unwrap() + .as_array() + .unwrap() + .first() + .unwrap() + .get("version") + .unwrap() + .as_str() + .unwrap() + .to_string(); + assert!(!version_str.is_empty()); + + // 2. write data + let data_body = r#" +2024-05-25 20:16:37.217 hello +2024-05-25 20:16:37.218 hello world +"#; + let res = client + .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test") + .header("Content-Type", "text/plain") + .body(data_body) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // 3. select data + let res = client.get("/v1/sql?sql=select * from logs1").send().await; + assert_eq!(res.status(), StatusCode::OK); + let resp = res.text().await; + + let resp: Value = serde_json::from_str(&resp).unwrap(); + let v = resp + .get("output") + .unwrap() + .as_array() + .unwrap() + .first() + .unwrap() + .get("records") + .unwrap() + .get("rows") + .unwrap() + .to_string(); + + assert_eq!( + v, + r#"[["hello",1716668197217000000],["hello world",1716668197218000000]]"#, + ); + + guard.remove_all().await; +} From 16075ada67c10eb034eeb523fdda9a58a71f46b6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 12 Jul 2024 22:36:23 +0800 Subject: [PATCH 6/7] feat: impl optimizer rule to handle last_value case (#4357) * feat: impl optimizer rule to handle last_value case Signed-off-by: Ruihang Xia * rename file Signed-off-by: Ruihang Xia * update sqlness result Signed-off-by: Ruihang Xia * Update src/query/src/optimizer/scan_hint.rs Co-authored-by: Jeremyhi * split methods Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Co-authored-by: Jeremyhi --- src/query/src/dummy_catalog.rs | 11 +- src/query/src/optimizer.rs | 2 +- src/query/src/optimizer/order_hint.rs | 168 --------- src/query/src/optimizer/scan_hint.rs | 354 ++++++++++++++++++ src/query/src/query_engine/state.rs | 4 +- .../common/select/last_value.result | 34 ++ .../standalone/common/select/last_value.sql | 28 ++ .../common/tql-explain-analyze/explain.result | 2 +- 8 files changed, 430 insertions(+), 173 deletions(-) delete mode 100644 src/query/src/optimizer/order_hint.rs create mode 100644 src/query/src/optimizer/scan_hint.rs create mode 100644 tests/cases/standalone/common/select/last_value.result create mode 100644 tests/cases/standalone/common/select/last_value.sql diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index bc7c94e12abb..17a5995ea1bc 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -30,7 +30,7 @@ use datatypes::arrow::datatypes::SchemaRef; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngineRef; -use store_api::storage::{RegionId, ScanRequest}; +use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector}; use table::table::scan::RegionScanExec; use crate::error::{GetRegionMetadataSnafu, Result}; @@ -192,11 +192,20 @@ impl DummyTableProvider { } } + pub fn region_metadata(&self) -> RegionMetadataRef { + self.metadata.clone() + } + /// Sets the ordering hint of the query to the provider. 
pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) { self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec()); } + /// Sets the time series selector hint of the query to the provider. + pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) { + self.scan_request.lock().unwrap().series_row_selector = Some(selector); + } + /// Gets the scan request of the provider. #[cfg(test)] pub fn scan_request(&self) -> ScanRequest { diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index 15b63d5784e7..2f97a9bd32c6 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -13,9 +13,9 @@ // limitations under the License. pub mod count_wildcard; -pub mod order_hint; pub mod parallelize_scan; pub mod remove_duplicate; +pub mod scan_hint; pub mod string_normalization; #[cfg(test)] pub(crate) mod test_util; diff --git a/src/query/src/optimizer/order_hint.rs b/src/query/src/optimizer/order_hint.rs deleted file mode 100644 index 55bf314b48d7..000000000000 --- a/src/query/src/optimizer/order_hint.rs +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use arrow_schema::SortOptions; -use common_recordbatch::OrderOption; -use datafusion::datasource::DefaultTableSource; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; -use datafusion_common::Result as DataFusionResult; -use datafusion_expr::expr::Sort; -use datafusion_expr::{Expr, LogicalPlan}; -use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; - -use crate::dummy_catalog::DummyTableProvider; - -/// This rule will pass the nearest order requirement to the leaf table -/// scan node as ordering hint. -pub struct OrderHintRule; - -impl OptimizerRule for OrderHintRule { - fn try_optimize( - &self, - plan: &LogicalPlan, - _config: &dyn OptimizerConfig, - ) -> DataFusionResult> { - Self::optimize(plan).map(Some) - } - - fn name(&self) -> &str { - "OrderHintRule" - } -} - -impl OrderHintRule { - fn optimize(plan: &LogicalPlan) -> DataFusionResult { - let mut visitor = OrderHintVisitor::default(); - let _ = plan.visit(&mut visitor)?; - - if let Some(order_expr) = visitor.order_expr.take() { - plan.clone() - .transform_down(&|plan| Self::set_ordering_hint(plan, &order_expr)) - .map(|x| x.data) - } else { - Ok(plan.clone()) - } - } - - fn set_ordering_hint( - plan: LogicalPlan, - order_expr: &[Sort], - ) -> DataFusionResult> { - match &plan { - LogicalPlan::TableScan(table_scan) => { - let mut transformed = false; - if let Some(source) = table_scan - .source - .as_any() - .downcast_ref::() - { - // The provider in the region server is [DummyTableProvider]. 
- if let Some(adapter) = source - .table_provider - .as_any() - .downcast_ref::() - { - let mut opts = Vec::with_capacity(order_expr.len()); - for sort in order_expr { - let name = match sort.expr.try_as_col() { - Some(col) => col.name.clone(), - None => return Ok(Transformed::no(plan)), - }; - opts.push(OrderOption { - name, - options: SortOptions { - descending: !sort.asc, - nulls_first: sort.nulls_first, - }, - }) - } - adapter.with_ordering_hint(&opts); - transformed = true; - } - } - if transformed { - Ok(Transformed::yes(plan)) - } else { - Ok(Transformed::no(plan)) - } - } - _ => Ok(Transformed::no(plan)), - } - } -} - -/// Find the most closest order requirement to the leaf node. -#[derive(Default)] -struct OrderHintVisitor { - order_expr: Option>, -} - -impl TreeNodeVisitor<'_> for OrderHintVisitor { - type Node = LogicalPlan; - - fn f_down(&mut self, node: &Self::Node) -> DataFusionResult { - if let LogicalPlan::Sort(sort) = node { - let mut exprs = vec![]; - for expr in &sort.expr { - if let Expr::Sort(sort_expr) = expr { - exprs.push(sort_expr.clone()); - } - } - self.order_expr = Some(exprs); - } - Ok(TreeNodeRecursion::Continue) - } -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use datafusion_expr::{col, LogicalPlanBuilder}; - use datafusion_optimizer::OptimizerContext; - use store_api::storage::RegionId; - - use super::*; - use crate::optimizer::test_util::mock_table_provider; - - #[test] - fn set_order_hint() { - let provider = Arc::new(mock_table_provider(RegionId::new(1, 1))); - let table_source = Arc::new(DefaultTableSource::new(provider.clone())); - let plan = LogicalPlanBuilder::scan("t", table_source, None) - .unwrap() - .sort(vec![col("ts").sort(true, false)]) - .unwrap() - .sort(vec![col("ts").sort(false, true)]) - .unwrap() - .build() - .unwrap(); - - let context = OptimizerContext::default(); - OrderHintRule.try_optimize(&plan, &context).unwrap(); - - // should read the first (with `.sort(true, false)`) sort option - let scan_req = provider.scan_request(); - assert_eq!( - OrderOption { - name: "ts".to_string(), - options: SortOptions { - descending: false, - nulls_first: false - } - }, - scan_req.output_ordering.as_ref().unwrap()[0] - ); - } -} diff --git a/src/query/src/optimizer/scan_hint.rs b/src/query/src/optimizer/scan_hint.rs new file mode 100644 index 000000000000..506b5c3c0ecc --- /dev/null +++ b/src/query/src/optimizer/scan_hint.rs @@ -0,0 +1,354 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashSet; + +use api::v1::SemanticType; +use arrow_schema::SortOptions; +use common_recordbatch::OrderOption; +use datafusion::datasource::DefaultTableSource; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::{Column, Result as DataFusionResult}; +use datafusion_expr::expr::Sort; +use datafusion_expr::{utils, Expr, LogicalPlan}; +use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; +use store_api::storage::TimeSeriesRowSelector; + +use crate::dummy_catalog::DummyTableProvider; + +/// This rule will traverse the plan to collect necessary hints for leaf +/// table scan node and set them in [`ScanRequest`]. Hints include: +/// - the nearest order requirement to the leaf table scan node as ordering hint. +/// - the group by columns when all aggregate functions are `last_value` as +/// time series row selector hint. +/// +/// [`ScanRequest`]: store_api::storage::ScanRequest +pub struct ScanHintRule; + +impl OptimizerRule for ScanHintRule { + fn try_optimize( + &self, + plan: &LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> DataFusionResult> { + Self::optimize(plan).map(Some) + } + + fn name(&self) -> &str { + "ScanHintRule" + } +} + +impl ScanHintRule { + fn optimize(plan: &LogicalPlan) -> DataFusionResult { + let mut visitor = ScanHintVisitor::default(); + let _ = plan.visit(&mut visitor)?; + + if visitor.need_rewrite() { + plan.clone() + .transform_down(&|plan| Self::set_hints(plan, &visitor)) + .map(|x| x.data) + } else { + Ok(plan.clone()) + } + } + + fn set_hints( + plan: LogicalPlan, + visitor: &ScanHintVisitor, + ) -> DataFusionResult> { + match &plan { + LogicalPlan::TableScan(table_scan) => { + let mut transformed = false; + if let Some(source) = table_scan + .source + .as_any() + .downcast_ref::() + { + // The provider in the region server is [DummyTableProvider]. 
+ if let Some(adapter) = source + .table_provider + .as_any() + .downcast_ref::() + { + // set order_hint + if let Some(order_expr) = &visitor.order_expr { + Self::set_order_hint(adapter, order_expr); + } + + // set time series selector hint + if let Some((group_by_cols, order_by_col)) = &visitor.ts_row_selector { + Self::set_time_series_row_selector_hint( + adapter, + group_by_cols, + order_by_col, + ); + } + + transformed = true; + } + } + if transformed { + Ok(Transformed::yes(plan)) + } else { + Ok(Transformed::no(plan)) + } + } + _ => Ok(Transformed::no(plan)), + } + } + + fn set_order_hint(adapter: &DummyTableProvider, order_expr: &Vec) { + let mut opts = Vec::with_capacity(order_expr.len()); + for sort in order_expr { + let name = match sort.expr.try_as_col() { + Some(col) => col.name.clone(), + None => return, + }; + opts.push(OrderOption { + name, + options: SortOptions { + descending: !sort.asc, + nulls_first: sort.nulls_first, + }, + }); + } + adapter.with_ordering_hint(&opts); + } + + fn set_time_series_row_selector_hint( + adapter: &DummyTableProvider, + group_by_cols: &HashSet, + order_by_col: &Column, + ) { + let region_metadata = adapter.region_metadata(); + let mut should_set_selector_hint = true; + // check if order_by column is time index + if let Some(column_metadata) = region_metadata.column_by_name(&order_by_col.name) { + if column_metadata.semantic_type != SemanticType::Timestamp { + should_set_selector_hint = false; + } + } else { + should_set_selector_hint = false; + } + + // check if all group_by columns are primary key + for col in group_by_cols { + let Some(column_metadata) = region_metadata.column_by_name(&col.name) else { + should_set_selector_hint = false; + break; + }; + if column_metadata.semantic_type != SemanticType::Tag { + should_set_selector_hint = false; + break; + } + } + + if should_set_selector_hint { + adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow); + } + } +} + +/// Traverse and fetch hints. +#[derive(Default)] +struct ScanHintVisitor { + /// The closest order requirement to the leaf node. + order_expr: Option>, + /// Row selection on time series distribution. + /// This field stores saved `group_by` columns when all aggregate functions are `last_value` + /// and the `order_by` column which should be time index. 
+ ts_row_selector: Option<(HashSet, Column)>, +} + +impl TreeNodeVisitor<'_> for ScanHintVisitor { + type Node = LogicalPlan; + + fn f_down(&mut self, node: &Self::Node) -> DataFusionResult { + // Get order requirement from sort plan + if let LogicalPlan::Sort(sort) = node { + let mut exprs = vec![]; + for expr in &sort.expr { + if let Expr::Sort(sort_expr) = expr { + exprs.push(sort_expr.clone()); + } + } + self.order_expr = Some(exprs); + } + + // Get time series row selector from aggr plan + if let LogicalPlan::Aggregate(aggregate) = node { + let mut is_all_last_value = !aggregate.aggr_expr.is_empty(); + let mut order_by_expr = None; + for expr in &aggregate.aggr_expr { + // check function name + let Expr::AggregateFunction(func) = expr else { + is_all_last_value = false; + break; + }; + if func.func_def.name() != "last_value" || func.filter.is_some() || func.distinct { + is_all_last_value = false; + break; + } + // check order by requirement + if let Some(order_by) = &func.order_by + && let Some(first_order_by) = order_by.first() + && order_by.len() == 1 + { + if let Some(existing_order_by) = &order_by_expr { + if existing_order_by != first_order_by { + is_all_last_value = false; + break; + } + } else if let Expr::Sort(sort_expr) = first_order_by { + // only allow `order by xxx [DESC]`, xxx is a bare column reference + if sort_expr.asc || !matches!(&*sort_expr.expr, Expr::Column(_)) { + is_all_last_value = false; + break; + } + order_by_expr = Some(first_order_by.clone()); + } + } + } + is_all_last_value &= order_by_expr.is_some(); + if is_all_last_value { + // make sure all the exprs are DIRECT `col` and collect them + let mut group_by_cols = HashSet::with_capacity(aggregate.group_expr.len()); + for expr in &aggregate.group_expr { + if let Expr::Column(col) = expr { + group_by_cols.insert(col.clone()); + } else { + is_all_last_value = false; + break; + } + } + // Safety: checked in the above loop + let Expr::Sort(Sort { + expr: order_by_col, .. 
+ }) = order_by_expr.unwrap() + else { + unreachable!() + }; + let Expr::Column(order_by_col) = *order_by_col else { + unreachable!() + }; + if is_all_last_value { + self.ts_row_selector = Some((group_by_cols, order_by_col)); + } + } + } + + if self.ts_row_selector.is_some() + && (matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1) + { + // clean previous time series selector hint when encounter subqueries or join + self.ts_row_selector = None; + } + + if let LogicalPlan::Filter(filter) = node + && let Some(group_by_exprs) = &self.ts_row_selector + { + let mut filter_referenced_cols = HashSet::default(); + utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?; + // ensure only group_by columns are used in filter + if !filter_referenced_cols.is_subset(&group_by_exprs.0) { + self.ts_row_selector = None; + } + } + + Ok(TreeNodeRecursion::Continue) + } +} + +impl ScanHintVisitor { + fn need_rewrite(&self) -> bool { + self.order_expr.is_some() || self.ts_row_selector.is_some() + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use datafusion_expr::expr::{AggregateFunction, AggregateFunctionDefinition}; + use datafusion_expr::{col, LogicalPlanBuilder}; + use datafusion_optimizer::OptimizerContext; + use datafusion_physical_expr::expressions::LastValue; + use store_api::storage::RegionId; + + use super::*; + use crate::optimizer::test_util::mock_table_provider; + + #[test] + fn set_order_hint() { + let provider = Arc::new(mock_table_provider(RegionId::new(1, 1))); + let table_source = Arc::new(DefaultTableSource::new(provider.clone())); + let plan = LogicalPlanBuilder::scan("t", table_source, None) + .unwrap() + .sort(vec![col("ts").sort(true, false)]) + .unwrap() + .sort(vec![col("ts").sort(false, true)]) + .unwrap() + .build() + .unwrap(); + + let context = OptimizerContext::default(); + ScanHintRule.try_optimize(&plan, &context).unwrap(); + + // should read the first (with `.sort(true, false)`) sort option + let scan_req = provider.scan_request(); + assert_eq!( + OrderOption { + name: "ts".to_string(), + options: SortOptions { + descending: false, + nulls_first: false + } + }, + scan_req.output_ordering.as_ref().unwrap()[0] + ); + } + + #[test] + fn set_time_series_row_selector_hint() { + let provider = Arc::new(mock_table_provider(RegionId::new(1, 1))); + let table_source = Arc::new(DefaultTableSource::new(provider.clone())); + let plan = LogicalPlanBuilder::scan("t", table_source, None) + .unwrap() + .aggregate( + vec![col("k0")], + vec![Expr::AggregateFunction(AggregateFunction { + func_def: AggregateFunctionDefinition::UDF(Arc::new(LastValue::new().into())), + args: vec![col("v0")], + distinct: false, + filter: None, + order_by: Some(vec![Expr::Sort(Sort { + expr: Box::new(col("ts")), + asc: false, + nulls_first: true, + })]), + null_treatment: None, + })], + ) + .unwrap() + .build() + .unwrap(); + + let context = OptimizerContext::default(); + ScanHintRule.try_optimize(&plan, &context).unwrap(); + + let scan_req = provider.scan_request(); + let _ = scan_req.series_row_selector.unwrap(); + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 7d164af52071..2ba2ea85a9f1 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -42,9 +42,9 @@ use table::TableRef; use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer}; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; -use crate::optimizer::order_hint::OrderHintRule; use 
crate::optimizer::parallelize_scan::ParallelizeScan; use crate::optimizer::remove_duplicate::RemoveDuplicate; +use crate::optimizer::scan_hint::ScanHintRule; use crate::optimizer::string_normalization::StringNormalizationRule; use crate::optimizer::type_conversion::TypeConversionRule; use crate::optimizer::ExtensionAnalyzerRule; @@ -109,7 +109,7 @@ impl QueryEngineState { } let mut optimizer = Optimizer::new(); - optimizer.rules.push(Arc::new(OrderHintRule)); + optimizer.rules.push(Arc::new(ScanHintRule)); // add physical optimizer let mut physical_optimizer = PhysicalOptimizer::new(); diff --git a/tests/cases/standalone/common/select/last_value.result b/tests/cases/standalone/common/select/last_value.result new file mode 100644 index 000000000000..c73205e98973 --- /dev/null +++ b/tests/cases/standalone/common/select/last_value.result @@ -0,0 +1,34 @@ +create table t ( + ts timestamp time index, + host string primary key, + not_pk string, + val double, +); + +Affected Rows: 0 + +insert into t values + (0, 'a', 'πŸŒ•', 1.0), + (1, 'b', 'πŸŒ–', 2.0), + (2, 'a', 'πŸŒ—', 3.0), + (3, 'c', '🌘', 4.0), + (4, 'a', 'πŸŒ‘', 5.0), + (5, 'b', 'πŸŒ’', 6.0), + (6, 'a', 'πŸŒ“', 7.0), + (7, 'c', 'πŸŒ”', 8.0), + (8, 'd', 'πŸŒ•', 9.0); + +Affected Rows: 9 + +-- Wait for #4354 +-- explain analyze +-- select +-- last_value(host order by ts), +-- last_value(not_pk order by ts), +-- last_value(val order by ts) +-- from t +-- group by host; +drop table t; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/select/last_value.sql b/tests/cases/standalone/common/select/last_value.sql new file mode 100644 index 000000000000..8297a985b16a --- /dev/null +++ b/tests/cases/standalone/common/select/last_value.sql @@ -0,0 +1,28 @@ +create table t ( + ts timestamp time index, + host string primary key, + not_pk string, + val double, +); + +insert into t values + (0, 'a', 'πŸŒ•', 1.0), + (1, 'b', 'πŸŒ–', 2.0), + (2, 'a', 'πŸŒ—', 3.0), + (3, 'c', '🌘', 4.0), + (4, 'a', 'πŸŒ‘', 5.0), + (5, 'b', 'πŸŒ’', 6.0), + (6, 'a', 'πŸŒ“', 7.0), + (7, 'c', 'πŸŒ”', 8.0), + (8, 'd', 'πŸŒ•', 9.0); + +-- Wait for #4354 +-- explain analyze +-- select +-- last_value(host order by ts), +-- last_value(not_pk order by ts), +-- last_value(val order by ts) +-- from t +-- group by host; + +drop table t; diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index 966a68f9ef54..bcaf39803f45 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -125,7 +125,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_| | logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_| | logical_plan after optimize_projections_| SAME TEXT AS ABOVE_| -| logical_plan after OrderHintRule_| SAME TEXT AS ABOVE_| +| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| | logical_plan_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivide: tags=["k"]_| From 5a1732279b2c9d66ac24110247cd360d8416639d Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 12 Jul 2024 22:40:06 +0800 Subject: [PATCH 7/7] feat: Implement reader that returns the last row of each series (#4354) * feat: last row reader * feat: scan use last row reader * test: test last row selector * chore: update 
comment --- src/mito2/src/engine.rs | 2 + src/mito2/src/engine/row_selector_test.rs | 104 +++++++++++++++ src/mito2/src/read.rs | 1 + src/mito2/src/read/last_row.rs | 153 ++++++++++++++++++++++ src/mito2/src/read/scan_region.rs | 4 +- src/mito2/src/read/seq_scan.rs | 22 ++-- 6 files changed, 276 insertions(+), 10 deletions(-) create mode 100644 src/mito2/src/engine/row_selector_test.rs create mode 100644 src/mito2/src/read/last_row.rs diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index bbb9cfe36df0..8cd3da32f740 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -49,6 +49,8 @@ mod projection_test; #[cfg(test)] mod prune_test; #[cfg(test)] +mod row_selector_test; +#[cfg(test)] mod set_readonly_test; #[cfg(test)] mod truncate_test; diff --git a/src/mito2/src/engine/row_selector_test.rs b/src/mito2/src/engine/row_selector_test.rs new file mode 100644 index 000000000000..001d0f2f6ab8 --- /dev/null +++ b/src/mito2/src/engine/row_selector_test.rs @@ -0,0 +1,104 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::Rows; +use common_recordbatch::RecordBatches; +use store_api::region_engine::RegionEngine; +use store_api::region_request::RegionRequest; +use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector}; + +use crate::config::MitoConfig; +use crate::test_util::batch_util::sort_batches_and_print; +use crate::test_util::{ + build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, +}; + +async fn test_last_row(append_mode: bool) { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + let region_id = RegionId::new(1, 1); + + let request = CreateRequestBuilder::new() + .insert_option("append_mode", &append_mode.to_string()) + .build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Flush 3 SSTs. + // a, field 1, 2 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 1, 3, 1), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + // a, field 0, 1 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + // b, field 0, 1 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + + // Memtable. 
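    // At this point the region holds three SSTs: key "a" at ts [1, 2] and [0, 1],
    // and key "b" at ts [0, 1]. The rows written next stay in the memtable, so the
    // last-row scan has to merge SSTs with the memtable: "a"'s last row (ts 3) comes
    // from the memtable while "b"'s (ts 1) comes from an SST.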
+ // a, field 2, 3 + let rows = Rows { + schema: column_schemas, + rows: build_rows_for_key("a", 2, 4, 2), + }; + put_rows(&engine, region_id, rows).await; + + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 3.0 | 1970-01-01T00:00:03 | +| b | 1.0 | 1970-01-01T00:00:01 | ++-------+---------+---------------------+"; + // Scans in parallel. + let scanner = engine + .scanner( + region_id, + ScanRequest { + series_row_selector: Some(TimeSeriesRowSelector::LastRow), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(3, scanner.num_files()); + assert_eq!(1, scanner.num_memtables()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); +} + +#[tokio::test] +async fn test_last_row_append_mode_disabled() { + test_last_row(false).await; +} + +#[tokio::test] +async fn test_last_row_append_mode_enabled() { + test_last_row(true).await; +} diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 8b9c54920786..5c3d17119aae 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -16,6 +16,7 @@ pub mod compat; pub mod dedup; +pub(crate) mod last_row; pub mod merge; pub mod projection; pub(crate) mod scan_region; diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs new file mode 100644 index 000000000000..85f82760610e --- /dev/null +++ b/src/mito2/src/read/last_row.rs @@ -0,0 +1,153 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities to read the last row of each time series. +use async_trait::async_trait; + +use crate::error::Result; +use crate::read::{Batch, BatchReader, BoxedBatchReader}; + +/// Reader to keep the last row for each time series. +/// It assumes that batches from the input reader are +/// - sorted +/// - all deleted rows has been filtered. +/// - not empty +/// +/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as +/// it focus on time series (the same key). +pub(crate) struct LastRowReader { + /// Inner reader. + reader: BoxedBatchReader, + /// The last batch pending to return. + last_batch: Option, +} + +impl LastRowReader { + /// Creates a new `LastRowReader`. + pub(crate) fn new(reader: BoxedBatchReader) -> Self { + Self { + reader, + last_batch: None, + } + } + + /// Returns the last row of the next key. + pub(crate) async fn next_last_row(&mut self) -> Result> { + while let Some(batch) = self.reader.next_batch().await? { + if let Some(last) = &self.last_batch { + if last.primary_key() == batch.primary_key() { + // Same key, update last batch. + self.last_batch = Some(batch); + } else { + // Different key, return the last row in `last` and update `last_batch` by + // current batch. 
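                    // `last` is non-empty by this reader's input contract (see the
                    // struct-level doc), so `num_rows() - 1` below cannot underflow;
                    // the debug_assert guards that assumption in debug builds.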
+ debug_assert!(!last.is_empty()); + let last_row = last.slice(last.num_rows() - 1, 1); + self.last_batch = Some(batch); + return Ok(Some(last_row)); + } + } else { + self.last_batch = Some(batch); + } + } + + if let Some(last) = self.last_batch.take() { + // This is the last key. + let last_row = last.slice(last.num_rows() - 1, 1); + return Ok(Some(last_row)); + } + + Ok(None) + } +} + +#[async_trait] +impl BatchReader for LastRowReader { + async fn next_batch(&mut self) -> Result> { + self.next_last_row().await + } +} + +#[cfg(test)] +mod tests { + use api::v1::OpType; + + use super::*; + use crate::test_util::{check_reader_result, new_batch, VecBatchReader}; + + #[tokio::test] + async fn test_last_row_one_batch() { + let input = [new_batch( + b"k1", + &[1, 2], + &[11, 11], + &[OpType::Put, OpType::Put], + &[21, 22], + )]; + let reader = VecBatchReader::new(&input); + let mut reader = LastRowReader::new(Box::new(reader)); + check_reader_result( + &mut reader, + &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])], + ) + .await; + + // Only one row. + let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])]; + let reader = VecBatchReader::new(&input); + let mut reader = LastRowReader::new(Box::new(reader)); + check_reader_result( + &mut reader, + &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])], + ) + .await; + } + + #[tokio::test] + async fn test_last_row_multi_batch() { + let input = [ + new_batch( + b"k1", + &[1, 2], + &[11, 11], + &[OpType::Put, OpType::Put], + &[21, 22], + ), + new_batch( + b"k1", + &[3, 4], + &[11, 11], + &[OpType::Put, OpType::Put], + &[23, 24], + ), + new_batch( + b"k2", + &[1, 2], + &[11, 11], + &[OpType::Put, OpType::Put], + &[31, 32], + ), + ]; + let reader = VecBatchReader::new(&input); + let mut reader = LastRowReader::new(Box::new(reader)); + check_reader_result( + &mut reader, + &[ + new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]), + new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]), + ], + ) + .await; + } +} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 3ba32250bf23..0fe4c7efa29e 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -209,8 +209,8 @@ impl ScanRegion { /// Returns a [Scanner] to scan the region. pub(crate) fn scanner(self) -> Result { - if self.version.options.append_mode { - // If table uses append mode, we use unordered scan in query. + if self.version.options.append_mode && self.request.series_row_selector.is_none() { + // If table is append only and there is no series row selector, we use unordered scan in query. // We still use seq scan in compaction. 
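            // Last-row selection relies on batches that are grouped and sorted by
            // primary key, which only the sequential scan guarantees; hence the extra
            // `series_row_selector` check above.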
self.unordered_scan().map(Scanner::Unordered) } else { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 7204bf87e7b0..ca5de750586e 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -29,13 +29,14 @@ use datatypes::schema::SchemaRef; use smallvec::smallvec; use snafu::ResultExt; use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, TimeSeriesRowSelector}; use table::predicate::Predicate; use tokio::sync::Semaphore; use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::memtable::MemtableRef; use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; +use crate::read::last_row::LastRowReader; use crate::read::merge::MergeReaderBuilder; use crate::read::scan_region::{ FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext, @@ -210,8 +211,8 @@ impl SeqScan { let reader = builder.build().await?; let dedup = !stream_ctx.input.append_mode; - if dedup { - let reader = match stream_ctx.input.merge_mode { + let reader = if dedup { + match stream_ctx.input.merge_mode { MergeMode::LastRow => Box::new(DedupReader::new( reader, LastRow::new(stream_ctx.input.filter_deleted), @@ -220,12 +221,17 @@ impl SeqScan { reader, LastNonNull::new(stream_ctx.input.filter_deleted), )) as _, - }; - Ok(Some(reader)) + } } else { - let reader = Box::new(reader); - Ok(Some(reader)) - } + Box::new(reader) as _ + }; + + let reader = match &stream_ctx.input.series_row_selector { + Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _, + None => reader, + }; + + Ok(Some(reader)) } /// Scans the given partition when the part list is set properly.
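// Hedged, self-contained sketch (not GreptimeDB's API) of the idea behind
// `LastRowReader`: given rows already sorted by series key, keep only the final row
// of each run of equal keys. The function name and `(key, row)` tuple layout are
// illustrative only.
fn last_row_per_key<K: PartialEq, R>(sorted: Vec<(K, R)>) -> Vec<(K, R)> {
    let mut out: Vec<(K, R)> = Vec::new();
    for (key, row) in sorted {
        // If the previous entry has the same key, drop it so only the newest row survives.
        if out.last().map_or(false, |(k, _)| *k == key) {
            out.pop();
        }
        out.push((key, row));
    }
    out
}
// For example, [("a", 1), ("a", 3), ("b", 1)] collapses to [("a", 3), ("b", 1)],
// mirroring how the reader yields one row per time series.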