From 8d69afc196090fa4bcde2dc3a15145154e9c5919 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 10 May 2024 16:19:32 +0900 Subject: [PATCH] Added a thread pool for ingest decompression --- quickwit/Cargo.lock | 31 ++- quickwit/Cargo.toml | 230 +++++++++--------- quickwit/quickwit-common/Cargo.toml | 1 + quickwit/quickwit-common/src/lib.rs | 2 + quickwit/quickwit-common/src/metrics.rs | 22 +- quickwit/quickwit-common/src/stream_utils.rs | 2 +- quickwit/quickwit-search/src/fetch_docs.rs | 6 +- quickwit/quickwit-search/src/leaf.rs | 4 +- quickwit/quickwit-search/src/lib.rs | 10 +- quickwit/quickwit-search/src/metrics.rs | 9 +- quickwit/quickwit-search/src/root.rs | 2 +- .../quickwit-search/src/search_stream/leaf.rs | 2 +- quickwit/quickwit-search/src/thread_pool.rs | 131 ---------- 13 files changed, 168 insertions(+), 284 deletions(-) delete mode 100644 quickwit/quickwit-search/src/thread_pool.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 6d96cd2a2f4..0b0e6abed17 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -4342,6 +4342,14 @@ dependencies = [ "loom", ] +[[package]] +name = "oneshot" +version = "0.1.6" +source = "git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba#c10a3ba32adc189acf68acd579ba9755075ecb4d" +dependencies = [ + "loom", +] + [[package]] name = "onig" version = "6.4.0" @@ -4667,7 +4675,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "ownedbytes" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" dependencies = [ "stable_deref_trait", ] @@ -5690,6 +5698,7 @@ dependencies = [ "prometheus", "proptest", "rand 0.8.5", + "rayon", "regex", "serde", "serde_json", @@ -5874,7 +5883,7 @@ dependencies = [ "libz-sys", "mockall", "once_cell", - "oneshot", + "oneshot 0.1.6 
(registry+https://github.com/rust-lang/crates.io-index)", "openssl", "proptest", "prost", @@ -7999,7 +8008,7 @@ dependencies = [ [[package]] name = "tantivy" version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" dependencies = [ "aho-corasick", "arc-swap", @@ -8023,7 +8032,7 @@ dependencies = [ "measure_time", "memmap2", "once_cell", - "oneshot", + "oneshot 0.1.6 (git+https://github.com/fulmicoton/oneshot.git?rev=c10a3ba)", "rayon", "regex", "rust-stemmers", @@ -8051,7 +8060,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" dependencies = [ "bitpacking", ] @@ -8059,7 +8068,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" dependencies = [ "downcast-rs", "fastdivide", @@ -8074,7 +8083,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" dependencies = [ "async-trait", "byteorder", @@ -8097,7 +8106,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = 
"git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" dependencies = [ "nom", ] @@ -8105,7 +8114,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" dependencies = [ "tantivy-bitpacker", "tantivy-common", @@ -8116,7 +8125,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" dependencies = [ "murmurhash32", "rand_distr", @@ -8126,7 +8135,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 616c19c6545..e763ce8770e 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -1,72 +1,72 @@ [workspace] resolver = "2" members = [ - "quickwit-actors", - "quickwit-aws", - "quickwit-cli", - "quickwit-cluster", - "quickwit-codegen", - "quickwit-codegen/example", - "quickwit-common", - "quickwit-config", - "quickwit-control-plane", - "quickwit-index-management", - "quickwit-datetime", - "quickwit-directories", - "quickwit-doc-mapper", - "quickwit-indexing", - "quickwit-ingest", - "quickwit-integration-tests", - "quickwit-jaeger", - "quickwit-janitor", - "quickwit-lambda", - "quickwit-macros", - "quickwit-metastore", + "quickwit-actors", + "quickwit-aws", + "quickwit-cli", + "quickwit-cluster", + 
"quickwit-codegen", + "quickwit-codegen/example", + "quickwit-common", + "quickwit-config", + "quickwit-control-plane", + "quickwit-index-management", + "quickwit-datetime", + "quickwit-directories", + "quickwit-doc-mapper", + "quickwit-indexing", + "quickwit-ingest", + "quickwit-integration-tests", + "quickwit-jaeger", + "quickwit-janitor", + "quickwit-lambda", + "quickwit-macros", + "quickwit-metastore", - # Disabling metastore-utils from the quickwit projects to ease build/deps. - # We can reenable it when we need it. - # "quickwit-metastore-utils", - "quickwit-opentelemetry", - "quickwit-proto", - "quickwit-query", - "quickwit-rest-client", - "quickwit-search", - "quickwit-serve", - "quickwit-storage", - "quickwit-telemetry", + # Disabling metastore-utils from the quickwit projects to ease build/deps. + # We can reenable it when we need it. + # "quickwit-metastore-utils", + "quickwit-opentelemetry", + "quickwit-proto", + "quickwit-query", + "quickwit-rest-client", + "quickwit-search", + "quickwit-serve", + "quickwit-storage", + "quickwit-telemetry", ] # The following list excludes `quickwit-metastore-utils` and `quickwit-lambda` # from the default member to ease build/deps. 
default-members = [ - "quickwit-actors", - "quickwit-aws", - "quickwit-cli", - "quickwit-cluster", - "quickwit-codegen", - "quickwit-codegen/example", - "quickwit-common", - "quickwit-config", - "quickwit-control-plane", - "quickwit-datetime", - "quickwit-directories", - "quickwit-doc-mapper", - "quickwit-index-management", - "quickwit-indexing", - "quickwit-ingest", - "quickwit-integration-tests", - "quickwit-jaeger", - "quickwit-janitor", - "quickwit-macros", - "quickwit-metastore", - "quickwit-opentelemetry", - "quickwit-proto", - "quickwit-query", - "quickwit-rest-client", - "quickwit-search", - "quickwit-serve", - "quickwit-storage", - "quickwit-telemetry", + "quickwit-actors", + "quickwit-aws", + "quickwit-cli", + "quickwit-cluster", + "quickwit-codegen", + "quickwit-codegen/example", + "quickwit-common", + "quickwit-config", + "quickwit-control-plane", + "quickwit-datetime", + "quickwit-directories", + "quickwit-doc-mapper", + "quickwit-index-management", + "quickwit-indexing", + "quickwit-ingest", + "quickwit-integration-tests", + "quickwit-jaeger", + "quickwit-janitor", + "quickwit-macros", + "quickwit-metastore", + "quickwit-opentelemetry", + "quickwit-proto", + "quickwit-query", + "quickwit-rest-client", + "quickwit-search", + "quickwit-serve", + "quickwit-storage", + "quickwit-telemetry", ] [workspace.package] @@ -91,8 +91,8 @@ bytesize = { version = "1.3.0", features = ["serde"] } bytestring = "1.3.0" chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "d039699" } chrono = { version = "0.4", default-features = false, features = [ - "clock", - "std", + "clock", + "std", ] } clap = { version = "4.5.0", features = ["env", "string"] } coarsetime = "0.1.33" @@ -124,12 +124,12 @@ http = "0.2.9" http-serde = "1.1.2" humantime = "2.1.0" hyper = { version = "0.14", features = [ - "client", - "http1", - "http2", - "server", - "stream", - "tcp", + "client", + "http1", + "http2", + "server", + "stream", + "tcp", ] } hyper-rustls = "0.24" 
indexmap = { version = "2.1.0", features = ["serde"] } @@ -141,12 +141,12 @@ lru = "0.12" lindera-core = "0.27.0" lindera-dictionary = "0.27.0" lindera-tokenizer = { version = "0.27.0", features = [ - "cc-cedict-compress", - "cc-cedict", - "ipadic-compress", - "ipadic", - "ko-dic-compress", - "ko-dic", + "cc-cedict-compress", + "cc-cedict", + "ipadic-compress", + "ipadic", + "ko-dic-compress", + "ko-dic", ] } matches = "0.1.9" md5 = "0.7" @@ -167,7 +167,7 @@ percent-encoding = "2.3.1" pin-project = "1.1.0" pnet = { version = "0.33.0", features = ["std"] } postcard = { version = "1.0.4", features = [ - "use-std", + "use-std", ], default-features = false } predicates = "3" prettyplease = "0.2.0" @@ -175,37 +175,37 @@ proc-macro2 = "1.0.50" prometheus = { version = "0.13", features = ["process"] } proptest = "1" prost = { version = "0.11.6", default-features = false, features = [ - "prost-derive", + "prost-derive", ] } prost-build = "0.11.6" prost-types = "0.11.6" pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04", default-features = false, features = [ - "auth-oauth2", - "compression", - "tokio-runtime", + "auth-oauth2", + "compression", + "tokio-runtime", ] } quote = "1.0.23" rand = "0.8" rand_distr = "0.4" rayon = "1" rdkafka = { version = "0.33", default-features = false, features = [ - "cmake-build", - "libz", - "ssl", - "tokio", - "zstd", + "cmake-build", + "libz", + "ssl", + "tokio", + "zstd", ] } regex = "1.10.0" regex-syntax = "0.8" reqwest = { version = "0.11", default-features = false, features = [ - "json", - "rustls-tls", + "json", + "rustls-tls", ] } rust-embed = "6.8.1" sea-query = { version = "0.30" } sea-query-binder = { version = "0.5", features = [ - "runtime-tokio-rustls", - "sqlx-postgres", + "runtime-tokio-rustls", + "sqlx-postgres", ] } # ^1.0.184 due to serde-rs/serde#2538 serde = { version = "1.0.184", features = ["derive", "rc"] } @@ -216,10 +216,10 @@ serde_yaml = "0.9" siphasher = "0.3" smallvec = "1" sqlx = { 
version = "0.7", features = [ - "migrate", - "postgres", - "runtime-tokio-rustls", - "time", + "migrate", + "postgres", + "runtime-tokio-rustls", + "time", ] } syn = { version = "2.0.11", features = ["extra-traits", "full", "parsing"] } sync_wrapper = "0.1.2" @@ -237,19 +237,19 @@ toml = "0.7.6" tonic = { version = "0.9.0", features = ["gzip"] } tonic-build = "0.9.0" tower = { version = "0.4.13", features = [ - "balance", - "buffer", - "load", - "retry", - "util", + "balance", + "buffer", + "load", + "retry", + "util", ] } tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] } tracing = "0.1.37" tracing-opentelemetry = "0.20.0" tracing-subscriber = { version = "0.3.16", features = [ - "env-filter", - "std", - "time", + "env-filter", + "std", + "time", ] } ttl_cache = "0.5" typetag = "0.2" @@ -258,10 +258,10 @@ username = "0.2" utoipa = "4.2.0" uuid = { version = "1.8", features = ["v4", "serde"] } vrl = { version = "0.8.1", default-features = false, features = [ - "compiler", - "diagnostic", - "stdlib", - "value", + "compiler", + "diagnostic", + "stdlib", + "value", ] } warp = "0.3" whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416" } @@ -269,24 +269,20 @@ wiremock = "0.5" zstd = "0.13.0" aws-config = "1.2" -aws-credential-types = { version = "1.2", features = [ - "hardcoded-credentials", -] } +aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] } aws-sdk-kinesis = "1.21" aws-sdk-s3 = "1.24" aws-smithy-async = "1.2" aws-smithy-runtime = "1.3" -aws-smithy-types = { version = "1.1", features = [ - "byte-stream-poll-next" -] } +aws-smithy-types = { version = "1.1", features = ["byte-stream-poll-next"] } aws-types = "1.2" azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] } azure_storage = { version = "0.13.0", default-features = false, features = [ - "enable_reqwest_rustls", + "enable_reqwest_rustls", ] } azure_storage_blobs = { version = "0.13.0", default-features = 
false, features = [ - "enable_reqwest_rustls", + "enable_reqwest_rustls", ] } opendal = { version = "0.44", default-features = false } @@ -321,11 +317,11 @@ quickwit-serve = { path = "quickwit-serve" } quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } -tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "1ee5f907", default-features = false, features = [ - "lz4-compression", - "mmap", - "quickwit", - "zstd-compression", +tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "9737b5f", default-features = false, features = [ + "lz4-compression", + "mmap", + "quickwit", + "zstd-compression", ] } # This is actually not used directly the goal is to fix the version diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index d6041c3b7bb..c26817e3b27 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -30,6 +30,7 @@ pin-project = { workspace = true } pnet = { workspace = true } prometheus = { workspace = true } rand = { workspace = true } +rayon = { workspace = true } regex = { workspace = true } serde = { workspace = true } siphasher = { workspace = true } diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 15bcd12ab85..e3a0b57a211 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -43,6 +43,7 @@ pub mod stream_utils; pub mod temp_dir; #[cfg(any(test, feature = "testsuite"))] pub mod test_utils; +mod executor; pub mod tower; pub mod type_map; pub mod uri; @@ -54,6 +55,7 @@ use std::ops::{Range, RangeInclusive}; use std::str::FromStr; pub use coolid::new_coolid; +pub use executor::Executor; pub use kill_switch::KillSwitch; pub use path_hasher::PathHasher; pub use progress::{Progress, ProtectedZoneGuard}; diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 323362e65ed..30ab074ee2d 100644 
--- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -179,19 +179,19 @@ pub fn new_histogram_vec( HistogramVec { underlying } } -pub struct GaugeGuard { - gauge: &'static IntGauge, +pub struct GaugeGuard<'a> { + gauge: &'a IntGauge, delta: i64, } -impl std::fmt::Debug for GaugeGuard { +impl<'a> std::fmt::Debug for GaugeGuard<'a> { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { self.delta.fmt(f) } } -impl GaugeGuard { - pub fn from_gauge(gauge: &'static IntGauge) -> Self { +impl<'a> GaugeGuard<'a> { + pub fn from_gauge(gauge: &'a IntGauge) -> Self { Self { gauge, delta: 0i64 } } @@ -210,7 +210,7 @@ impl GaugeGuard { } } -impl Drop for GaugeGuard { +impl<'a> Drop for GaugeGuard<'a> { fn drop(&mut self) { self.gauge.sub(self.delta) } @@ -361,3 +361,13 @@ impl InFlightDataGauges { } pub static MEMORY_METRICS: Lazy = Lazy::new(MemoryMetrics::default); + +pub static ACTIVE_THREAD_COUNT: Lazy> = Lazy::new(|| { + new_gauge_vec( + "active_thread_count", + "Number of active threads in a given thread pool.", + "threads", + &[], + ["pool"] + ) +}); diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index edca41a8b70..d3a591099fd 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -233,7 +233,7 @@ where T: RpcName } } -pub struct InFlightValue(T, #[allow(dead_code)] GaugeGuard); +pub struct InFlightValue(T, #[allow(dead_code)] GaugeGuard<'static>); impl fmt::Debug for InFlightValue where T: fmt::Debug diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index a3b4cc166b3..c4411e8ea9d 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -36,7 +36,6 @@ use tracing::{error, Instrument}; use crate::leaf::open_index_with_caches; use crate::service::SearcherContext; -use crate::thread_pool::search_executor; use 
crate::{convert_document_to_json_string, GlobalDocAddress}; const SNIPPET_MAX_NUM_CHARS: usize = 150; @@ -185,9 +184,8 @@ async fn fetch_docs_in_split( .context("open-index-for-split")?; // we add an executor here, we could add it in open_index_with_caches, though we should verify // the side-effect before - index - .set_shared_multithread_executor(search_executor()) - .context("failed to set search pool")?; + let executor_tantivy = crate::search_executor().get_underlying_rayon_thread_pool().into(); + index.set_executor(executor_tantivy); let index_reader = index .reader_builder() // the docs are presorted so a cache size of NUM_CONCURRENT_REQUESTS is fine diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index f839e206bb5..1f859cdc740 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -378,7 +378,7 @@ async fn leaf_search_single_split( warmup(&searcher, &warmup_info).await?; let span = info_span!("tantivy_search"); - let leaf_search_response = crate::run_cpu_intensive(move || { + let leaf_search_response = crate::search_executor().run_cpu_intensive(move || { let _span_guard = span.enter(); searcher.search(&query, &quickwit_collector) }) @@ -921,7 +921,7 @@ pub async fn leaf_search( } } - crate::run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into)) + crate::search_executor().run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into)) .instrument(info_span!("incremental_merge_finalize")) .await .context("failed to merge split search responses")? 
diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 11618ac603e..921e782a483 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -41,17 +41,24 @@ mod search_job_placer; mod search_response_rest; mod search_stream; mod service; -mod thread_pool; pub(crate) mod top_k_collector; mod metrics; + +fn search_executor() -> &'static Executor { + static SEARCH_EXECUTOR: OnceCell = OnceCell::new(); + &*SEARCH_EXECUTOR.get_or_init(|| quickwit_common::Executor::new("quickwit-search", None)) +} + #[cfg(test)] mod tests; pub use collector::QuickwitAggregations; use metrics::SEARCH_METRICS; +use once_cell::sync::OnceCell; use quickwit_common::tower::Pool; +use quickwit_common::Executor; use quickwit_doc_mapper::DocMapper; use quickwit_proto::metastore::{ ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient, @@ -92,7 +99,6 @@ pub use crate::search_job_placer::{Job, SearchJobPlacer}; pub use crate::search_response_rest::SearchResponseRest; pub use crate::search_stream::root_search_stream; pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl}; -use crate::thread_pool::run_cpu_intensive; /// A pool of searcher clients identified by their gRPC socket address. 
pub type SearcherPool = Pool; diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index b1d89abff57..39ac1a4e890 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -21,13 +21,12 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - new_counter, new_gauge, new_histogram, Histogram, IntCounter, IntGauge, + new_counter, new_histogram, Histogram, IntCounter }; pub struct SearchMetrics { pub leaf_searches_splits_total: IntCounter, pub leaf_search_split_duration_secs: Histogram, - pub active_search_threads_count: IntGauge, } impl Default for SearchMetrics { @@ -44,12 +43,6 @@ impl Default for SearchMetrics { starts after the semaphore is obtained.", "search", ), - active_search_threads_count: new_gauge( - "active_search_threads_count", - "Number of threads in use in the CPU thread pool", - "search", - &[], - ), } } } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 9ae0e02aab3..e24c53b95b1 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -667,7 +667,7 @@ pub(crate) async fn search_partial_hits_phase( let leaf_search_responses: Vec> = leaf_search_responses.into_iter().map(Ok).collect_vec(); let span = info_span!("merge_fruits"); - let leaf_search_response = crate::run_cpu_intensive(move || { + let leaf_search_response = crate::search_executor().run_cpu_intensive(move || { let _span_guard = span.enter(); merge_collector.merge_fruits(leaf_search_responses) }) diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 53c3021652c..fce1d3a4709 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -196,7 +196,7 @@ async fn leaf_search_stream_single_split( let _ = span.enter(); let m_request_fields = request_fields.clone(); - let collect_handle = 
crate::run_cpu_intensive(move || { + let collect_handle = crate::search_executor().run_cpu_intensive(move || { let mut buffer = Vec::new(); match m_request_fields.fast_field_types() { (Type::I64, None) => { diff --git a/quickwit/quickwit-search/src/thread_pool.rs b/quickwit/quickwit-search/src/thread_pool.rs deleted file mode 100644 index 3f4f9aba2b0..00000000000 --- a/quickwit/quickwit-search/src/thread_pool.rs +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . 
- -use std::fmt; -use std::sync::Arc; - -use once_cell::sync::OnceCell; -use quickwit_common::metrics::GaugeGuard; -use tantivy::Executor; -use tracing::error; - -static SEARCH_THREAD_POOL: OnceCell> = OnceCell::new(); - -fn build_executor() -> Arc { - let rayon_pool = rayon::ThreadPoolBuilder::new() - .thread_name(|thread_id| format!("quickwit-search-{thread_id}")) - .panic_handler(|_my_panic| { - error!("task running in the quickwit search pool panicked"); - }) - .build() - .expect("Failed to spawn the spawning pool"); - Arc::new(Executor::ThreadPool(rayon_pool)) -} - -pub(crate) fn search_executor() -> Arc { - SEARCH_THREAD_POOL.get_or_init(build_executor).clone() -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub struct Panicked; - -impl fmt::Display for Panicked { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Scheduled job panicked") - } -} - -impl std::error::Error for Panicked {} - -/// Function similar to `tokio::spawn_blocking`. -/// -/// Here are two important differences however: -/// -/// 1) The task is running on a rayon thread pool managed by quickwit. -/// This pool is specifically used only to run CPU intensive work -/// and is configured to contain `num_cpus` cores. -/// -/// 2) Before the task is effectively scheduled, we check that -/// the spawner is still interested by its result. -/// -/// It is therefore required to `await` the result of this -/// function to get anywork done. -/// -/// This is nice, because it makes work that has been scheduled -/// but is not running yet "cancellable". 
-pub async fn run_cpu_intensive(cpu_heavy_task: F) -> Result -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let span = tracing::Span::current(); - search_executor() - .spawn_blocking(move || { - let _guard = span.enter(); - let mut active_thread_guard = - GaugeGuard::from_gauge(&crate::SEARCH_METRICS.active_search_threads_count); - active_thread_guard.add(1i64); - cpu_heavy_task() - }) - .await - .map_err(|_| Panicked) -} - -#[cfg(test)] -mod tests { - use std::sync::atomic::{AtomicU64, Ordering}; - use std::sync::Arc; - use std::time::Duration; - - use super::*; - - #[tokio::test] - async fn test_run_cpu_intensive() { - assert_eq!(run_cpu_intensive(|| 1).await, Ok(1)); - } - - #[tokio::test] - async fn test_run_cpu_intensive_panicks() { - assert!(run_cpu_intensive(|| panic!("")).await.is_err()); - } - - #[tokio::test] - async fn test_run_cpu_intensive_panicks_do_not_shrink_thread_pool() { - for _ in 0..100 { - assert!(run_cpu_intensive(|| panic!("")).await.is_err()); - } - } - - #[tokio::test] - async fn test_run_cpu_intensive_abort() { - let counter: Arc = Default::default(); - let mut futures = Vec::new(); - for _ in 0..1_000 { - let counter_clone = counter.clone(); - let fut = run_cpu_intensive(move || { - std::thread::sleep(Duration::from_millis(5)); - counter_clone.fetch_add(1, Ordering::SeqCst) - }); - // The first few num_cores tasks should run, but the other should get cancelled. - futures.push(tokio::time::timeout(Duration::from_millis(1), fut)); - } - futures::future::join_all(futures).await; - assert!(counter.load(Ordering::SeqCst) < 100); - } -}