From 2ceab7598183e2c4c8de4f5a6272c5dcfb122b89 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 16 May 2024 11:46:51 +0900 Subject: [PATCH] Using a runtime with no lifo slot. (#4771) As spotted by @Pseitz, the unstealable lifo-slot is causing the doc processor and the indexer to run on the same thread. For the moment this is disabled by default and only apply to the indexer blocking runtime. --- quickwit/.cargo/config.toml | 2 ++ quickwit/quickwit-common/src/runtimes.rs | 8 +++++++- quickwit/quickwit-serve/src/lib.rs | 15 ++++----------- .../src/object_storage/s3_compatible_storage.rs | 8 ++------ 4 files changed, 15 insertions(+), 18 deletions(-) create mode 100644 quickwit/.cargo/config.toml diff --git a/quickwit/.cargo/config.toml b/quickwit/.cargo/config.toml new file mode 100644 index 00000000000..bff29e6e175 --- /dev/null +++ b/quickwit/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/quickwit/quickwit-common/src/runtimes.rs b/quickwit/quickwit-common/src/runtimes.rs index f1061d284f4..64884d72419 100644 --- a/quickwit/quickwit-common/src/runtimes.rs +++ b/quickwit/quickwit-common/src/runtimes.rs @@ -90,7 +90,13 @@ impl Default for RuntimesConfig { fn start_runtimes(config: RuntimesConfig) -> HashMap { let mut runtimes = HashMap::with_capacity(2); - let blocking_runtime = tokio::runtime::Builder::new_multi_thread() + let disable_lifo_slot: bool = crate::get_from_env("QW_DISABLE_TOKIO_LIFO_SLOT", false); + + let mut blocking_runtime_builder = tokio::runtime::Builder::new_multi_thread(); + if disable_lifo_slot { + blocking_runtime_builder.disable_lifo_slot(); + } + let blocking_runtime = blocking_runtime_builder .worker_threads(config.num_threads_blocking) .thread_name_fn(|| { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index f5235db275c..800a675c27c 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -143,17 +143,10 @@ pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn { } fn get_metastore_client_max_concurrency() -> usize { - std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok() - .and_then(|metastore_client_max_concurrency_str| { - if let Ok(metastore_client_max_concurrency) = metastore_client_max_concurrency_str.parse::() { - info!("overriding max concurrent metastore requests to {metastore_client_max_concurrency}"); - Some(metastore_client_max_concurrency) - } else { - error!("failed to parse environment variable `{METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY}={metastore_client_max_concurrency_str}`"); - None - } - }) - .unwrap_or(DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY) + quickwit_common::get_from_env( + METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY, + DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY, + ) } static CP_GRPC_CLIENT_METRICS_LAYER: Lazy = diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index f43e2a4ae42..3f912799a53 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -22,7 +22,7 @@ use std::ops::Range; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{env, fmt, io}; +use std::{fmt, io}; use anyhow::{anyhow, Context as AnyhhowContext}; use async_trait::async_trait; @@ -59,11 +59,7 @@ use crate::{ /// Semaphore to limit the number of concurent requests to the object store. Some object stores /// (R2, SeaweedFs...) return errors when too many concurrent requests are emitted. static REQUEST_SEMAPHORE: Lazy = Lazy::new(|| { - let num_permits: usize = env::var("QW_S3_MAX_CONCURRENCY") - .as_deref() - .unwrap_or("10000") - .parse() - .expect("QW_S3_MAX_CONCURRENCY value should be a number."); + let num_permits: usize = quickwit_common::get_from_env("QW_S3_MAX_CONCURRENCY", 10_000usize); Semaphore::new(num_permits) });