From ddd667a27f833f70601278a8d3c04697ec27a4d2 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 14 Aug 2024 19:17:14 +0800 Subject: [PATCH] Reapply "chore: bump `tonic` to v0.12 (#17889)" This reverts commit 756d84093eb9c657bfe46b796688a7949bb84d1d. --- Cargo.lock | 360 ++++++++++++------ Cargo.toml | 18 +- ci/docker-compose.yml | 1 - e2e_test/sink/kafka/protobuf.slt | 16 +- src/batch/Cargo.toml | 2 +- src/batch/src/executor/hash_agg.rs | 2 +- .../executor/join/distributed_lookup_join.rs | 7 +- src/batch/src/executor/join/hash_join.rs | 2 +- .../src/executor/join/local_lookup_join.rs | 4 +- src/batch/src/executor/mod.rs | 4 +- src/batch/src/executor/order_by.rs | 2 +- src/batch/src/executor/row_seq_scan.rs | 7 +- src/batch/src/spill/spill_op.rs | 2 +- src/batch/src/task/broadcast_channel.rs | 2 +- src/batch/src/task/task_execution.rs | 2 +- src/bench/Cargo.toml | 2 +- src/common/Cargo.toml | 2 +- src/common/common_service/Cargo.toml | 2 +- src/common/common_service/src/tracing.rs | 8 +- src/common/metrics/Cargo.toml | 14 +- src/common/metrics/src/monitor/connection.rs | 130 ++++++- .../src/vnode_mapping/vnode_placement.rs | 4 +- src/compute/Cargo.toml | 5 +- .../src/rpc/service/monitor_service.rs | 13 +- src/connector/Cargo.toml | 14 +- src/error/src/tonic.rs | 2 +- src/expr/impl/Cargo.toml | 4 +- src/frontend/Cargo.toml | 2 +- src/frontend/src/catalog/source_catalog.rs | 9 +- .../src/catalog/system_catalog/mod.rs | 2 +- src/frontend/src/observer/observer_manager.rs | 3 +- .../src/scheduler/distributed/stage.rs | 6 +- src/frontend/src/scheduler/snapshot.rs | 6 +- src/meta/Cargo.toml | 4 +- src/meta/node/src/server.rs | 26 +- src/meta/service/Cargo.toml | 2 +- src/meta/service/src/ddl_service.rs | 2 +- src/meta/src/barrier/mod.rs | 2 +- src/meta/src/controller/cluster.rs | 6 +- src/meta/src/hummock/manager/commit_epoch.rs | 2 +- src/meta/src/hummock/model/pinned_snapshot.rs | 2 +- src/meta/src/hummock/model/pinned_version.rs | 2 +- src/meta/src/manager/catalog/user.rs | 32 +- src/meta/src/rpc/intercept.rs | 8 +- src/object_store/Cargo.toml | 2 +- src/prost/build.rs | 2 +- src/prost/src/lib.rs | 1 + src/risedevtool/Cargo.toml | 2 +- src/rpc_client/Cargo.toml | 6 +- src/rpc_client/src/meta_client.rs | 2 +- src/rpc_client/src/tracing.rs | 50 +-- src/storage/backup/src/lib.rs | 2 +- .../compaction_group/hummock_version_ext.rs | 2 +- src/storage/hummock_sdk/src/time_travel.rs | 2 +- src/storage/hummock_sdk/src/version.rs | 16 +- src/stream/Cargo.toml | 2 +- src/tests/simulation/Cargo.toml | 2 +- src/tests/state_cleaning_test/Cargo.toml | 2 +- src/utils/runtime/Cargo.toml | 2 +- 59 files changed, 513 insertions(+), 329 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1a6e60f2880d..7183c990bfce9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1751,7 +1751,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.1.0", + "hyper 1.4.1", "hyper-util", "itoa", "matchit", @@ -2791,22 +2791,22 @@ dependencies = [ [[package]] name = "console-api" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a257c22cd7e487dd4a13d413beabc512c5052f0bc048db0da6a84c3d8a6142fd" +checksum = "86ed14aa9c9f927213c6e4f3ef75faaad3406134efe84ba2cb7983431d5f0931" dependencies = [ "futures-core", - "prost 0.12.1", - "prost-types 0.12.1", - "tonic 0.11.0", + "prost 0.13.1", + "prost-types 0.13.1", + "tonic 0.12.1", "tracing-core", ] [[package]] name = "console-subscriber" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758" +checksum = "e2e3a111a37f3333946ebf9da370ba5c5577b18eb342ec683eb488dd21980302" dependencies = [ "console-api", "crossbeam-channel", @@ -2814,14 +2814,15 @@ dependencies = [ "futures-task", "hdrhistogram", "humantime", - "prost 0.12.1", - "prost-types 0.12.1", + "hyper-util", + "prost 0.13.1", + "prost-types 0.13.1", "serde", "serde_json", "thread_local", "tokio", - "tokio-stream", - "tonic 0.11.0", + "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", + "tonic 0.12.1", "tracing", "tracing-core", "tracing-subscriber", @@ -4436,15 +4437,15 @@ dependencies = [ [[package]] name = "etcd-client" -version = "0.12.4" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ae697f3928e8c89ae6f4dcf788059f49fd01a76dc53e63628f5a33881f5715e" +checksum = "39bde3ce50a626efeb1caa9ab1083972d178bebb55ca627639c8ded507dfcbde" dependencies = [ - "http 0.2.9", - "prost 0.12.1", + "http 1.1.0", + "prost 0.13.1", "tokio", - "tokio-stream", - "tonic 0.10.2", + "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", + "tonic 0.12.1", "tonic-build", "tower", "tower-service", @@ -5185,7 +5186,7 @@ dependencies = [ "thiserror", "time", "tokio", - "tokio-stream", + "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "url", "yup-oauth2", ] @@ -5301,9 +5302,9 @@ dependencies = [ [[package]] name = "google-cloud-auth" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e09ed5b2998bc8d0d3df09c859028210d4961b8fe779cfda8dc8ca4e83d5def2" +checksum = "1112c453c2e155b3e683204ffff52bcc6d6495d04b68d9e90cd24161270c5058" dependencies = [ "async-trait", "base64 0.21.7", @@ -5323,9 +5324,9 @@ dependencies = [ [[package]] name = "google-cloud-bigquery" -version = "0.9.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e321c127945bb44a5cf5129c37530e2494b97afefe7f334a983ac754e40914e" +checksum = "305cb7214d11b719e9f00f982c1ee1304c674f7a8dfc44a43b8bad3c909750c2" dependencies = [ "anyhow", "arrow 50.0.0", @@ -5350,29 +5351,29 @@ dependencies = [ [[package]] name = "google-cloud-gax" -version = "0.17.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cb60314136e37de9e2a05ddb427b9c5a39c3d188de2e2f026c6af74425eef44" +checksum = "9c3eaaad103912825594d674a4b1e556ccbb05a13a6cac17dcfd871997fb760a" dependencies = [ "google-cloud-token", - "http 0.2.9", + "http 1.1.0", "thiserror", "tokio", "tokio-retry", - "tonic 0.10.2", + "tonic 0.12.1", "tower", "tracing", ] [[package]] name = "google-cloud-googleapis" -version = "0.13.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cd184c52aa2619ac1b16ad8b5a752e91d25be88a8cf08eaec19777dfacbe54" +checksum = "0ae8ab26ef7c7c3f7dfb9cc3982293d031d8e78c85d00ddfb704b5c35aeff7c8" dependencies = [ - "prost 0.12.1", - "prost-types 0.12.1", - "tonic 0.10.2", + "prost 0.13.1", + "prost-types 0.13.1", + "tonic 0.12.1", ] [[package]] @@ -5388,9 +5389,9 @@ dependencies = [ [[package]] name = "google-cloud-pubsub" -version = "0.25.0" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a35e4a008db5cf01a5c03d3c67bd90b3cad77427ca949f3c8eddd90c4a3c932" +checksum = "55ef73601dcec5ea144e59969e921d35d66000211603fee8023b7947af09248f" dependencies = [ "async-channel 1.9.0", "async-stream", @@ -5398,7 +5399,7 @@ dependencies = [ "google-cloud-gax", "google-cloud-googleapis", "google-cloud-token", - "prost-types 0.12.1", + "prost-types 0.13.1", "thiserror", "tokio", "tokio-util", @@ -5407,9 +5408,9 @@ dependencies = [ [[package]] name = "google-cloud-token" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fcd62eb34e3de2f085bcc33a09c3e17c4f65650f36d53eb328b00d63bcb536a" +checksum = "8f49c12ba8b21d128a2ce8585955246977fbce4415f680ebf9199b6f9d6d725f" dependencies = [ "async-trait", ] @@ -5595,9 +5596,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -5760,9 +5761,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.1.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", @@ -5774,6 +5775,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", + "smallvec", "tokio", "want", ] @@ -5803,7 +5805,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.1.0", + "hyper 1.4.1", "hyper-util", "rustls 0.22.4", "rustls-pki-types", @@ -5824,6 +5826,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -5845,7 +5860,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.1.0", + "hyper 1.4.1", "hyper-util", "native-tls", "tokio", @@ -5855,16 +5870,16 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.3" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" dependencies = [ "bytes", "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.1.0", + "hyper 1.4.1", "pin-project-lite", "socket2 0.5.6", "tokio", @@ -6814,13 +6829,13 @@ dependencies = [ [[package]] name = "madsim-etcd-client" -version = "0.4.0+0.12.1" +version = "0.6.0+0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02b4b5de48bb7f3f7eae0bca62b3ed0b7d714b1b273d7347329b92c3a2eef113" +checksum = "8edcf23498cb590e415ce2ba6c7f186c7aa3340e7aa716ddddb34faf0a9ffdfb" dependencies = [ "etcd-client", "futures-util", - "http 0.2.9", + "http 1.1.0", "madsim", "serde", "serde_with 3.8.0", @@ -6828,7 +6843,7 @@ dependencies = [ "thiserror", "tokio", "toml 0.8.12", - "tonic 0.10.2", + "tonic 0.12.1", "tracing", ] @@ -6881,29 +6896,29 @@ dependencies = [ [[package]] name = "madsim-tonic" -version = "0.4.1+0.10.0" +version = "0.5.1+0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "813977c7870103e113a0332d97731f961bc48aaa8860edd318ef7d7754214436" +checksum = "61c668c82f0c2aca7ffed3235047f2539e6e41278c7c47a822999f3b7a067887" dependencies = [ "async-stream", "chrono", "futures-util", "madsim", "tokio", - "tonic 0.10.2", + "tonic 0.12.1", "tower", "tracing", ] [[package]] name = "madsim-tonic-build" -version = "0.4.2+0.10.0" +version = "0.5.0+0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a2ad2776ba20221ccbe4e136e2fa0f7ab90eebd608373177f3e74a198a288ec" +checksum = "f271a476bbaa9d2139e1e1a5beb869c6119e805a0b67ad2b2857e4a8785b111a" dependencies = [ "prettyplease 0.2.15", "proc-macro2", - "prost-build 0.12.1", + "prost-build 0.13.1", "quote", "syn 2.0.66", "tonic-build", @@ -7932,7 +7947,7 @@ dependencies = [ "rand", "thiserror", "tokio", - "tokio-stream", + "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -7985,20 +8000,20 @@ dependencies = [ [[package]] name = "otlp-embedded" version = "0.0.1" -source = "git+https://github.com/risingwavelabs/otlp-embedded?rev=492c244e0be91feb659c0cd48a624bbd96045a33#492c244e0be91feb659c0cd48a624bbd96045a33" +source = "git+https://github.com/risingwavelabs/otlp-embedded?rev=e6cd165b9bc85783b42c106e99186b86b73e3507#e6cd165b9bc85783b42c106e99186b86b73e3507" dependencies = [ "axum 0.7.4", "datasize", "hex", - "itertools 0.12.1", - "madsim-tonic", - "madsim-tonic-build", - "prost 0.12.1", + "itertools 0.13.0", + "prost 0.13.1", "rust-embed", "schnellru", "serde", "serde_json", "tokio", + "tonic 0.12.1", + "tonic-build", "tracing", ] @@ -8935,6 +8950,16 @@ dependencies = [ "prost-derive 0.12.1", ] +[[package]] +name = "prost" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" +dependencies = [ + "bytes", + "prost-derive 0.13.1", +] + [[package]] name = "prost-build" version = "0.11.9" @@ -8979,6 +9004,27 @@ dependencies = [ "which", ] +[[package]] +name = "prost-build" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.13.0", + "log", + "multimap 0.10.0", + "once_cell", + "petgraph", + "prettyplease 0.2.15", + "prost 0.13.1", + "prost-types 0.13.1", + "regex", + "syn 2.0.66", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.11.9" @@ -9005,6 +9051,19 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "prost-derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "prost-helpers" version = "0.1.0" @@ -9016,13 +9075,13 @@ dependencies = [ [[package]] name = "prost-reflect" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ae9372e3227f3685376a0836e5c248611eafc95a0be900d44bc6cdf225b700f" +checksum = "55a6a9143ae25c25fa7b6a48d6cc08b10785372060009c25140a4e7c340e95af" dependencies = [ "once_cell", - "prost 0.12.1", - "prost-types 0.12.1", + "prost 0.13.1", + "prost-types 0.13.1", ] [[package]] @@ -9043,6 +9102,15 @@ dependencies = [ "prost 0.12.1", ] +[[package]] +name = "prost-types" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +dependencies = [ + "prost 0.13.1", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -9160,7 +9228,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -9623,7 +9691,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.1.0", + "hyper 1.4.1", "hyper-rustls 0.26.0", "hyper-tls 0.6.0", "hyper-util", @@ -9813,7 +9881,7 @@ dependencies = [ "bytes", "itertools 0.12.1", "parking_lot 0.12.1", - "prost 0.12.1", + "prost 0.13.1", "risingwave_common", "risingwave_hummock_sdk", "risingwave_meta_model_v2", @@ -9855,7 +9923,7 @@ dependencies = [ "parquet 52.0.0", "paste", "prometheus", - "prost 0.12.1", + "prost 0.13.1", "rand", "risingwave_common", "risingwave_common_estimate_size", @@ -9875,7 +9943,7 @@ dependencies = [ "thiserror-ext", "tikv-jemallocator", "tokio-metrics", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tokio-util", "tracing", "twox-hash", @@ -9918,7 +9986,7 @@ dependencies = [ "serde", "serde_yaml", "thiserror-ext", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "toml 0.8.12", "tracing", "tracing-subscriber", @@ -10024,7 +10092,7 @@ dependencies = [ "governor", "hashbrown 0.14.3", "hex", - "http 0.2.9", + "http 1.1.0", "http-body 0.4.5", "humantime", "hytra", @@ -10050,7 +10118,7 @@ dependencies = [ "pretty_assertions", "procfs 0.16.0", "prometheus", - "prost 0.12.1", + "prost 0.13.1", "rand", "regex", "reqwest 0.12.4", @@ -10132,14 +10200,18 @@ dependencies = [ name = "risingwave_common_metrics" version = "1.11.0-alpha" dependencies = [ + "auto_impl", "bytes", "clap", "darwin-libproc", "easy-ext", "futures", "http 0.2.9", - "http-body 0.4.5", + "http 1.1.0", + "http-body 1.0.0", "hyper 0.14.27", + "hyper 1.4.1", + "hyper-util", "hytra", "itertools 0.12.1", "libc", @@ -10180,7 +10252,7 @@ dependencies = [ "anyhow", "bincode 1.3.3", "parking_lot 0.12.1", - "prost 0.12.1", + "prost 0.13.1", "risingwave_pb", "serde", "thiserror", @@ -10195,7 +10267,7 @@ dependencies = [ "async-trait", "axum 0.7.4", "futures", - "hyper 0.14.27", + "http 1.1.0", "madsim-tokio", "madsim-tonic", "prometheus", @@ -10249,7 +10321,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "parking_lot 0.12.1", - "prost 0.12.1", + "prost 0.13.1", "risingwave_common", "risingwave_common_heap_profiling", "risingwave_common_service", @@ -10275,14 +10347,15 @@ dependencies = [ "foyer", "futures", "futures-async-stream", - "hyper 0.14.27", + "http 1.1.0", + "hyper 1.4.1", "itertools 0.12.1", "madsim-tokio", "madsim-tonic", "maplit", "pprof", "prometheus", - "prost 0.12.1", + "prost 0.13.1", "rand", "risingwave_batch", "risingwave_common", @@ -10301,7 +10374,7 @@ dependencies = [ "tempfile", "thiserror-ext", "tikv-jemalloc-ctl", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tower", "tracing", "uuid", @@ -10388,10 +10461,10 @@ dependencies = [ "postgres-openssl", "pretty_assertions", "prometheus", - "prost 0.12.1", + "prost 0.13.1", "prost-build 0.12.1", "prost-reflect", - "prost-types 0.12.1", + "prost-types 0.13.1", "protobuf-native", "protobuf-src", "pulsar", @@ -10431,7 +10504,7 @@ dependencies = [ "time", "tokio-postgres", "tokio-retry", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tokio-util", "tracing", "tracing-subscriber", @@ -10489,7 +10562,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "memcomparable", - "prost 0.12.1", + "prost 0.13.1", "regex", "risingwave_common", "risingwave_connector", @@ -10729,7 +10802,7 @@ dependencies = [ "pretty-xmlish", "pretty_assertions", "prometheus", - "prost 0.12.1", + "prost 0.13.1", "rand", "risingwave_batch", "risingwave_common", @@ -10756,7 +10829,7 @@ dependencies = [ "tempfile", "thiserror", "thiserror-ext", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tracing", "uuid", "workspace-hack", @@ -10781,7 +10854,7 @@ dependencies = [ "hex", "itertools 0.12.1", "parse-display", - "prost 0.12.1", + "prost 0.13.1", "risingwave_common", "risingwave_common_estimate_size", "risingwave_pb", @@ -10839,7 +10912,7 @@ dependencies = [ "madsim-tokio", "mockall", "parking_lot 0.12.1", - "prost 0.12.1", + "prost 0.13.1", "risingwave_common", "risingwave_hummock_sdk", "risingwave_pb", @@ -10858,7 +10931,7 @@ dependencies = [ "futures", "jni", "madsim-tokio", - "prost 0.12.1", + "prost 0.13.1", "risingwave_common", "risingwave_expr", "risingwave_hummock_sdk", @@ -10888,7 +10961,7 @@ dependencies = [ "jni", "madsim-tokio", "paste", - "prost 0.12.1", + "prost 0.13.1", "risingwave_common", "risingwave_expr", "risingwave_hummock_sdk", @@ -10955,7 +11028,7 @@ dependencies = [ "function_name", "futures", "hex", - "hyper 0.14.27", + "http 1.1.0", "itertools 0.12.1", "jsonbb", "madsim-etcd-client", @@ -10970,7 +11043,7 @@ dependencies = [ "parking_lot 0.12.1", "prometheus", "prometheus-http-query", - "prost 0.12.1", + "prost 0.13.1", "rand", "risingwave_backup", "risingwave_common", @@ -10997,7 +11070,7 @@ dependencies = [ "thiserror", "thiserror-ext", "tokio-retry", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tower", "tower-http", "tracing", @@ -11043,7 +11116,7 @@ dependencies = [ name = "risingwave_meta_model_v2" version = "1.11.0-alpha" dependencies = [ - "prost 0.12.1", + "prost 0.13.1", "risingwave_common", "risingwave_hummock_sdk", "risingwave_pb", @@ -11097,7 +11170,7 @@ dependencies = [ "itertools 0.12.1", "madsim-tokio", "madsim-tonic", - "prost 0.12.1", + "prost 0.13.1", "rand", "regex", "risingwave_common", @@ -11110,7 +11183,7 @@ dependencies = [ "serde_json", "sync-point", "thiserror-ext", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tracing", "workspace-hack", ] @@ -11162,8 +11235,8 @@ dependencies = [ "madsim-tonic-build", "pbjson", "pbjson-build", - "prost 0.12.1", - "prost-build 0.12.1", + "prost 0.13.1", + "prost-build 0.13.1", "prost-helpers", "risingwave_error", "serde", @@ -11218,8 +11291,8 @@ dependencies = [ "easy-ext", "either", "futures", - "http 0.2.9", - "hyper 0.14.27", + "http 1.1.0", + "hyper 1.4.1", "itertools 0.12.1", "lru 0.7.6", "madsim-tokio", @@ -11236,7 +11309,7 @@ dependencies = [ "thiserror", "thiserror-ext", "tokio-retry", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tower", "tracing", "url", @@ -11322,7 +11395,7 @@ dependencies = [ "tempfile", "tikv-jemallocator", "tokio-postgres", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tracing", "tracing-subscriber", ] @@ -11390,7 +11463,7 @@ dependencies = [ "serde", "serde_with 3.8.0", "tokio-postgres", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "toml 0.8.12", "tracing", "workspace-hack", @@ -11437,7 +11510,7 @@ dependencies = [ "parking_lot 0.12.1", "procfs 0.16.0", "prometheus", - "prost 0.12.1", + "prost 0.13.1", "rand", "risingwave_backup", "risingwave_common", @@ -11506,7 +11579,7 @@ dependencies = [ "pin-project", "prehash", "prometheus", - "prost 0.12.1", + "prost 0.13.1", "rand", "risingwave_common", "risingwave_common_estimate_size", @@ -11531,7 +11604,7 @@ dependencies = [ "thiserror-ext", "tokio-metrics", "tokio-retry", - "tokio-stream", + "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tracing", "tracing-test", "workspace-hack", @@ -12077,9 +12150,9 @@ dependencies = [ [[package]] name = "schnellru" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d" +checksum = "c9a8ef13a93c54d20580de1e5c413e624e53121d42fc7e2c11d10ef7f8b02367" dependencies = [ "ahash 0.8.11", "cfg-if", @@ -13122,7 +13195,7 @@ dependencies = [ "thiserror", "time", "tokio", - "tokio-stream", + "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", "url", "uuid", @@ -13876,7 +13949,7 @@ dependencies = [ "futures-util", "pin-project-lite", "tokio", - "tokio-stream", + "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -13970,8 +14043,19 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.14" -source = "git+https://github.com/madsim-rs/tokio.git?rev=fe39bb8e#fe39bb8e8ab0ed96ee1b4477ab5508c20ce017fb" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "git+https://github.com/madsim-rs/tokio.git?rev=0dd1055#0dd105567b323c863c29f794d2221ed588956d8d" dependencies = [ "futures-core", "madsim-tokio", @@ -14074,12 +14158,11 @@ dependencies = [ "axum 0.6.20", "base64 0.21.7", "bytes", - "flate2", "h2 0.3.26", "http 0.2.9", "http-body 0.4.5", "hyper 0.14.27", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project", "prost 0.12.1", @@ -14087,12 +14170,11 @@ dependencies = [ "rustls-pemfile 1.0.4", "tokio", "tokio-rustls 0.24.1", - "tokio-stream", + "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "tower", "tower-layer", "tower-service", "tracing", - "webpki-roots 0.25.2", ] [[package]] @@ -14110,27 +14192,61 @@ dependencies = [ "http 0.2.9", "http-body 0.4.5", "hyper 0.14.27", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project", "prost 0.12.1", "tokio", - "tokio-stream", + "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "tower", "tower-layer", "tower-service", "tracing", ] +[[package]] +name = "tonic" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.4", + "base64 0.22.0", + "bytes", + "flate2", + "h2 0.4.4", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.1", + "hyper-timeout 0.5.1", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.1", + "rustls-pemfile 2.1.1", + "socket2 0.5.6", + "tokio", + "tokio-rustls 0.26.0", + "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", + "tower", + "tower-layer", + "tower-service", + "tracing", + "webpki-roots 0.26.1", +] + [[package]] name = "tonic-build" -version = "0.10.2" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" +checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964" dependencies = [ "prettyplease 0.2.15", "proc-macro2", - "prost-build 0.12.1", + "prost-build 0.13.1", "quote", "syn 2.0.66", ] diff --git a/Cargo.toml b/Cargo.toml index 9b07142dcf021..5bfab4feb27fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,7 +121,7 @@ aws-smithy-types = { version = "1", default-features = false, features = [ aws-endpoint = "0.60" aws-types = "1" axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain -etcd-client = { package = "madsim-etcd-client", version = "0.4" } +etcd-client = { package = "madsim-etcd-client", version = "0.6" } futures-async-stream = "0.2.9" hytra = "0.1" rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [ @@ -129,11 +129,11 @@ rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [ ] } hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] } criterion = { version = "0.5", features = ["async_futures"] } -tonic = { package = "madsim-tonic", version = "0.4.1" } -tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } -otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "492c244e0be91feb659c0cd48a624bbd96045a33" } -prost = { version = "0.12" } -prost-build = { version = "0.12" } +tonic = { package = "madsim-tonic", version = "0.5.1" } +tonic-build = { package = "madsim-tonic-build", version = "0.5" } +otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "e6cd165b9bc85783b42c106e99186b86b73e3507" } +prost = { version = "0.13" } +prost-build = { version = "0.13" } icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "1860eb315183a5f3f72b4097c1e40d49407f8373", features = [ "prometheus", ] } @@ -180,6 +180,7 @@ tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git" "profiling", "stats", ], rev = "64a2d9" } +# TODO(http-bump): bump to use tonic 0.12 once minitrace-opentelemetry is updated opentelemetry = "0.23" opentelemetry-otlp = "0.16" opentelemetry_sdk = { version = "0.23", default-features = false } @@ -195,6 +196,7 @@ sea-orm = { version = "0.12.14", features = [ "runtime-tokio-native-tls", ] } sqlx = "0.7" +tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055", features = ["net", "fs"] } tokio-util = "0.7" tracing-opentelemetry = "0.24" rand = { version = "0.8", features = ["small_rng"] } @@ -335,7 +337,9 @@ opt-level = 2 # Patch third-party crates for deterministic simulation. quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" } -tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e" } +# Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies. +# Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`. +# tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055" } tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" } futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" } diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index b230e0381d147..4b1954ff5ae2c 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -266,7 +266,6 @@ services: SCHEMA_REGISTRY_HOST_NAME: schemaregistry SCHEMA_REGISTRY_LISTENERS: http://schemaregistry:8082 SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: message_queue:29092 - SCHEMA_REGISTRY_DEBUG: 'true' pulsar-server: container_name: pulsar-server diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt index 0abd242e3c79d..5f032ba32f8dc 100644 --- a/e2e_test/sink/kafka/protobuf.slt +++ b/e2e_test/sink/kafka/protobuf.slt @@ -201,25 +201,19 @@ format plain encode protobuf ( message = 'recursive.AllTypes'); statement ok -drop sink sink_upsert; +drop table from_kafka cascade; statement ok -drop sink sink_csr_nested; +drop table from_kafka_csr_trivial cascade; statement ok -drop sink sink_csr_trivial; +drop table from_kafka_csr_nested cascade; statement ok -drop sink sink0; +drop table from_kafka_raw cascade; statement ok -drop table into_kafka; - -statement ok -drop table from_kafka_raw; +drop table into_kafka cascade; system ok rpk topic delete test-rw-sink-upsert-protobuf - -statement ok -drop table from_kafka; diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 099ae9019afcf..403eb864229d3 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -63,7 +63,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs", ] } tokio-metrics = "0.3.0" -tokio-stream = "0.1" +tokio-stream = { workspace = true } tokio-util = { workspace = true } tonic = { workspace = true } tracing = "0.1" diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index d69d4fbc8b174..00073217f7ead 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -20,7 +20,6 @@ use bytes::Bytes; use futures_async_stream::try_stream; use hashbrown::hash_map::Entry; use itertools::Itertools; -use prost::Message; use risingwave_common::array::{DataChunk, StreamChunk}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{Field, Schema}; @@ -35,6 +34,7 @@ use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HashAggNode; use risingwave_pb::data::DataChunk as PbDataChunk; +use risingwave_pb::Message; use crate::error::{BatchError, Result}; use crate::executor::aggregation::build as build_agg; diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index f5ad5ab5ed984..1068ffd7f3349 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -354,10 +354,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder { let pk_prefix = OwnedRow::new(scan_range.eq_conds); if self.lookup_prefix_len == self.table.pk_indices().len() { - let row = self - .table - .get_row(&pk_prefix, self.epoch.clone().into()) - .await?; + let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?; if let Some(row) = row { self.row_list.push(row); @@ -366,7 +363,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder { let iter = self .table .batch_iter_with_pk_bounds( - self.epoch.clone().into(), + self.epoch.into(), &pk_prefix, .., false, diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 026f03fb65deb..3bfb583d6459d 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use bytes::Bytes; use futures_async_stream::try_stream; use itertools::Itertools; -use prost::Message; use risingwave_common::array::{Array, DataChunk, RowRef}; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; @@ -34,6 +33,7 @@ use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::data::DataChunk as PbDataChunk; +use risingwave_pb::Message; use super::{ChunkedData, JoinType, RowId}; use crate::error::{BatchError, Result}; diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 7fcaba71a9c3b..a3be00fc39a22 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -134,7 +134,7 @@ impl InnerSideExecutorBuilder { ..Default::default() }), }), - epoch: Some(self.epoch.clone()), + epoch: Some(self.epoch), tracing_context: TracingContext::from_current_span().to_protobuf(), }; @@ -237,7 +237,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder &plan_node, &task_id, self.context.clone(), - self.epoch.clone(), + self.epoch, self.shutdown_rx.clone(), ); diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index 3a64901c64a04..80dc57b4f3620 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -174,7 +174,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> { plan_node, self.task_id, self.context.clone(), - self.epoch.clone(), + self.epoch, self.shutdown_rx.clone(), ) } @@ -188,7 +188,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> { } pub fn epoch(&self) -> BatchQueryEpoch { - self.epoch.clone() + self.epoch } } diff --git a/src/batch/src/executor/order_by.rs b/src/batch/src/executor/order_by.rs index 3f8c8e106c78f..ad7cc13992346 100644 --- a/src/batch/src/executor/order_by.rs +++ b/src/batch/src/executor/order_by.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use bytes::Bytes; use futures_async_stream::try_stream; use itertools::Itertools; -use prost::Message; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; use risingwave_common::memory::MemoryContext; @@ -28,6 +27,7 @@ use risingwave_common::util::sort_util::ColumnOrder; use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::data::DataChunk as PbDataChunk; +use risingwave_pb::Message; use super::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index b8287147c6750..b897dbd813787 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -237,7 +237,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { let ordered = seq_scan_node.ordered; - let epoch = source.epoch.clone(); + let epoch = source.epoch; let limit = seq_scan_node.limit; let as_of = seq_scan_node .as_of @@ -341,8 +341,7 @@ impl RowSeqScanExecutor { for point_get in point_gets { let table = table.clone(); if let Some(row) = - Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone()) - .await? + Self::execute_point_get(table, point_get, query_epoch, histogram.clone()).await? { if let Some(chunk) = data_chunk_builder.append_one_row(row) { returned += chunk.cardinality() as u64; @@ -373,7 +372,7 @@ impl RowSeqScanExecutor { table.clone(), range, ordered, - query_epoch.clone(), + query_epoch, chunk_size, limit, histogram.clone(), diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs index 237ee3baf0099..b3e842a269ec7 100644 --- a/src/batch/src/spill/spill_op.rs +++ b/src/batch/src/spill/spill_op.rs @@ -22,9 +22,9 @@ use futures_util::AsyncReadExt; use opendal::layers::RetryLayer; use opendal::services::{Fs, Memory}; use opendal::Operator; -use prost::Message; use risingwave_common::array::DataChunk; use risingwave_pb::data::DataChunk as PbDataChunk; +use risingwave_pb::Message; use thiserror_ext::AsReport; use tokio::sync::Mutex; use twox_hash::XxHash64; diff --git a/src/batch/src/task/broadcast_channel.rs b/src/batch/src/task/broadcast_channel.rs index d66eda7d7d620..9781e38e7d7f6 100644 --- a/src/batch/src/task/broadcast_channel.rs +++ b/src/batch/src/task/broadcast_channel.rs @@ -86,7 +86,7 @@ pub fn new_broadcast_channel( output_channel_size: usize, ) -> (ChanSenderImpl, Vec) { let broadcast_info = match shuffle.distribution { - Some(exchange_info::Distribution::BroadcastInfo(ref v)) => v.clone(), + Some(exchange_info::Distribution::BroadcastInfo(ref v)) => *v, _ => BroadcastInfo::default(), }; diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index 4536dad1c031f..7186ced55febd 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -393,7 +393,7 @@ impl BatchTaskExecution { self.plan.root.as_ref().unwrap(), &self.task_id, self.context.clone(), - self.epoch.clone(), + self.epoch, self.shutdown_rx.clone(), ) .build(), diff --git a/src/bench/Cargo.toml b/src/bench/Cargo.toml index d451ef46ef838..43451ebaeb9d1 100644 --- a/src/bench/Cargo.toml +++ b/src/bench/Cargo.toml @@ -50,7 +50,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "time", "signal", ] } -tokio-stream = "0.1" +tokio-stream = { workspace = true } toml = "0.8" tracing = "0.1" tracing-subscriber = "0.3.17" diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index a117dce645ae6..2cc1d81f1a38d 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -55,7 +55,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } governor = { version = "0.6", default-features = false, features = ["std"] } hashbrown = "0.14" hex = "0.4.3" -http = "0.2" +http = "1" humantime = "2.1" hytra = { workspace = true } itertools = { workspace = true } diff --git a/src/common/common_service/Cargo.toml b/src/common/common_service/Cargo.toml index cb43702f3f9e6..87206ab7cbc1d 100644 --- a/src/common/common_service/Cargo.toml +++ b/src/common/common_service/Cargo.toml @@ -18,7 +18,7 @@ normal = ["workspace-hack"] async-trait = "0.1" axum = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } -hyper = "0.14" # required by tonic +http = "1" prometheus = { version = "0.13" } risingwave_common = { workspace = true } risingwave_pb = { workspace = true } diff --git a/src/common/common_service/src/tracing.rs b/src/common/common_service/src/tracing.rs index 3ee4a64231c29..de6f43bbf33f3 100644 --- a/src/common/common_service/src/tracing.rs +++ b/src/common/common_service/src/tracing.rs @@ -15,8 +15,8 @@ use std::task::{Context, Poll}; use futures::Future; -use hyper::Body; use risingwave_common::util::tracing::TracingContext; +use tonic::body::BoxBody; use tower::{Layer, Service}; use tracing::Instrument; @@ -49,9 +49,9 @@ pub struct TracingExtract { inner: S, } -impl Service> for TracingExtract +impl Service> for TracingExtract where - S: Service> + Clone + Send + 'static, + S: Service> + Clone + Send + 'static, S::Future: Send + 'static, { type Error = S::Error; @@ -63,7 +63,7 @@ where self.inner.poll_ready(cx) } - fn call(&mut self, req: hyper::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { // This is necessary because tonic internally uses `tower::buffer::Buffer`. // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149 // for details on why this is necessary diff --git a/src/common/metrics/Cargo.toml b/src/common/metrics/Cargo.toml index 4f3e8b20936b2..0c32b557cebb2 100644 --- a/src/common/metrics/Cargo.toml +++ b/src/common/metrics/Cargo.toml @@ -15,12 +15,16 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +auto_impl = "1" bytes = "1" clap = { workspace = true } easy-ext = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } -http = "0.2" -hyper = { version = "0.14", features = ["client"] } # used by tonic +http = "1" +http-02 = { package = "http", version = "0.2" } +hyper = { version = "1" } +hyper-014 = { package = "hyper", version = "0.14" } +hyper-util = { version = "0.1", features = ["client-legacy"] } hytra = { workspace = true } itertools = { workspace = true } parking_lot = { workspace = true } @@ -32,13 +36,13 @@ serde = { version = "1", features = ["derive"] } thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio" } tonic = { workspace = true } +tower-layer = "0.3.2" +tower-service = "0.3.2" tracing = "0.1" tracing-subscriber = "0.3.17" [target.'cfg(not(madsim))'.dependencies] -http-body = "0.4.5" -tower-layer = "0.3.2" -tower-service = "0.3.2" +http-body = "1" [target.'cfg(target_os = "linux")'.dependencies] procfs = { version = "0.16", default-features = false } libc = "0.2" diff --git a/src/common/metrics/src/monitor/connection.rs b/src/common/metrics/src/monitor/connection.rs index e5774a3f16d7d..aa7c8c8d4baa3 100644 --- a/src/common/metrics/src/monitor/connection.rs +++ b/src/common/metrics/src/monitor/connection.rs @@ -24,10 +24,9 @@ use std::time::Duration; use futures::FutureExt; use http::Uri; -use hyper::client::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name}; -use hyper::client::connect::Connection; -use hyper::client::HttpConnector; -use hyper::service::Service; +use hyper_util::client::legacy::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name}; +use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector}; +use hyper_util::rt::TokioIo; use itertools::Itertools; use pin_project_lite::pin_project; use prometheus::{ @@ -37,11 +36,13 @@ use prometheus::{ use thiserror_ext::AsReport; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tonic::transport::{Channel, Endpoint}; +use tower_service::Service; use tracing::{debug, info, warn}; use crate::monitor::GLOBAL_METRICS_REGISTRY; use crate::{register_guarded_int_counter_vec_with_registry, LabelGuardedIntCounterVec}; +#[auto_impl::auto_impl(&mut)] pub trait MonitorAsyncReadWrite { fn on_read(&mut self, _size: usize) {} fn on_eof(&mut self) {} @@ -74,6 +75,14 @@ impl MonitoredConnection { let this = this.project(); (this.inner, this.monitor) } + + /// Delegate async read/write traits between tokio and hyper. + fn hyper_tokio_delegate( + self: Pin<&mut Self>, + ) -> TokioIo>, &mut M>> { + let (inner, monitor) = MonitoredConnection::project_into(self); + TokioIo::new(MonitoredConnection::new(TokioIo::new(inner), monitor)) + } } impl AsyncRead for MonitoredConnection { @@ -112,6 +121,16 @@ impl AsyncRead for MonitoredConnection hyper::rt::Read for MonitoredConnection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + hyper::rt::Read::poll_read(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf) + } +} + impl AsyncWrite for MonitoredConnection { fn poll_write( self: Pin<&mut Self>, @@ -186,8 +205,41 @@ impl AsyncWrite for MonitoredConnection } } +impl hyper::rt::Write for MonitoredConnection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_flush(std::pin::pin!(self.hyper_tokio_delegate()), cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(std::pin::pin!(self.hyper_tokio_delegate()), cx) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(std::pin::pin!(self.hyper_tokio_delegate()), cx, bufs) + } +} + impl Connection for MonitoredConnection { - fn connected(&self) -> hyper::client::connect::Connected { + fn connected(&self) -> Connected { self.inner.connected() } } @@ -275,6 +327,58 @@ where } } +// Compatibility implementation for hyper 0.14 ecosystem. +// Should be the same as those with imports from `http::Uri` and `hyper_util::client::legacy`. +// TODO(http-bump): remove this after there is no more dependency on hyper 0.14. +mod compat { + use http_02::Uri; + use hyper_014::client::connect::{Connected, Connection}; + + use super::*; + + impl, M: MonitorNewConnection + Clone + 'static> Service + for MonitoredConnection + where + C::Future: 'static, + { + type Error = C::Error; + type Response = MonitoredConnection; + + type Future = impl Future> + 'static; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let ret = self.inner.poll_ready(cx); + if let Poll::Ready(Err(_)) = &ret { + self.monitor.on_err("".to_string()); + } + ret + } + + fn call(&mut self, uri: Uri) -> Self::Future { + let endpoint = format!("{:?}", uri.host()); + let monitor = self.monitor.clone(); + self.inner + .call(uri) + .map(move |result: Result<_, _>| match result { + Ok(resp) => Ok(MonitoredConnection::new( + resp, + monitor.new_connection_monitor(endpoint), + )), + Err(e) => { + monitor.on_err(endpoint); + Err(e) + } + }) + } + } + + impl Connection for MonitoredConnection { + fn connected(&self) -> Connected { + self.inner.connected() + } + } +} + #[derive(Clone)] pub struct ConnectionMetrics { connection_count: IntGaugeVec, @@ -534,18 +638,16 @@ impl tonic::transport::server::Router { signal: impl Future, ) -> impl Future where - L: tower_layer::Layer, - L::Service: Service< - http::request::Request, - Response = http::response::Response, - > + Clone + L: tower_layer::Layer, + L::Service: Service, Response = http::Response> + + Clone + Send + 'static, - <>::Service as Service< - http::request::Request, + <>::Service as Service< + http::Request, >>::Future: Send + 'static, - <>::Service as Service< - http::request::Request, + <>::Service as Service< + http::Request, >>::Error: Into> + Send, ResBody: http_body::Body + Send + 'static, ResBody::Error: Into>, diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 10cb93b9fc290..1afdac2baa2f3 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -232,7 +232,7 @@ mod tests { let worker_1 = WorkerNode { id: 1, parallelism: 1, - property: Some(serving_property.clone()), + property: Some(serving_property), ..Default::default() }; @@ -247,7 +247,7 @@ mod tests { let worker_2 = WorkerNode { id: 2, parallelism: 50, - property: Some(serving_property.clone()), + property: Some(serving_property), ..Default::default() }; diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index a3f74792982f2..ed1758029092b 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -23,7 +23,8 @@ either = "1" foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } -hyper = "0.14" # required by tonic +http = "1" +hyper = "1" itertools = { workspace = true } maplit = "1.0.2" pprof = { version = "0.13", features = ["flamegraph"] } @@ -54,7 +55,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } -tokio-stream = "0.1" +tokio-stream = { workspace = true } tonic = { workspace = true } tower = { version = "0.4", features = ["util", "load-shed"] } tracing = "0.1" diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index a9a41d753ac96..0acc30e0c2430 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -389,8 +389,7 @@ pub mod grpc_middleware { use either::Either; use futures::Future; - use hyper::Body; - use tonic::transport::NamedService; + use tonic::body::BoxBody; use tower::{Layer, Service}; /// Manages the await-trees of `gRPC` requests that are currently served by the compute node. @@ -438,10 +437,9 @@ pub mod grpc_middleware { next_id: Arc, } - impl Service> for AwaitTreeMiddleware + impl Service> for AwaitTreeMiddleware where - S: Service> + Clone + Send + 'static, - S::Future: Send + 'static, + S: Service> + Clone, { type Error = S::Error; type Response = S::Response; @@ -452,7 +450,7 @@ pub mod grpc_middleware { self.inner.poll_ready(cx) } - fn call(&mut self, req: hyper::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { let Some(registry) = self.registry.clone() else { return Either::Left(self.inner.call(req)); }; @@ -479,7 +477,8 @@ pub mod grpc_middleware { } } - impl NamedService for AwaitTreeMiddleware { + #[cfg(not(madsim))] + impl tonic::server::NamedService for AwaitTreeMiddleware { const NAME: &'static str = S::NAME; } } diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index b46fc15bb605b..38e82ccdf76ea 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -61,10 +61,10 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } gcp-bigquery-client = "0.18.0" glob = "0.3" -google-cloud-bigquery = { version = "0.9.0", features = ["auth"] } -google-cloud-gax = "0.17.0" -google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] } -google-cloud-pubsub = "0.25" +google-cloud-bigquery = { version = "0.12.0", features = ["auth"] } +google-cloud-gax = "0.19.0" +google-cloud-googleapis = { version = "0.15", features = ["pubsub", "bigquery"] } +google-cloud-pubsub = "0.28" http = "0.2" iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } @@ -100,8 +100,8 @@ pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", postgres-openssl = "0.5.0" prometheus = { version = "0.13", features = ["process"] } prost = { workspace = true, features = ["no-recursion-limit"] } -prost-reflect = "0.13" -prost-types = "0.12" +prost-reflect = "0.14" +prost-types = "0.13" protobuf-native = "0.2.2" pulsar = { version = "6.3", default-features = false, features = [ "tokio-runtime", @@ -172,7 +172,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ ] } tokio-postgres = { version = "0.7", features = ["with-uuid-1"] } tokio-retry = "0.3" -tokio-stream = "0.1" +tokio-stream = { workspace = true } tokio-util = { workspace = true, features = ["codec", "io"] } tonic = { workspace = true } tracing = "0.1" diff --git a/src/error/src/tonic.rs b/src/error/src/tonic.rs index 4e3476c460fd6..f17b6b8ea9d44 100644 --- a/src/error/src/tonic.rs +++ b/src/error/src/tonic.rs @@ -244,7 +244,7 @@ mod tests { }; let server_status = original.to_status(tonic::Code::Internal, "test"); - let body = server_status.to_http(); + let body = server_status.into_http(); let client_status = tonic::Status::from_header_map(body.headers()).unwrap(); let wrapper = TonicStatusWrapper::new(client_status); diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index a9c955f91dcca..e493037c200b7 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -44,7 +44,7 @@ educe = "0.6" fancy-regex = "0.13" futures-async-stream = { workspace = true } futures-util = "0.3" -ginepro = "0.7" +ginepro = "0.7" # TODO(http-bump): bump to 0.8 once arrow-udf switches to tonic 0.12 hex = "0.4" icelake = { workspace = true } itertools = { workspace = true } @@ -71,7 +71,7 @@ sql-json-path = { version = "0.1", features = ["jsonbb"] } thiserror = "1" thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["time"] } -tonic = { version = "0.10", optional = true } +tonic = { version = "0.10", optional = true } # TODO(http-bump): bump once arrow-udf switches to tonic 0.12 tracing = "0.1" zstd = { version = "0.13", default-features = false, optional = true } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 89d29e076a38f..3a95eab660b09 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -93,7 +93,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } -tokio-stream = "0.1" +tokio-stream = { workspace = true } tonic = { workspace = true } tracing = "0.1" uuid = "1" diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index 8e64a6db4e2b9..f453a89204501 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -114,12 +114,9 @@ impl From<&PbSource> for SourceCatalog { let owner = prost.owner; let watermark_descs = prost.get_watermark_descs().clone(); - let associated_table_id = prost - .optional_associated_table_id - .clone() - .map(|id| match id { - OptionalAssociatedTableId::AssociatedTableId(id) => id, - }); + let associated_table_id = prost.optional_associated_table_id.map(|id| match id { + OptionalAssociatedTableId::AssociatedTableId(id) => id, + }); let version = prost.version; let connection_id = prost.connection_id; diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 5e32dcb7b2aba..432266d1871a9 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -240,7 +240,7 @@ fn get_acl_items( ) -> String { let mut res = String::from("{"); let mut empty_flag = true; - let super_privilege = available_prost_privilege(object.clone(), for_dml_table); + let super_privilege = available_prost_privilege(*object, for_dml_table); for user in users { let privileges = if user.is_super { vec![&super_privilege] diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 80f6316f9ca90..b49abcb023429 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -472,8 +472,7 @@ impl FrontendObserverNode { match info { Info::HummockSnapshot(hummock_snapshot) => match resp.operation() { Operation::Update => { - self.hummock_snapshot_manager - .update(hummock_snapshot.clone()); + self.hummock_snapshot_manager.update(*hummock_snapshot); } _ => panic!("receive an unsupported notify {:?}", resp), }, diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index a6c76c78bf23d..e30dfa0dad377 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -196,7 +196,7 @@ impl StageExecution { match cur_state { Pending { msg_sender } => { let runner = StageRunner { - epoch: self.epoch.clone(), + epoch: self.epoch, stage: self.stage.clone(), worker_node_manager: self.worker_node_manager.clone(), tasks: self.tasks.clone(), @@ -649,7 +649,7 @@ impl StageRunner { &plan_node, &task_id, self.ctx.to_batch_task_context(), - self.epoch.clone(), + self.epoch, shutdown_rx.clone(), ); @@ -935,7 +935,7 @@ impl StageRunner { let t_id = task_id.task_id; let stream_status: Fuse> = compute_client - .create_task(task_id, plan_fragment, self.epoch.clone(), expr_context) + .create_task(task_id, plan_fragment, self.epoch, expr_context) .await .inspect_err(|_| self.mask_failed_serving_worker(&worker)) .map_err(|e| anyhow!(e))? diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 9d13573e03a7b..73a3ade5799b5 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -121,7 +121,7 @@ impl PinnedSnapshot { impl Drop for PinnedSnapshot { fn drop(&mut self) { - let _ = self.unpin_sender.send(Operation::Unpin(self.value.clone())); + let _ = self.unpin_sender.send(Operation::Unpin(self.value)); } } @@ -202,9 +202,7 @@ impl HummockSnapshotManager { false } else { // First tell the worker that a new snapshot is going to be pinned. - self.worker_sender - .send(Operation::Pin(snapshot.clone())) - .unwrap(); + self.worker_sender.send(Operation::Pin(snapshot)).unwrap(); // Then set the latest snapshot. *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot, diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index b35ce9e73d96b..4511e9f61d894 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -36,7 +36,7 @@ flate2 = "1" function_name = "0.3.0" futures = { version = "0.3", default-features = false, features = ["alloc"] } hex = "0.4" -hyper = "0.14" # required by tonic +http = "1" itertools = { workspace = true } jsonbb = { workspace = true } maplit = "1.0.2" @@ -81,7 +81,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", ] } tokio-retry = "0.3" -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { workspace = true } tonic = { workspace = true } tower = { version = "0.4", features = ["util", "load-shed"] } tracing = "0.1" diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 5c72b39bcd156..5d1b3570e1ce3 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -772,7 +772,7 @@ pub async fn start_service_as_election_leader( risingwave_pb::meta::event_log::Event::MetaNodeStart(event), ]); - let server = tonic::transport::Server::builder() + let server_builder = tonic::transport::Server::builder() .layer(MetricsMiddlewareLayer::new(meta_metrics)) .layer(TracingExtractLayer::new()) .add_service(HeartbeatServiceServer::new(heartbeat_srv)) @@ -794,17 +794,19 @@ pub async fn start_service_as_election_leader( .add_service(ServingServiceServer::new(serving_srv)) .add_service(CloudServiceServer::new(cloud_srv)) .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv)) - .add_service(EventLogServiceServer::new(event_log_srv)) - .add_service(TraceServiceServer::new(trace_srv)) - .monitored_serve_with_shutdown( - address_info.listen_addr, - "grpc-meta-leader-service", - TcpConfig { - tcp_nodelay: true, - keepalive_duration: None, - }, - shutdown.clone().cancelled_owned(), - ); + .add_service(EventLogServiceServer::new(event_log_srv)); + #[cfg(not(madsim))] // `otlp-embedded` does not use madsim-patched tonic + let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv)); + + let server = server_builder.monitored_serve_with_shutdown( + address_info.listen_addr, + "grpc-meta-leader-service", + TcpConfig { + tcp_nodelay: true, + keepalive_duration: None, + }, + shutdown.clone().cancelled_owned(), + ); started::set(); let _server_handle = tokio::spawn(server); diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index 1e3330a2b53a0..69986f8570234 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -40,7 +40,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "time", "signal", ] } -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { workspace = true } tonic = { workspace = true } tracing = "0.1" diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 81b90d0f2005a..cd94e7c3af862 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -912,7 +912,7 @@ impl DdlService for DdlServiceImpl { let req = request.into_inner(); let table_id = req.get_table_id(); - let parallelism = req.get_parallelism()?.clone(); + let parallelism = *req.get_parallelism()?; let deferred = req.get_deferred(); self.ddl_controller diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b8b829ab9cddb..48e8fcbbc05c1 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -688,7 +688,7 @@ impl GlobalBarrierManager { r#type: node.r#type, host: node.host.clone(), parallelism: node.parallelism, - property: node.property.clone(), + property: node.property, resource: node.resource.clone(), ..Default::default() }, diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 8df302380a64a..d80ffb54e66a4 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -938,7 +938,7 @@ mod tests { .add_worker( PbWorkerType::ComputeNode, host.clone(), - property.clone(), + property, PbResource::default(), ) .await?, @@ -970,7 +970,7 @@ mod tests { ); // re-register existing worker node with larger parallelism and change its serving mode. - let mut new_property = property.clone(); + let mut new_property = property; new_property.worker_node_parallelism = (parallelism_num * 2) as _; new_property.is_serving = false; cluster_ctl @@ -1025,7 +1025,7 @@ mod tests { .add_worker( PbWorkerType::ComputeNode, host.clone(), - property.clone(), + property, PbResource::default(), ) .await?; diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 0f42c17751377..c7c2057fcc96e 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -377,7 +377,7 @@ impl HummockManager { committed_epoch: epoch, current_epoch: epoch, }; - let prev_snapshot = self.latest_snapshot.swap(snapshot.clone().into()); + let prev_snapshot = self.latest_snapshot.swap(snapshot.into()); assert!(prev_snapshot.committed_epoch < epoch); assert!(prev_snapshot.current_epoch < epoch); diff --git a/src/meta/src/hummock/model/pinned_snapshot.rs b/src/meta/src/hummock/model/pinned_snapshot.rs index f485d9dab7211..fd009e22b789f 100644 --- a/src/meta/src/hummock/model/pinned_snapshot.rs +++ b/src/meta/src/hummock/model/pinned_snapshot.rs @@ -28,7 +28,7 @@ impl MetadataModel for HummockPinnedSnapshot { } fn to_protobuf(&self) -> Self::PbType { - self.clone() + *self } fn from_protobuf(prost: Self::PbType) -> Self { diff --git a/src/meta/src/hummock/model/pinned_version.rs b/src/meta/src/hummock/model/pinned_version.rs index e8f6b2e65e75e..b2e7d97501b2b 100644 --- a/src/meta/src/hummock/model/pinned_version.rs +++ b/src/meta/src/hummock/model/pinned_version.rs @@ -28,7 +28,7 @@ impl MetadataModel for HummockPinnedVersion { } fn to_protobuf(&self) -> Self::PbType { - self.clone() + *self } fn from_protobuf(prost: Self::PbType) -> Self { diff --git a/src/meta/src/manager/catalog/user.rs b/src/meta/src/manager/catalog/user.rs index f6e2f9e03e835..81181b0fc1e17 100644 --- a/src/meta/src/manager/catalog/user.rs +++ b/src/meta/src/manager/catalog/user.rs @@ -234,7 +234,7 @@ mod tests { .grant_privilege( &[test_sub_user_id], &[make_privilege( - object.clone(), + object, &[Action::Select, Action::Update, Action::Delete], true, )], @@ -249,7 +249,7 @@ mod tests { .grant_privilege( &[test_user_id], &[make_privilege( - object.clone(), + object, &[Action::Select, Action::Insert], false, )], @@ -258,7 +258,7 @@ mod tests { .await?; let user = catalog_manager.get_user(test_user_id).await?; assert_eq!(user.grant_privileges.len(), 1); - assert_eq!(user.grant_privileges[0].object, Some(object.clone())); + assert_eq!(user.grant_privileges[0].object, Some(object)); assert_eq!(user.grant_privileges[0].action_with_opts.len(), 2); assert!(user.grant_privileges[0] .action_with_opts @@ -269,7 +269,7 @@ mod tests { .grant_privilege( &[test_sub_user_id], &[make_privilege( - object.clone(), + object, &[Action::Select, Action::Insert], true, )], @@ -284,7 +284,7 @@ mod tests { .grant_privilege( &[test_user_id], &[make_privilege( - object.clone(), + object, &[Action::Select, Action::Insert], true, )], @@ -293,7 +293,7 @@ mod tests { .await?; let user = catalog_manager.get_user(test_user_id).await?; assert_eq!(user.grant_privileges.len(), 1); - assert_eq!(user.grant_privileges[0].object, Some(object.clone())); + assert_eq!(user.grant_privileges[0].object, Some(object)); assert_eq!(user.grant_privileges[0].action_with_opts.len(), 2); assert!(user.grant_privileges[0] .action_with_opts @@ -304,7 +304,7 @@ mod tests { .grant_privilege( &[test_sub_user_id], &[make_privilege( - object.clone(), + object, &[Action::Select, Action::Insert], true, )], @@ -319,7 +319,7 @@ mod tests { .grant_privilege( &[test_user_id], &[make_privilege( - object.clone(), + object, &[Action::Select, Action::Update, Action::Delete], true, )], @@ -328,7 +328,7 @@ mod tests { .await?; let user = catalog_manager.get_user(test_user_id).await?; assert_eq!(user.grant_privileges.len(), 1); - assert_eq!(user.grant_privileges[0].object, Some(object.clone())); + assert_eq!(user.grant_privileges[0].object, Some(object)); assert_eq!(user.grant_privileges[0].action_with_opts.len(), 4); assert!(user.grant_privileges[0] .action_with_opts @@ -339,7 +339,7 @@ mod tests { let res = catalog_manager .revoke_privilege( &[test_user_id], - &[make_privilege(object.clone(), &[Action::Connect], false)], + &[make_privilege(object, &[Action::Connect], false)], 0, test_sub_user_id, true, @@ -355,11 +355,7 @@ mod tests { let res = catalog_manager .revoke_privilege( &[test_user_id], - &[make_privilege( - other_object.clone(), - &[Action::Connect], - false, - )], + &[make_privilege(other_object, &[Action::Connect], false)], 0, test_sub_user_id, true, @@ -376,7 +372,7 @@ mod tests { .revoke_privilege( &[test_user_id], &[make_privilege( - object.clone(), + object, &[ Action::Select, Action::Insert, @@ -401,7 +397,7 @@ mod tests { .revoke_privilege( &[test_user_id], &[make_privilege( - object.clone(), + object, &[ Action::Select, Action::Insert, @@ -429,7 +425,7 @@ mod tests { .revoke_privilege( &[test_user_id], &[make_privilege( - object.clone(), + object, &[Action::Select, Action::Insert, Action::Delete], false, )], diff --git a/src/meta/src/rpc/intercept.rs b/src/meta/src/rpc/intercept.rs index 8b5bb67f30943..87151e06b88a1 100644 --- a/src/meta/src/rpc/intercept.rs +++ b/src/meta/src/rpc/intercept.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use futures::Future; -use hyper::Body; +use tonic::body::BoxBody; use tower::{Layer, Service}; use crate::rpc::metrics::MetaMetrics; @@ -49,9 +49,9 @@ pub struct MetricsMiddleware { metrics: Arc, } -impl Service> for MetricsMiddleware +impl Service> for MetricsMiddleware where - S: Service> + Clone + Send + 'static, + S: Service> + Clone + Send + 'static, S::Future: Send + 'static, { type Error = S::Error; @@ -63,7 +63,7 @@ where self.inner.poll_ready(cx) } - fn call(&mut self, req: hyper::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { // This is necessary because tonic internally uses `tower::buffer::Buffer`. // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149 // for details on why this is necessary diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index e821c2fc85090..fa433a30abcb4 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -26,7 +26,7 @@ crc32fast = "1" either = "1" fail = "0.5" futures = { version = "0.3", default-features = false, features = ["alloc"] } -hyper = { version = "0.14", features = ["tcp", "client"] } # required by aws sdk +hyper = { version = "0.14", features = ["tcp", "client"] } # TODO(http-bump): required by aws sdk hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] } hyper-tls = "0.5.0" itertools = { workspace = true } diff --git a/src/prost/build.rs b/src/prost/build.rs index aa2b8a706b690..0682a63a02edb 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -132,7 +132,7 @@ fn main() -> Result<(), Box> { ) .type_attribute("plan_common.GeneratedColumnDesc", "#[derive(Eq, Hash)]") .type_attribute("plan_common.DefaultColumnDesc", "#[derive(Eq, Hash)]") - .type_attribute("plan_common.Cardinality", "#[derive(Eq, Hash, Copy)]") + .type_attribute("plan_common.Cardinality", "#[derive(Eq, Hash)]") .type_attribute("plan_common.ExternalTableDesc", "#[derive(Eq, Hash)]") .type_attribute("plan_common.ColumnDesc", "#[derive(Eq, Hash)]") .type_attribute("plan_common.AdditionalColumn", "#[derive(Eq, Hash)]") diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index c7b8ef27cd748..5bb2609c81593 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -19,6 +19,7 @@ use std::str::FromStr; +pub use prost::Message; use risingwave_error::tonic::ToTonicStatus; use thiserror::Error; diff --git a/src/risedevtool/Cargo.toml b/src/risedevtool/Cargo.toml index b8a2ca9db14f9..71c2662024dbb 100644 --- a/src/risedevtool/Cargo.toml +++ b/src/risedevtool/Cargo.toml @@ -23,7 +23,7 @@ clap = { workspace = true } console = "0.15" fs-err = "2.11.0" glob = "0.3" -google-cloud-pubsub = "0.25" +google-cloud-pubsub = "0.28" indicatif = "0.17" itertools = { workspace = true } rdkafka = { workspace = true } diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml index 37064df273ed0..49729c6d9e8ac 100644 --- a/src/rpc_client/Cargo.toml +++ b/src/rpc_client/Cargo.toml @@ -19,8 +19,8 @@ async-trait = "0.1" easy-ext = "1" either = "1.13.0" futures = { version = "0.3", default-features = false, features = ["alloc"] } -http = "0.2" -hyper = "0.14" # required by tonic +http = "1" +hyper = "1" itertools = { workspace = true } lru = { workspace = true } moka = { version = "0.12", features = ["future"] } @@ -43,7 +43,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", ] } tokio-retry = "0.3" -tokio-stream = "0.1" +tokio-stream = { workspace = true } tonic = { workspace = true } tower = "0.4" tracing = "0.1" diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index b18acb55aac69..906261799b13d 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -245,7 +245,7 @@ impl MetaClient { .add_worker_node(AddWorkerNodeRequest { worker_type: worker_type as i32, host: Some(addr.to_protobuf()), - property: Some(property.clone()), + property: Some(property), resource: Some(risingwave_pb::common::worker_node::Resource { rw_version: RW_VERSION.to_string(), total_memory_bytes: system_memory_available_bytes() as _, diff --git a/src/rpc_client/src/tracing.rs b/src/rpc_client/src/tracing.rs index 50c98007bb9fd..aab07d43225d4 100644 --- a/src/rpc_client/src/tracing.rs +++ b/src/rpc_client/src/tracing.rs @@ -16,46 +16,22 @@ use std::task::{Context, Poll}; use futures::Future; use risingwave_common::util::tracing::TracingContext; -use tower::{Layer, Service}; - -/// A layer that decorates the inner service with [`TracingInject`]. -#[derive(Clone, Default)] -pub struct TracingInjectLayer { - _private: (), -} - -impl TracingInjectLayer { - #[allow(dead_code)] - pub fn new() -> Self { - Self::default() - } -} - -impl Layer for TracingInjectLayer { - type Service = TracingInject; - - fn layer(&self, service: S) -> Self::Service { - TracingInject { inner: service } - } -} +use tonic::body::BoxBody; +use tower::Service; /// A service wrapper that injects the [`TracingContext`] obtained from the current tracing span /// into the HTTP headers of the request. /// /// See also `TracingExtract` in the `common_service` crate. #[derive(Clone, Debug)] -pub struct TracingInject { - inner: S, +pub struct TracingInjectChannel { + inner: tonic::transport::Channel, } -impl Service> for TracingInject -where - S: Service> + Clone + Send + 'static, - S::Future: Send + 'static, - B: hyper::body::HttpBody, // tonic `Channel` uses `BoxBody` instead of `hyper::Body` -{ - type Error = S::Error; - type Response = S::Response; +#[cfg(not(madsim))] +impl Service> for TracingInjectChannel { + type Error = tonic::transport::Error; + type Response = http::Response; type Future = impl Future>; @@ -63,7 +39,7 @@ where self.inner.poll_ready(cx) } - fn call(&mut self, mut req: hyper::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { // This is necessary because tonic internally uses `tower::buffer::Buffer`. // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149 // for details on why this is necessary @@ -81,21 +57,21 @@ where /// A wrapper around tonic's `Channel` that injects the [`TracingContext`] obtained from the current /// tracing span when making gRPC requests. #[cfg(not(madsim))] -pub type Channel = TracingInject; +pub type Channel = TracingInjectChannel; #[cfg(madsim)] pub type Channel = tonic::transport::Channel; -/// An extension trait for tonic's `Channel` that wraps it in a [`TracingInject`] service. +/// An extension trait for tonic's `Channel` that wraps it into a [`TracingInjectChannel`]. #[easy_ext::ext(TracingInjectedChannelExt)] impl tonic::transport::Channel { - /// Wraps the channel in a [`TracingInject`] service, so that the [`TracingContext`] obtained + /// Wraps the channel into a [`TracingInjectChannel`], so that the [`TracingContext`] obtained /// from the current tracing span is injected into the HTTP headers of the request. /// /// The server can then extract the [`TracingContext`] from the HTTP headers with the /// `TracingExtract` middleware. pub fn tracing_injected(self) -> Channel { #[cfg(not(madsim))] - return TracingInject { inner: self }; + return TracingInjectChannel { inner: self }; #[cfg(madsim)] return self; } diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 4c53af38eef67..8dfba1b62a181 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -122,7 +122,7 @@ impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata { state_table_info: m .state_table_info .iter() - .map(|(t, i)| (t.table_id, i.clone())) + .map(|(t, i)| (t.table_id, *i)) .collect(), rw_version: m.rw_version.clone(), } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index d194f4355aa2b..1ee4fe0443783 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -85,7 +85,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary } GroupDelta::GroupDestroy(destroy_delta) => { assert!(group_destroy.is_none()); - group_destroy = Some(destroy_delta.clone()); + group_destroy = Some(*destroy_delta); } GroupDelta::GroupMetaChange(meta_delta) => { group_meta_changes.push(meta_delta.clone()); diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 1fbd26ed3485b..5894ed3e4a6e9 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -338,7 +338,7 @@ impl IncompleteHummockVersionDelta { state_table_info_delta: self .state_table_info_delta .iter() - .map(|(table_id, delta)| (table_id.table_id, delta.clone())) + .map(|(table_id, delta)| (table_id.table_id, *delta)) .collect(), } } diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 4ce5b89138f8e..e418250f0b6bf 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -77,7 +77,7 @@ impl HummockVersionStateTableInfo { pub fn from_protobuf(state_table_info: &HashMap) -> Self { let state_table_info = state_table_info .iter() - .map(|(table_id, info)| (TableId::new(*table_id), info.clone())) + .map(|(table_id, info)| (TableId::new(*table_id), *info)) .collect(); let compaction_group_member_tables = Self::build_compaction_group_member_tables(&state_table_info); @@ -90,7 +90,7 @@ impl HummockVersionStateTableInfo { pub fn to_protobuf(&self) -> HashMap { self.state_table_info .iter() - .map(|(table_id, info)| (table_id.table_id, info.clone())) + .map(|(table_id, info)| (table_id.table_id, *info)) .collect() } @@ -642,7 +642,7 @@ impl From<&PbHummockVersionDelta> for HummockVersionDelta { state_table_info_delta: pb_version_delta .state_table_info_delta .iter() - .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone())) + .map(|(table_id, delta)| (TableId::new(*table_id), *delta)) .collect(), } } @@ -679,7 +679,7 @@ impl From<&HummockVersionDelta> for PbHummockVersionDelta { state_table_info_delta: version_delta .state_table_info_delta .iter() - .map(|(table_id, delta)| (table_id.table_id, delta.clone())) + .map(|(table_id, delta)| (table_id.table_id, *delta)) .collect(), } } @@ -716,7 +716,7 @@ impl From for PbHummockVersionDelta { state_table_info_delta: version_delta .state_table_info_delta .into_iter() - .map(|(table_id, delta)| (table_id.table_id, delta.clone())) + .map(|(table_id, delta)| (table_id.table_id, delta)) .collect(), } } @@ -761,7 +761,7 @@ impl From for HummockVersionDelta { state_table_info_delta: pb_version_delta .state_table_info_delta .iter() - .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone())) + .map(|(table_id, delta)| (TableId::new(*table_id), *delta)) .collect(), } } @@ -938,7 +938,7 @@ impl From<&GroupDelta> for PbGroupDelta { delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct.clone())), }, GroupDelta::GroupDestroy(pb_group_destroy) => PbGroupDelta { - delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy.clone())), + delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)), }, GroupDelta::GroupMetaChange(pb_group_meta_change) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMetaChange(pb_group_meta_change.clone())), @@ -960,7 +960,7 @@ impl From<&PbGroupDelta> for GroupDelta { GroupDelta::GroupConstruct(pb_group_construct.clone()) } Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => { - GroupDelta::GroupDestroy(pb_group_destroy.clone()) + GroupDelta::GroupDestroy(*pb_group_destroy) } Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)) => { GroupDelta::GroupMetaChange(pb_group_meta_change.clone()) diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 3c85092a4d677..de25cf8439be1 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -79,7 +79,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ ] } tokio-metrics = "0.3.0" tokio-retry = "0.3" -tokio-stream = "0.1" +tokio-stream = { workspace = true } tonic = { workspace = true } tracing = "0.1" diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 6d881f203f670..143d0f0c01c75 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -55,7 +55,7 @@ tempfile = "3" tikv-jemallocator = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio" } tokio-postgres = "0.7" -tokio-stream = "0.1" +tokio-stream = { workspace = true } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/tests/state_cleaning_test/Cargo.toml b/src/tests/state_cleaning_test/Cargo.toml index a105360a68f6b..6c12898343951 100644 --- a/src/tests/state_cleaning_test/Cargo.toml +++ b/src/tests/state_cleaning_test/Cargo.toml @@ -24,7 +24,7 @@ serde = { version = "1", features = ["derive"] } serde_with = "3" tokio = { version = "0.2", package = "madsim-tokio" } tokio-postgres = "0.7" -tokio-stream = { version = "0.1", features = ["fs"] } +tokio-stream = { workspace = true } toml = "0.8" tracing = "0.1" diff --git a/src/utils/runtime/Cargo.toml b/src/utils/runtime/Cargo.toml index 0815a005df5b5..ff2902e7a4b4c 100644 --- a/src/utils/runtime/Cargo.toml +++ b/src/utils/runtime/Cargo.toml @@ -17,7 +17,7 @@ normal = ["workspace-hack"] [dependencies] await-tree = { workspace = true } console = "0.15" -console-subscriber = "0.3.0" +console-subscriber = "0.4" either = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } hostname = "0.4"