From aec19b97cdb3b28b88ee3ed592caaf111f5329e4 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Fri, 22 Mar 2024 10:19:36 -0400 Subject: [PATCH] Track bytes in-flight in ingester `persist` and `replicate` methods --- quickwit/Cargo.lock | 178 +++++++++--------- quickwit/quickwit-common/src/metrics.rs | 4 + .../quickwit-ingest/src/ingest_v2/ingester.rs | 14 ++ quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 8 +- .../src/ingest_v2/replication.rs | 5 + .../quickwit-ingest/src/ingest_v2/router.rs | 1 + .../quickwit-proto/src/ingest/ingester.rs | 10 + quickwit/quickwit-proto/src/ingest/mod.rs | 2 +- .../src/elasticsearch_api/bulk_v2.rs | 6 +- 9 files changed, 135 insertions(+), 93 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 937f9a90735..43d2cc070a3 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -80,9 +80,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" dependencies = [ "memchr", ] @@ -302,18 +302,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] name = "async-trait" -version = "0.1.77" +version = "0.1.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -898,9 +898,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +checksum = "95d8e92cac0961e91dbd517496b00f7e9b92363dbe6d42c3198268323798860c" dependencies = [ "addr2line", "cc", @@ -983,9 +983,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.2" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" dependencies = [ "serde", ] @@ -1049,7 +1049,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "syn_derive", ] @@ -1344,9 +1344,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.2" +version = "4.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651" +checksum = "949626d00e063efc93b6dca932419ceb5432f99769911c0b995f7e884c778813" dependencies = [ "clap_builder", ] @@ -1380,13 +1380,12 @@ dependencies = [ [[package]] name = "coarsetime" -version = "0.1.33" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71367d3385c716342014ad17e3d19f7788ae514885a1f4c24f500260fb365e1a" +checksum = "13b3839cf01bb7960114be3ccf2340f541b6d0c81f8690b007b2b39f750f7e5d" dependencies = [ "libc", - "once_cell", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasix", "wasm-bindgen", ] @@ -1769,7 +1768,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1791,7 +1790,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core 0.20.8", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -2142,7 +2141,7 @@ checksum = "03cdc46ec28bd728e67540c528013c6a10eb69a02eb31078a1bda695438cbfb8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -2347,7 +2346,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57b1e34e369d7f0151309821497440bd0266b86c77ccd69717c3b67e5eaeffe4" dependencies = [ - "rustix 0.38.31", + "rustix 0.38.32", "windows-sys 0.52.0", ] @@ -2439,7 +2438,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -2717,9 +2716,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" +checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb" dependencies = [ "bytes", "fnv", @@ -3208,7 +3207,7 @@ checksum = "0122b7114117e64a63ac49f752a5ca4624d534c7b1c7de796ac196381cd2d947" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -3349,9 +3348,9 @@ dependencies = [ [[package]] name = "jsonwebtoken" -version = "9.2.0" +version = "9.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4" +checksum = "b9ae10193d25051e74945f1ea2d0b42e03cc3b890f7e4cc5faa44997d808193f" dependencies = [ "base64 0.21.7", "js-sys", @@ -3517,7 +3516,7 @@ version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "libc", "redox_syscall", ] @@ -3535,9 +3534,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.15" +version = "1.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "037731f5d3aaa87a5675e895b63ddff1a87624bc29f77004ea829809654e48f6" +checksum = "5e143b5e666b2695d28f6bca6497720813f699c9602dd7f5cac91008b8ada7f9" dependencies = [ "cc", "libc", @@ -4064,9 +4063,9 @@ dependencies = [ [[package]] name = "new_debug_unreachable" -version = "1.0.4" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" +checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" [[package]] name = "new_string_template" @@ -4223,7 +4222,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -4400,7 +4399,7 @@ version = "0.10.64" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "cfg-if", "foreign-types", "libc", @@ -4417,7 +4416,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -4611,7 +4610,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -4795,7 +4794,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -4883,7 +4882,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -5178,7 +5177,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5" dependencies = [ "proc-macro2", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -5241,9 +5240,9 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "version_check", - "yansi 1.0.0", + "yansi 1.0.1", ] [[package]] @@ -5284,7 +5283,7 @@ checksum = "31b476131c3c86cb68032fdc5cb6d5a1045e3e42d96b69fa599fd77701e1f5bf" dependencies = [ "bit-set", "bit-vec", - "bitflags 2.4.2", + "bitflags 2.5.0", "lazy_static", "num-traits", "rand 0.8.5", @@ -5593,7 +5592,7 @@ dependencies = [ "prost-build", "quote", "serde", - "syn 2.0.52", + "syn 2.0.53", "tonic-build", ] @@ -6092,7 +6091,7 @@ version = "0.8.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -6691,7 +6690,7 @@ dependencies = [ "hmac", "home", "http 0.2.12", - "jsonwebtoken 9.2.0", + "jsonwebtoken 9.3.0", "log", "percent-encoding", "rand 0.8.5", @@ -6705,9 +6704,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.25" +version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0eea5a9eb898d3783f17c6407670e3592fd174cb81a10e51d4c37f49450b9946" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ "base64 0.21.7", "bytes", @@ -6874,7 +6873,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.52", + "syn 2.0.53", "walkdir", ] @@ -6951,11 +6950,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.31" +version = "0.38.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "errno", "libc", "linux-raw-sys 0.4.13", @@ -7152,7 +7151,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "thiserror", ] @@ -7218,7 +7217,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -7356,7 +7355,7 @@ dependencies = [ "darling 0.20.8", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -7406,7 +7405,7 @@ checksum = "b93fb4adc70021ac1b47f7d45e8cc4169baaa7ea58483bc5b721d19a26202212" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -7540,9 +7539,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "snafu" @@ -7721,7 +7720,7 @@ checksum = "1ed31390216d20e538e447a7a9b959e06ed9fc51c37b514b46eb758016ecd418" dependencies = [ "atoi", "base64 0.21.7", - "bitflags 2.4.2", + "bitflags 2.5.0", "byteorder", "bytes", "crc", @@ -7764,7 +7763,7 @@ checksum = "7c824eb80b894f926f89a0b9da0c7f435d27cdd35b8c655b114e58223918577e" dependencies = [ "atoi", "base64 0.21.7", - "bitflags 2.4.2", + "bitflags 2.5.0", "byteorder", "crc", "dotenvy", @@ -7895,9 +7894,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.52" +version = "2.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" +checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" dependencies = [ "proc-macro2", "quote", @@ -7913,7 +7912,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -7934,20 +7933,20 @@ dependencies = [ [[package]] name = "system-configuration" -version = "0.6.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ - "bitflags 2.4.2", + "bitflags 1.3.2", "core-foundation", "system-configuration-sys", ] [[package]] name = "system-configuration-sys" -version = "0.6.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" dependencies = [ "core-foundation-sys", "libc", @@ -8140,7 +8139,7 @@ checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", "fastrand 2.0.1", - "rustix 0.38.31", + "rustix 0.38.32", "windows-sys 0.52.0", ] @@ -8187,7 +8186,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -8352,7 +8351,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -8399,9 +8398,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" dependencies = [ "futures-core", "pin-project-lite", @@ -8557,7 +8556,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ "async-compression", - "bitflags 2.4.2", + "bitflags 2.5.0", "bytes", "futures-core", "futures-util", @@ -8603,7 +8602,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -8745,7 +8744,7 @@ checksum = "ac73887f47b9312552aa90ef477927ff014d63d1920ca8037c6c1951eab64bb1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -8852,9 +8851,9 @@ dependencies = [ [[package]] name = "unsafe-libyaml" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" [[package]] name = "untrusted" @@ -8941,7 +8940,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -9167,6 +9166,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" +[[package]] +name = "wasix" +version = "0.12.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1fbb4ef9bbca0c1170e0b00dd28abc9e3b68669821600cad1caaed606583c6d" +dependencies = [ + "wasi 0.11.0+wasi-snapshot-preview1", +] + [[package]] name = "wasm-bindgen" version = "0.2.92" @@ -9188,7 +9196,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "wasm-bindgen-shared", ] @@ -9222,7 +9230,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -9300,7 +9308,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix 0.38.31", + "rustix 0.38.32", ] [[package]] @@ -9645,7 +9653,7 @@ checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" dependencies = [ "libc", "linux-raw-sys 0.4.13", - "rustix 0.38.31", + "rustix 0.38.32", ] [[package]] @@ -9677,9 +9685,9 @@ checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" [[package]] name = "yansi" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c2861d76f58ec8fc95708b9b1e417f7b12fd72ad33c01fa6886707092dea0d3" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" [[package]] name = "zerocopy" @@ -9698,7 +9706,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index c2950ff274b..7eed30ef197 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -254,6 +254,8 @@ impl Default for MemoryMetrics { pub struct InFlightDataGauges { pub rest_server: IntGauge, pub ingest_router: IntGauge, + pub ingester_persist: IntGauge, + pub ingester_replicate: IntGauge, pub wal: IntGauge, pub fetch_stream: IntGauge, pub multi_fetch_stream: IntGauge, @@ -275,6 +277,8 @@ impl Default for InFlightDataGauges { Self { rest_server: in_flight_gauge_vec.with_label_values(["rest_server"]), ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]), + ingester_persist: in_flight_gauge_vec.with_label_values(["ingester_persist"]), + ingester_replicate: in_flight_gauge_vec.with_label_values(["ingester_replicate"]), wal: in_flight_gauge_vec.with_label_values(["wal"]), fetch_stream: in_flight_gauge_vec.with_label_values(["fetch_stream"]), multi_fetch_stream: in_flight_gauge_vec.with_label_values(["multi_fetch_stream"]), diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 09fd0fa7f94..753f50571a2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -31,6 +31,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use mrecordlog::error::CreateQueueError; use quickwit_cluster::Cluster; +use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::pubsub::{EventBroker, EventSubscriber}; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; @@ -968,6 +969,19 @@ impl IngesterService for Ingester { &mut self, persist_request: PersistRequest, ) -> IngestV2Result { + // If the request is local, the amount of memory it occupies is already + // accounted for in the router. + let request_size_bytes = persist_request + .subrequests + .iter() + .flat_map(|subrequest| match &subrequest.doc_batch { + Some(doc_batch) if doc_batch.doc_buffer.is_unique() => Some(doc_batch.num_bytes()), + _ => None, + }) + .sum::(); + let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_persist); + gauge_guard.add(request_size_bytes as i64); + self.persist_inner(persist_request).await } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 0f57f1e8c31..227ff5c4ab6 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -236,7 +236,7 @@ mod tests { let doc_batch = doc_batch_builder.build().unwrap(); assert_eq!(doc_batch.num_docs(), 2); - assert_eq!(doc_batch.num_bytes(), 13); + assert_eq!(doc_batch.num_bytes(), 21); assert_eq!(doc_batch.doc_lengths, [7, 6]); assert_eq!(doc_batch.doc_buffer, Bytes::from(&b"Hello, World!"[..])); } @@ -278,7 +278,7 @@ mod tests { .as_ref() .unwrap() .num_bytes(), - 13 + 21 ); assert_eq!( ingest_request.subrequests[0] @@ -313,7 +313,7 @@ mod tests { .as_ref() .unwrap() .num_bytes(), - 12 + 20 ); assert_eq!( ingest_request.subrequests[1] @@ -345,6 +345,6 @@ mod tests { doc_buffer: vec![0u8; 100].into(), doc_lengths: vec![10, 20, 30], }; - assert_eq!(estimate_size(&doc_batch), ByteSize(106)); + assert_eq!(estimate_size(&doc_batch), ByteSize(118)); } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index e8265e1d4f4..9a8c64813f7 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant}; use bytesize::ByteSize; use futures::{Future, StreamExt}; use mrecordlog::error::CreateQueueError; +use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; use quickwit_common::{rate_limited_warn, ServiceStream}; use quickwit_proto::ingest::ingester::{ ack_replication_message, syn_replication_message, AckReplicationMessage, IngesterStatus, @@ -507,6 +508,10 @@ impl ReplicationTask { self.current_replication_seqno, replicate_request.replication_seqno ))); } + let request_size_bytes = replicate_request.num_bytes(); + let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_replicate); + gauge_guard.add(request_size_bytes as i64); + self.current_replication_seqno += 1; let commit_type = replicate_request.commit_type(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index fd0a0029a25..f65d9400638 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -444,6 +444,7 @@ impl IngestRouterService for IngestRouter { let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingest_router); gauge_guard.add(request_size_bytes as i64); + let _permit = self .ingest_semaphore .clone() diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs index fa817b93829..a3a76c45d43 100644 --- a/quickwit/quickwit-proto/src/ingest/ingester.rs +++ b/quickwit/quickwit-proto/src/ingest/ingester.rs @@ -221,6 +221,16 @@ impl AckReplicationMessage { } } +impl ReplicateRequest { + pub fn num_bytes(&self) -> usize { + self.subrequests + .iter() + .flat_map(|subrequest| &subrequest.doc_batch) + .map(|doc_batch| doc_batch.num_bytes()) + .sum() + } +} + impl ReplicateSubrequest { pub fn shard_id(&self) -> &ShardId { self.shard_id diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index 7d6c3c92551..5c9829467b7 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -106,7 +106,7 @@ impl DocBatchV2 { } pub fn num_bytes(&self) -> usize { - self.doc_buffer.len() + self.doc_buffer.len() + self.doc_lengths.len() * 4 } pub fn num_docs(&self) -> usize { diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs index d1fccfa33cf..bcfa433694f 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs @@ -156,12 +156,12 @@ mod tests { assert_eq!(subrequests[0].index_id, "my-index-1"); assert_eq!(subrequests[0].source_id, INGEST_V2_SOURCE_ID); assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_docs(), 2); - assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_bytes(), 96); + assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_bytes(), 104); assert_eq!(subrequests[1].index_id, "my-index-2"); assert_eq!(subrequests[1].source_id, INGEST_V2_SOURCE_ID); assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_docs(), 1); - assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 48); + assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 52); Ok(IngestResponseV2 { successes: vec![ @@ -238,7 +238,7 @@ mod tests { assert_eq!(subrequest_0.index_id, "my-index-1"); assert_eq!(subrequest_0.source_id, INGEST_V2_SOURCE_ID); assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_docs(), 1); - assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_bytes(), 48); + assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_bytes(), 52); Ok(IngestResponseV2 { successes: vec![IngestSuccess {