diff --git a/quickwit/.cargo/config.toml b/quickwit/.cargo/config.toml new file mode 100644 index 00000000000..bff29e6e175 --- /dev/null +++ b/quickwit/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/quickwit/quickwit-common/src/runtimes.rs b/quickwit/quickwit-common/src/runtimes.rs index f1061d284f4..64884d72419 100644 --- a/quickwit/quickwit-common/src/runtimes.rs +++ b/quickwit/quickwit-common/src/runtimes.rs @@ -90,7 +90,13 @@ impl Default for RuntimesConfig { fn start_runtimes(config: RuntimesConfig) -> HashMap { let mut runtimes = HashMap::with_capacity(2); - let blocking_runtime = tokio::runtime::Builder::new_multi_thread() + let disable_lifo_slot: bool = crate::get_from_env("QW_DISABLE_TOKIO_LIFO_SLOT", false); + + let mut blocking_runtime_builder = tokio::runtime::Builder::new_multi_thread(); + if disable_lifo_slot { + blocking_runtime_builder.disable_lifo_slot(); + } + let blocking_runtime = blocking_runtime_builder .worker_threads(config.num_threads_blocking) .thread_name_fn(|| { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index f5235db275c..800a675c27c 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -143,17 +143,10 @@ pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn { } fn get_metastore_client_max_concurrency() -> usize { - std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok() - .and_then(|metastore_client_max_concurrency_str| { - if let Ok(metastore_client_max_concurrency) = metastore_client_max_concurrency_str.parse::() { - info!("overriding max concurrent metastore requests to {metastore_client_max_concurrency}"); - Some(metastore_client_max_concurrency) - } else { - error!("failed to parse environment variable `{METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY}={metastore_client_max_concurrency_str}`"); - None - } - }) - .unwrap_or(DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY) + quickwit_common::get_from_env( + METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY, + DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY, + ) } static CP_GRPC_CLIENT_METRICS_LAYER: Lazy = diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index f43e2a4ae42..3f912799a53 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -22,7 +22,7 @@ use std::ops::Range; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{env, fmt, io}; +use std::{fmt, io}; use anyhow::{anyhow, Context as AnyhhowContext}; use async_trait::async_trait; @@ -59,11 +59,7 @@ use crate::{ /// Semaphore to limit the number of concurent requests to the object store. Some object stores /// (R2, SeaweedFs...) return errors when too many concurrent requests are emitted. static REQUEST_SEMAPHORE: Lazy = Lazy::new(|| { - let num_permits: usize = env::var("QW_S3_MAX_CONCURRENCY") - .as_deref() - .unwrap_or("10000") - .parse() - .expect("QW_S3_MAX_CONCURRENCY value should be a number."); + let num_permits: usize = quickwit_common::get_from_env("QW_S3_MAX_CONCURRENCY", 10_000usize); Semaphore::new(num_permits) });