From 22b04a4d44ee4917678b333c8753abbad07b4134 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Jun 2024 18:21:08 +0800 Subject: [PATCH 1/2] refactor: Upgrade opendal to v0.47.0 for zero cost bytes streaming Signed-off-by: Xuanwo --- quickwit/Cargo.lock | 215 ++++++++++++---- quickwit/Cargo.toml | 233 +++++++++--------- .../src/opendal_storage/base.rs | 44 +++- 3 files changed, 322 insertions(+), 170 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 32dc1f46d11..6e2838ac5be 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -670,12 +670,12 @@ dependencies = [ "http-body 0.4.6", "http-body 1.0.0", "hyper 0.14.28", - "hyper-rustls", + "hyper-rustls 0.24.2", "indexmap 2.1.0", "once_cell", "pin-project-lite", "pin-utils", - "rustls", + "rustls 0.21.12", "serde", "serde_json", "tokio", @@ -849,7 +849,7 @@ dependencies = [ "pin-project", "quick-xml 0.29.0", "rand 0.8.5", - "reqwest", + "reqwest 0.11.27", "rustc_version", "serde", "serde_json", @@ -2676,7 +2676,7 @@ dependencies = [ "google-cloud-token", "home", "jsonwebtoken 8.3.0", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "thiserror", @@ -2719,7 +2719,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96e4ad0802d3f416f62e7ce01ac1460898ee0efc98f8b45cd4aab7611607012f" dependencies = [ - "reqwest", + "reqwest 0.11.27", "thiserror", "tokio", ] @@ -3106,10 +3106,27 @@ dependencies = [ "http 0.2.12", "hyper 0.14.28", "log", - "rustls", + "rustls 0.21.12", "rustls-native-certs", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.3.1", + "hyper-util", + "rustls 0.22.4", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.25.0", + "tower-service", ] [[package]] @@ -4387,7 +4404,7 @@ dependencies = [ "getrandom 0.2.14", "http 0.2.12", "rand 0.8.5", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "serde_path_to_error", @@ -4465,27 +4482,27 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "opendal" -version = "0.44.2" +version = "0.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4af824652d4d2ffabf606d337a071677ae621b05622adf35df9562f69d9b4498" +checksum = "5c3ba698f2258bebdf7a3a38862bb6ef1f96d351627002686dacc228f805bdd6" dependencies = [ "anyhow", "async-trait", "backon", - "base64 0.21.7", + "base64 0.22.0", "bytes", "chrono", "flagset", "futures", "getrandom 0.2.14", - "http 0.2.12", + "http 1.1.0", "log", "md-5", "once_cell", "percent-encoding", - "quick-xml 0.30.0", + "quick-xml 0.31.0", "reqsign", - "reqwest", + "reqwest 0.12.4", "serde", "serde_json", "tokio", @@ -4593,7 +4610,7 @@ dependencies = [ "bytes", "http 0.2.12", "opentelemetry_api", - "reqwest", + "reqwest 0.11.27", ] [[package]] @@ -4611,7 +4628,7 @@ dependencies = [ "opentelemetry_api", "opentelemetry_sdk", "prost", - "reqwest", + "reqwest 0.11.27", "thiserror", "tokio", "tonic", @@ -5619,9 +5636,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" dependencies = [ "memchr", "serde", @@ -5659,7 +5676,7 @@ dependencies = [ "aws-types", "futures", "hyper 0.14.28", - "hyper-rustls", + "hyper-rustls 0.24.2", "quickwit-common", "tokio", ] @@ -5700,7 +5717,7 @@ dependencies = [ "quickwit-serve", "quickwit-storage", "quickwit-telemetry", - "reqwest", + "reqwest 0.12.4", "serde_json", "tabled", "tempfile", @@ -6013,7 +6030,7 @@ dependencies = [ "quickwit-storage", "rand 0.8.5", "rdkafka", - "reqwest", + "reqwest 0.12.4", "serde", "serde_json", "tantivy", @@ -6080,7 +6097,7 @@ dependencies = [ "quickwit-rest-client", "quickwit-serve", "quickwit-storage", - "reqwest", + "reqwest 0.12.4", "serde_json", "tempfile", "tokio", @@ -6184,7 +6201,7 @@ dependencies = [ "quickwit-storage", "quickwit-telemetry", "rand 0.8.5", - "reqwest", + "reqwest 0.12.4", "serde", "serde_json", "time", @@ -6345,7 +6362,7 @@ dependencies = [ "quickwit-metastore", "quickwit-search", "quickwit-serve", - "reqwest", + "reqwest 0.12.4", "serde", "serde_json", "thiserror", @@ -6498,7 +6515,7 @@ dependencies = [ "rand 0.8.5", "regex", "reqsign", - "reqwest", + "reqwest 0.12.4", "serde", "serde_json", "tantivy", @@ -6522,7 +6539,7 @@ dependencies = [ "md5", "once_cell", "quickwit-common", - "reqwest", + "reqwest 0.12.4", "serde", "serde_json", "tokio", @@ -6777,25 +6794,25 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.14.9" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5" +checksum = "70fe66d4cd0b5ed9b1abbfe639bf6baeaaf509f7da2d51b31111ba945be59286" dependencies = [ "anyhow", "async-trait", - "base64 0.21.7", + "base64 0.22.0", "chrono", "form_urlencoded", "getrandom 0.2.14", "hex", "hmac", "home", - "http 0.2.12", + "http 1.1.0", "jsonwebtoken 9.3.0", "log", "percent-encoding", "rand 0.8.5", - "reqwest", + "reqwest 0.12.4", "rsa", "serde", "serde_json", @@ -6818,7 +6835,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", - "hyper-rustls", + "hyper-rustls 0.24.2", "hyper-tls", "ipnet", "js-sys", @@ -6828,9 +6845,9 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", + "rustls 0.21.12", "rustls-native-certs", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -6838,7 +6855,7 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.24.1", "tokio-util", "tower-service", "url", @@ -6847,7 +6864,50 @@ dependencies = [ "wasm-streams", "web-sys", "webpki-roots 0.25.4", - "winreg", + "winreg 0.50.0", +] + +[[package]] +name = "reqwest" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +dependencies = [ + "base64 0.22.0", + "bytes", + "futures-core", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-rustls 0.26.0", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls 0.22.4", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls 0.25.0", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "webpki-roots 0.26.2", + "winreg 0.52.0", ] [[package]] @@ -7094,6 +7154,20 @@ dependencies = [ "sct", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki 0.102.4", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -7101,7 +7175,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -7115,6 +7189,22 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.0", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" + [[package]] name = "rustls-webpki" version = "0.100.3" @@ -7135,6 +7225,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" +dependencies = [ + "ring 0.17.8", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.15" @@ -7804,8 +7905,8 @@ dependencies = [ "once_cell", "paste", "percent-encoding", - "rustls", - "rustls-pemfile", + "rustls 0.21.12", + "rustls-pemfile 1.0.4", "serde", "serde_json", "sha2", @@ -8568,7 +8669,18 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.12", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", "tokio", ] @@ -8681,9 +8793,9 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tokio-stream", "tower", "tower-layer", @@ -9464,6 +9576,15 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c452ad30530b54a4d8e71952716a212b08efd0f3562baa66c29a618b07da7c3" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -9796,6 +9917,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winreg" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wiremock" version = "0.5.22" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index cf2779e30ca..66aca27b767 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -1,72 +1,71 @@ [workspace] resolver = "2" members = [ - "quickwit-actors", - "quickwit-aws", - "quickwit-cli", - "quickwit-cluster", - "quickwit-codegen", - "quickwit-codegen/example", - "quickwit-common", - "quickwit-config", - "quickwit-control-plane", - "quickwit-index-management", - "quickwit-datetime", - "quickwit-directories", - "quickwit-doc-mapper", - "quickwit-indexing", - "quickwit-ingest", - "quickwit-integration-tests", - "quickwit-jaeger", - "quickwit-janitor", - "quickwit-lambda", - "quickwit-macros", - "quickwit-metastore", - - # Disabling metastore-utils from the quickwit projects to ease build/deps. - # We can reenable it when we need it. - # "quickwit-metastore-utils", - "quickwit-opentelemetry", - "quickwit-proto", - "quickwit-query", - "quickwit-rest-client", - "quickwit-search", - "quickwit-serve", - "quickwit-storage", - "quickwit-telemetry", + "quickwit-actors", + "quickwit-aws", + "quickwit-cli", + "quickwit-cluster", + "quickwit-codegen", + "quickwit-codegen/example", + "quickwit-common", + "quickwit-config", + "quickwit-control-plane", + "quickwit-index-management", + "quickwit-datetime", + "quickwit-directories", + "quickwit-doc-mapper", + "quickwit-indexing", + "quickwit-ingest", + "quickwit-integration-tests", + "quickwit-jaeger", + "quickwit-janitor", + "quickwit-lambda", + "quickwit-macros", + "quickwit-metastore", + # Disabling metastore-utils from the quickwit projects to ease build/deps. + # We can reenable it when we need it. + # "quickwit-metastore-utils", + "quickwit-opentelemetry", + "quickwit-proto", + "quickwit-query", + "quickwit-rest-client", + "quickwit-search", + "quickwit-serve", + "quickwit-storage", + "quickwit-telemetry", ] # The following list excludes `quickwit-metastore-utils` and `quickwit-lambda` # from the default member to ease build/deps. default-members = [ - "quickwit-actors", - "quickwit-aws", - "quickwit-cli", - "quickwit-cluster", - "quickwit-codegen", - "quickwit-codegen/example", - "quickwit-common", - "quickwit-config", - "quickwit-control-plane", - "quickwit-datetime", - "quickwit-directories", - "quickwit-doc-mapper", - "quickwit-index-management", - "quickwit-indexing", - "quickwit-ingest", - "quickwit-integration-tests", - "quickwit-jaeger", - "quickwit-janitor", - "quickwit-macros", - "quickwit-metastore", - "quickwit-opentelemetry", - "quickwit-proto", - "quickwit-query", - "quickwit-rest-client", - "quickwit-search", - "quickwit-serve", - "quickwit-storage", - "quickwit-telemetry", + "quickwit-actors", + "quickwit-aws", + "quickwit-cli", + "quickwit-cluster", + "quickwit-codegen", + "quickwit-codegen/example", + "quickwit-common", + "quickwit-config", + "quickwit-control-plane", + "quickwit-datetime", + "quickwit-directories", + "quickwit-doc-mapper", + "quickwit-index-management", + "quickwit-indexing", + "quickwit-ingest", + "quickwit-integration-tests", + "quickwit-jaeger", + "quickwit-janitor", + "quickwit-macros", + "quickwit-metastore", + "quickwit-opentelemetry", + "quickwit-proto", + "quickwit-query", + "quickwit-rest-client", + "quickwit-search", + "quickwit-serve", + "quickwit-storage", + "quickwit-telemetry", ] [workspace.package] @@ -91,8 +90,8 @@ bytesize = { version = "1.3.0", features = ["serde"] } bytestring = "1.3.0" chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "d039699" } chrono = { version = "0.4", default-features = false, features = [ - "clock", - "std", + "clock", + "std", ] } clap = { version = "4.5.0", features = ["env", "string"] } coarsetime = "0.1.33" @@ -124,12 +123,12 @@ http = "0.2.9" http-serde = "1.1.2" humantime = "2.1.0" hyper = { version = "0.14", features = [ - "client", - "http1", - "http2", - "server", - "stream", - "tcp", + "client", + "http1", + "http2", + "server", + "stream", + "tcp", ] } hyper-rustls = "0.24" indexmap = { version = "2.1.0", features = ["serde"] } @@ -141,12 +140,12 @@ lru = "0.12" lindera-core = "0.27.0" lindera-dictionary = "0.27.0" lindera-tokenizer = { version = "0.27.0", features = [ - "cc-cedict-compress", - "cc-cedict", - "ipadic-compress", - "ipadic", - "ko-dic-compress", - "ko-dic", + "cc-cedict-compress", + "cc-cedict", + "ipadic-compress", + "ipadic", + "ko-dic-compress", + "ko-dic", ] } matches = "0.1.9" md5 = "0.7" @@ -167,7 +166,7 @@ percent-encoding = "2.3.1" pin-project = "1.1.0" pnet = { version = "0.33.0", features = ["std"] } postcard = { version = "1.0.4", features = [ - "use-std", + "use-std", ], default-features = false } predicates = "3" prettyplease = "0.2.0" @@ -176,37 +175,37 @@ proc-macro2 = "1.0.50" prometheus = { version = "0.13", features = ["process"] } proptest = "1" prost = { version = "0.11.6", default-features = false, features = [ - "prost-derive", + "prost-derive", ] } prost-build = "0.11.6" prost-types = "0.11.6" pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04", default-features = false, features = [ - "auth-oauth2", - "compression", - "tokio-runtime", + "auth-oauth2", + "compression", + "tokio-runtime", ] } quote = "1.0.23" rand = "0.8" rand_distr = "0.4" rayon = "1.10" rdkafka = { version = "0.33", default-features = false, features = [ - "cmake-build", - "libz", - "ssl", - "tokio", - "zstd", + "cmake-build", + "libz", + "ssl", + "tokio", + "zstd", ] } regex = "1.10.0" regex-syntax = "0.8" -reqwest = { version = "0.11", default-features = false, features = [ - "json", - "rustls-tls", +reqwest = { version = "0.12", default-features = false, features = [ + "json", + "rustls-tls", ] } rust-embed = "6.8.1" sea-query = { version = "0.30" } sea-query-binder = { version = "0.5", features = [ - "runtime-tokio-rustls", - "sqlx-postgres", + "runtime-tokio-rustls", + "sqlx-postgres", ] } # ^1.0.184 due to serde-rs/serde#2538 serde = { version = "1.0.184", features = ["derive", "rc"] } @@ -217,10 +216,10 @@ serde_yaml = "0.9" siphasher = "0.3" smallvec = "1" sqlx = { version = "0.7", features = [ - "migrate", - "postgres", - "runtime-tokio-rustls", - "time", + "migrate", + "postgres", + "runtime-tokio-rustls", + "time", ] } syn = { version = "2.0.11", features = ["extra-traits", "full", "parsing"] } sync_wrapper = "0.1.2" @@ -239,23 +238,23 @@ toml = "0.7.6" tonic = { version = "0.9.0", features = ["gzip"] } tonic-build = "0.9.0" tower = { version = "0.4.13", features = [ - "balance", - "buffer", - "load", - "retry", - "util", + "balance", + "buffer", + "load", + "retry", + "util", ] } tower-http = { version = "0.4.0", features = [ - "compression-zstd", - "compression-gzip", - "cors", + "compression-zstd", + "compression-gzip", + "cors", ] } tracing = "0.1.37" tracing-opentelemetry = "0.20.0" tracing-subscriber = { version = "0.3.16", features = [ - "env-filter", - "std", - "time", + "env-filter", + "std", + "time", ] } ttl_cache = "0.5" typetag = "0.2" @@ -264,10 +263,10 @@ username = "0.2" utoipa = "4.2.0" uuid = { version = "1.8", features = ["v4", "serde"] } vrl = { version = "0.8.1", default-features = false, features = [ - "compiler", - "diagnostic", - "stdlib", - "value", + "compiler", + "diagnostic", + "stdlib", + "value", ] } warp = "0.3" whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416" } @@ -285,14 +284,14 @@ aws-types = "1.2" azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] } azure_storage = { version = "0.13.0", default-features = false, features = [ - "enable_reqwest_rustls", + "enable_reqwest_rustls", ] } azure_storage_blobs = { version = "0.13.0", default-features = false, features = [ - "enable_reqwest_rustls", + "enable_reqwest_rustls", ] } -opendal = { version = "0.44", default-features = false } -reqsign = { version = "0.14", default-features = false } +opendal = { version = "0.47", default-features = false } +reqsign = { version = "0.15", default-features = false } quickwit-actors = { path = "quickwit-actors" } quickwit-aws = { path = "quickwit-aws" } @@ -324,10 +323,10 @@ quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "08b9fc0", default-features = false, features = [ - "lz4-compression", - "mmap", - "quickwit", - "zstd-compression", + "lz4-compression", + "mmap", + "quickwit", + "zstd-compression", ] } # This is actually not used directly the goal is to fix the version diff --git a/quickwit/quickwit-storage/src/opendal_storage/base.rs b/quickwit/quickwit-storage/src/opendal_storage/base.rs index 7dd10b9bde4..25cfa05da11 100644 --- a/quickwit/quickwit-storage/src/opendal_storage/base.rs +++ b/quickwit/quickwit-storage/src/opendal_storage/base.rs @@ -23,9 +23,11 @@ use std::path::Path; use async_trait::async_trait; use bytesize::ByteSize; +use futures::SinkExt; use opendal::Operator; use quickwit_common::uri::Uri; use tokio::io::{AsyncRead, AsyncWriteExt}; +use tokio_util::compat::FuturesAsyncReadCompatExt; use crate::storage::SendableAsync; use crate::{ @@ -78,22 +80,36 @@ impl Storage for OpendalStorage { /// If the payload is small enough, we can call `op.write()` at once. async fn put(&self, path: &Path, payload: Box) -> StorageResult<()> { let path = path.as_os_str().to_string_lossy(); - let mut payload_reader = payload.byte_stream().await?.into_async_read(); + let mut payload_stream = payload.byte_stream().await?; - let mut storage_writer = self + let mut storage_sink = self .op .writer_with(&path) - .buffer(ByteSize::mb(8).as_u64() as usize) - .await?; - tokio::io::copy(&mut payload_reader, &mut storage_writer).await?; - storage_writer.close().await?; - + .chunk(ByteSize::mb(8).as_u64() as usize) + .await? + .into_bytes_sink(); + + // Use bytes stream and sink to avoid extra memory copy. + while let Some(bs) = payload_stream + .try_next() + .await + .map_err(|err| StorageErrorKind::Io.with_error(err))? + { + storage_sink.feed(bs).await?; + } + storage_sink.close().await?; Ok(()) } async fn copy_to(&self, path: &Path, output: &mut dyn SendableAsync) -> StorageResult<()> { let path = path.as_os_str().to_string_lossy(); - let mut storage_reader = self.op.reader(&path).await?; + let mut storage_reader = self + .op + .reader(&path) + .await? + .into_futures_async_read(..) + .await? + .compat(); tokio::io::copy(&mut storage_reader, output).await?; output.flush().await?; Ok(()) @@ -102,7 +118,7 @@ impl Storage for OpendalStorage { async fn get_slice(&self, path: &Path, range: Range) -> StorageResult { let path = path.as_os_str().to_string_lossy(); let range = range.start as u64..range.end as u64; - let storage_content = self.op.read_with(&path).range(range).await?; + let storage_content = self.op.read_with(&path).range(range).await?.to_vec(); Ok(OwnedBytes::new(storage_content)) } @@ -114,14 +130,20 @@ impl Storage for OpendalStorage { ) -> StorageResult> { let path = path.as_os_str().to_string_lossy(); let range = range.start as u64..range.end as u64; - let storage_reader = self.op.reader_with(&path).range(range).await?; + let storage_reader = self + .op + .reader_with(&path) + .await? + .into_futures_async_read(range) + .await? + .compat(); Ok(Box::new(storage_reader)) } async fn get_all(&self, path: &Path) -> StorageResult { let path = path.as_os_str().to_string_lossy(); - let storage_content = self.op.read(&path).await?; + let storage_content = self.op.read(&path).await?.to_vec(); Ok(OwnedBytes::new(storage_content)) } From f79917b900da93e13b2402a7b011fdb56a39337c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Jun 2024 18:24:37 +0800 Subject: [PATCH 2/2] Revert unrelated cargo toml changes Signed-off-by: Xuanwo --- quickwit/Cargo.toml | 227 ++++++++++++++++++++++---------------------- 1 file changed, 114 insertions(+), 113 deletions(-) diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 66aca27b767..e07a2dd3071 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -1,71 +1,72 @@ [workspace] resolver = "2" members = [ - "quickwit-actors", - "quickwit-aws", - "quickwit-cli", - "quickwit-cluster", - "quickwit-codegen", - "quickwit-codegen/example", - "quickwit-common", - "quickwit-config", - "quickwit-control-plane", - "quickwit-index-management", - "quickwit-datetime", - "quickwit-directories", - "quickwit-doc-mapper", - "quickwit-indexing", - "quickwit-ingest", - "quickwit-integration-tests", - "quickwit-jaeger", - "quickwit-janitor", - "quickwit-lambda", - "quickwit-macros", - "quickwit-metastore", - # Disabling metastore-utils from the quickwit projects to ease build/deps. - # We can reenable it when we need it. - # "quickwit-metastore-utils", - "quickwit-opentelemetry", - "quickwit-proto", - "quickwit-query", - "quickwit-rest-client", - "quickwit-search", - "quickwit-serve", - "quickwit-storage", - "quickwit-telemetry", + "quickwit-actors", + "quickwit-aws", + "quickwit-cli", + "quickwit-cluster", + "quickwit-codegen", + "quickwit-codegen/example", + "quickwit-common", + "quickwit-config", + "quickwit-control-plane", + "quickwit-index-management", + "quickwit-datetime", + "quickwit-directories", + "quickwit-doc-mapper", + "quickwit-indexing", + "quickwit-ingest", + "quickwit-integration-tests", + "quickwit-jaeger", + "quickwit-janitor", + "quickwit-lambda", + "quickwit-macros", + "quickwit-metastore", + + # Disabling metastore-utils from the quickwit projects to ease build/deps. + # We can reenable it when we need it. + # "quickwit-metastore-utils", + "quickwit-opentelemetry", + "quickwit-proto", + "quickwit-query", + "quickwit-rest-client", + "quickwit-search", + "quickwit-serve", + "quickwit-storage", + "quickwit-telemetry", ] # The following list excludes `quickwit-metastore-utils` and `quickwit-lambda` # from the default member to ease build/deps. default-members = [ - "quickwit-actors", - "quickwit-aws", - "quickwit-cli", - "quickwit-cluster", - "quickwit-codegen", - "quickwit-codegen/example", - "quickwit-common", - "quickwit-config", - "quickwit-control-plane", - "quickwit-datetime", - "quickwit-directories", - "quickwit-doc-mapper", - "quickwit-index-management", - "quickwit-indexing", - "quickwit-ingest", - "quickwit-integration-tests", - "quickwit-jaeger", - "quickwit-janitor", - "quickwit-macros", - "quickwit-metastore", - "quickwit-opentelemetry", - "quickwit-proto", - "quickwit-query", - "quickwit-rest-client", - "quickwit-search", - "quickwit-serve", - "quickwit-storage", - "quickwit-telemetry", + "quickwit-actors", + "quickwit-aws", + "quickwit-cli", + "quickwit-cluster", + "quickwit-codegen", + "quickwit-codegen/example", + "quickwit-common", + "quickwit-config", + "quickwit-control-plane", + "quickwit-datetime", + "quickwit-directories", + "quickwit-doc-mapper", + "quickwit-index-management", + "quickwit-indexing", + "quickwit-ingest", + "quickwit-integration-tests", + "quickwit-jaeger", + "quickwit-janitor", + "quickwit-macros", + "quickwit-metastore", + "quickwit-opentelemetry", + "quickwit-proto", + "quickwit-query", + "quickwit-rest-client", + "quickwit-search", + "quickwit-serve", + "quickwit-storage", + "quickwit-telemetry", ] [workspace.package] @@ -90,8 +91,8 @@ bytesize = { version = "1.3.0", features = ["serde"] } bytestring = "1.3.0" chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "d039699" } chrono = { version = "0.4", default-features = false, features = [ - "clock", - "std", + "clock", + "std", ] } clap = { version = "4.5.0", features = ["env", "string"] } coarsetime = "0.1.33" @@ -123,12 +124,12 @@ http = "0.2.9" http-serde = "1.1.2" humantime = "2.1.0" hyper = { version = "0.14", features = [ - "client", - "http1", - "http2", - "server", - "stream", - "tcp", + "client", + "http1", + "http2", + "server", + "stream", + "tcp", ] } hyper-rustls = "0.24" indexmap = { version = "2.1.0", features = ["serde"] } @@ -140,12 +141,12 @@ lru = "0.12" lindera-core = "0.27.0" lindera-dictionary = "0.27.0" lindera-tokenizer = { version = "0.27.0", features = [ - "cc-cedict-compress", - "cc-cedict", - "ipadic-compress", - "ipadic", - "ko-dic-compress", - "ko-dic", + "cc-cedict-compress", + "cc-cedict", + "ipadic-compress", + "ipadic", + "ko-dic-compress", + "ko-dic", ] } matches = "0.1.9" md5 = "0.7" @@ -166,7 +167,7 @@ percent-encoding = "2.3.1" pin-project = "1.1.0" pnet = { version = "0.33.0", features = ["std"] } postcard = { version = "1.0.4", features = [ - "use-std", + "use-std", ], default-features = false } predicates = "3" prettyplease = "0.2.0" @@ -175,37 +176,37 @@ proc-macro2 = "1.0.50" prometheus = { version = "0.13", features = ["process"] } proptest = "1" prost = { version = "0.11.6", default-features = false, features = [ - "prost-derive", + "prost-derive", ] } prost-build = "0.11.6" prost-types = "0.11.6" pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04", default-features = false, features = [ - "auth-oauth2", - "compression", - "tokio-runtime", + "auth-oauth2", + "compression", + "tokio-runtime", ] } quote = "1.0.23" rand = "0.8" rand_distr = "0.4" rayon = "1.10" rdkafka = { version = "0.33", default-features = false, features = [ - "cmake-build", - "libz", - "ssl", - "tokio", - "zstd", + "cmake-build", + "libz", + "ssl", + "tokio", + "zstd", ] } regex = "1.10.0" regex-syntax = "0.8" reqwest = { version = "0.12", default-features = false, features = [ - "json", - "rustls-tls", + "json", + "rustls-tls", ] } rust-embed = "6.8.1" sea-query = { version = "0.30" } sea-query-binder = { version = "0.5", features = [ - "runtime-tokio-rustls", - "sqlx-postgres", + "runtime-tokio-rustls", + "sqlx-postgres", ] } # ^1.0.184 due to serde-rs/serde#2538 serde = { version = "1.0.184", features = ["derive", "rc"] } @@ -216,10 +217,10 @@ serde_yaml = "0.9" siphasher = "0.3" smallvec = "1" sqlx = { version = "0.7", features = [ - "migrate", - "postgres", - "runtime-tokio-rustls", - "time", + "migrate", + "postgres", + "runtime-tokio-rustls", + "time", ] } syn = { version = "2.0.11", features = ["extra-traits", "full", "parsing"] } sync_wrapper = "0.1.2" @@ -238,23 +239,23 @@ toml = "0.7.6" tonic = { version = "0.9.0", features = ["gzip"] } tonic-build = "0.9.0" tower = { version = "0.4.13", features = [ - "balance", - "buffer", - "load", - "retry", - "util", + "balance", + "buffer", + "load", + "retry", + "util", ] } tower-http = { version = "0.4.0", features = [ - "compression-zstd", - "compression-gzip", - "cors", + "compression-zstd", + "compression-gzip", + "cors", ] } tracing = "0.1.37" tracing-opentelemetry = "0.20.0" tracing-subscriber = { version = "0.3.16", features = [ - "env-filter", - "std", - "time", + "env-filter", + "std", + "time", ] } ttl_cache = "0.5" typetag = "0.2" @@ -263,10 +264,10 @@ username = "0.2" utoipa = "4.2.0" uuid = { version = "1.8", features = ["v4", "serde"] } vrl = { version = "0.8.1", default-features = false, features = [ - "compiler", - "diagnostic", - "stdlib", - "value", + "compiler", + "diagnostic", + "stdlib", + "value", ] } warp = "0.3" whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416" } @@ -284,10 +285,10 @@ aws-types = "1.2" azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] } azure_storage = { version = "0.13.0", default-features = false, features = [ - "enable_reqwest_rustls", + "enable_reqwest_rustls", ] } azure_storage_blobs = { version = "0.13.0", default-features = false, features = [ - "enable_reqwest_rustls", + "enable_reqwest_rustls", ] } opendal = { version = "0.47", default-features = false } @@ -323,10 +324,10 @@ quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "08b9fc0", default-features = false, features = [ - "lz4-compression", - "mmap", - "quickwit", - "zstd-compression", + "lz4-compression", + "mmap", + "quickwit", + "zstd-compression", ] } # This is actually not used directly the goal is to fix the version