diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs
index 3a47ab1effc..f888854b700 100644
--- a/quickwit/quickwit-common/src/shared_consts.rs
+++ b/quickwit/quickwit-common/src/shared_consts.rs
@@ -19,6 +19,8 @@
use std::time::Duration;
+use bytesize::ByteSize;
+
/// Field name reserved for storing the dynamically indexed fields.
pub const FIELD_PRESENCE_FIELD_NAME: &str = "_field_presence";
@@ -41,3 +43,8 @@ pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";
/// File name for the encoded list of fields in the split
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";
+
+pub const DEFAULT_SHARD_THROUGHPUT_LIMIT: ByteSize = ByteSize::mib(5);
+
+// Re-export of `bytesize::MIB`, so that callers can write `shared_consts::MIB`.
+pub use bytesize::MIB;
diff --git a/quickwit/quickwit-config/src/cluster_config/mod.rs b/quickwit/quickwit-config/src/cluster_config/mod.rs
index 6dda71921ee..eac26d7e3b6 100644
--- a/quickwit/quickwit-config/src/cluster_config/mod.rs
+++ b/quickwit/quickwit-config/src/cluster_config/mod.rs
@@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
+use bytesize::ByteSize;
use quickwit_common::uri::Uri;
/// An embryo of a cluster config.
@@ -27,6 +28,7 @@ pub struct ClusterConfig {
pub auto_create_indexes: bool,
pub default_index_root_uri: Uri,
pub replication_factor: usize,
+ pub shard_throughput_limit: ByteSize,
}
impl ClusterConfig {
@@ -37,6 +39,7 @@ impl ClusterConfig {
auto_create_indexes: false,
default_index_root_uri: Uri::for_test("ram:///indexes"),
replication_factor: 1,
+ shard_throughput_limit: quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT,
}
}
}
diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs
index e120faec57a..86e724e0cc8 100644
--- a/quickwit/quickwit-config/src/node_config/mod.rs
+++ b/quickwit/quickwit-config/src/node_config/mod.rs
@@ -30,6 +30,7 @@ use anyhow::{bail, ensure};
use bytesize::ByteSize;
use http::HeaderMap;
use quickwit_common::net::HostAddr;
+use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT;
use quickwit_common::uri::Uri;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::types::NodeId;
@@ -270,7 +271,7 @@ impl SearcherConfig {
pub struct IngestApiConfig {
pub max_queue_memory_usage: ByteSize,
pub max_queue_disk_usage: ByteSize,
- pub replication_factor: usize,
+ replication_factor: usize,
pub content_length_limit: ByteSize,
pub shard_throughput_limit: ByteSize,
}
@@ -282,12 +283,14 @@ impl Default for IngestApiConfig {
max_queue_disk_usage: ByteSize::gib(4),
replication_factor: 1,
content_length_limit: ByteSize::mib(10),
- shard_throughput_limit: ByteSize::mib(5),
+ shard_throughput_limit: DEFAULT_SHARD_THROUGHPUT_LIMIT,
}
}
}
impl IngestApiConfig {
+ /// Returns the replication factor, read from the `QW_INGEST_REPLICATION_FACTOR` environment
+ /// variable or from the configuration, in that order (the environment variable takes precedence).
pub fn replication_factor(&self) -> anyhow::Result {
if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") {
let replication_factor = match replication_factor_str.trim() {
diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
index ea16e4ef796..be0511d2364 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -24,6 +24,7 @@ use std::time::Duration;
use anyhow::Context;
use async_trait::async_trait;
+use futures::future::Shared;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use quickwit_actors::{
@@ -35,7 +36,7 @@ use quickwit_cluster::{
};
use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::uri::Uri;
-use quickwit_common::Progress;
+use quickwit_common::{shared_consts, Progress};
use quickwit_config::service::QuickwitService;
use quickwit_config::{ClusterConfig, IndexConfig, IndexTemplate, SourceConfig};
use quickwit_ingest::{IngesterPool, LocalShardsUpdate};
@@ -121,7 +122,6 @@ impl ControlPlane {
watch::Receiver,
) {
let disable_control_loop = false;
-
Self::spawn_inner(
universe,
cluster_config,
@@ -156,12 +156,15 @@ impl ControlPlane {
universe.spawn_builder().supervise_fn(move || {
let cluster_id = cluster_config.cluster_id.clone();
let replication_factor = cluster_config.replication_factor;
+ let shard_throughput_limit_mib: f32 =
+ cluster_config.shard_throughput_limit.as_u64() as f32 / shared_consts::MIB as f32;
let indexing_scheduler =
IndexingScheduler::new(cluster_id, self_node_id.clone(), indexer_pool.clone());
let ingest_controller = IngestController::new(
metastore.clone(),
ingester_pool.clone(),
replication_factor,
+ shard_throughput_limit_mib,
);
let readiness_tx = readiness_tx.clone();
diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
index 9bc8898bf67..ab0e41cb8ce 100644
--- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
+++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -59,16 +59,6 @@ use crate::control_plane::ControlPlane;
use crate::ingest::wait_handle::WaitHandle;
use crate::model::{ControlPlaneModel, ScalingMode, ShardEntry, ShardStats};
-const MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC: f32 = 5.;
-
-/// Threshold in MiB/s above which we increase the number of shards.
-const SCALE_UP_SHARDS_THRESHOLD_MIB_PER_SEC: f32 =
- MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC * 8. / 10.;
-
-/// Threshold in MiB/s below which we decrease the number of shards.
-const SCALE_DOWN_SHARDS_THRESHOLD_MIB_PER_SEC: f32 =
- MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC * 2. / 10.;
-
const CLOSE_SHARDS_REQUEST_TIMEOUT: Duration = if cfg!(test) {
Duration::from_millis(50)
} else {
@@ -112,6 +102,10 @@ pub struct IngestController {
// This lock ensures that only one rebalance operation is performed at a time.
rebalance_lock: Arc>,
pub stats: IngestControllerStats,
+ // Threshold in MiB/s below which we decrease the number of shards.
+ scale_down_shards_threshold_mib_per_sec: f32,
+ // Threshold in MiB/s above which we increase the number of shards.
+ scale_up_shards_threshold_mib_per_sec: f32,
}
impl fmt::Debug for IngestController {
@@ -190,6 +184,7 @@ impl IngestController {
metastore: MetastoreServiceClient,
ingester_pool: IngesterPool,
replication_factor: usize,
+ max_shard_ingestion_throughput_mib_per_sec: f32,
) -> Self {
IngestController {
metastore,
@@ -197,6 +192,9 @@ impl IngestController {
replication_factor,
rebalance_lock: Arc::new(Mutex::new(())),
stats: IngestControllerStats::default(),
+ scale_up_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec * 0.8,
+ scale_down_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec
+ * 0.2,
}
}
@@ -293,10 +291,10 @@ impl IngestController {
&local_shards_update.source_uid,
&local_shards_update.shard_infos,
);
- if shard_stats.avg_ingestion_rate >= SCALE_UP_SHARDS_THRESHOLD_MIB_PER_SEC {
+ if shard_stats.avg_ingestion_rate >= self.scale_up_shards_threshold_mib_per_sec {
self.try_scale_up_shards(local_shards_update.source_uid, shard_stats, model, progress)
.await?;
- } else if shard_stats.avg_ingestion_rate <= SCALE_DOWN_SHARDS_THRESHOLD_MIB_PER_SEC
+ } else if shard_stats.avg_ingestion_rate <= self.scale_down_shards_threshold_mib_per_sec
&& shard_stats.num_open_shards > 1
{
self.try_scale_down_shards(
@@ -1142,6 +1140,7 @@ mod tests {
use quickwit_actors::Universe;
use quickwit_common::setup_logging_for_tests;
+ use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT;
use quickwit_common::tower::DelayLayer;
use quickwit_config::{DocMapping, SourceConfig, INGEST_V2_SOURCE_ID};
use quickwit_ingest::{RateMibPerSec, ShardInfo};
@@ -1159,6 +1158,9 @@ mod tests {
use super::*;
+ const SHARD_THROUGHPUT_LIMIT_MB: f32 =
+ DEFAULT_SHARD_THROUGHPUT_LIMIT.as_u64() as f32 / (1024f32 * 1024f32); // bytes -> MiB, the unit `IngestController::new` expects
+
#[tokio::test]
async fn test_ingest_controller_get_or_create_open_shards() {
let source_id: &'static str = "test-source";
@@ -1244,8 +1246,12 @@ mod tests {
ingester_pool.insert(NodeId::from("test-ingester-2"), ingester.clone());
let replication_factor = 2;
- let mut controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let mut controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let mut model = ControlPlaneModel::default();
model.add_index(index_metadata_0.clone());
@@ -1425,7 +1431,12 @@ mod tests {
ingester_pool.insert(NodeId::from("test-ingester-1"), ingester.clone());
let replication_factor = 1;
- let mut controller = IngestController::new(metastore, ingester_pool, replication_factor);
+ let mut controller = IngestController::new(
+ metastore,
+ ingester_pool,
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let mut model = ControlPlaneModel::default();
model.add_index(index_metadata_0.clone());
@@ -1462,7 +1473,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 2;
- let mut controller = IngestController::new(metastore, ingester_pool, replication_factor);
+ let mut controller = IngestController::new(
+ metastore,
+ ingester_pool,
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let mut model = ControlPlaneModel::default();
let index_uid = IndexUid::for_test("test-index-0", 0);
@@ -1507,8 +1523,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 2;
- let controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let mut model = ControlPlaneModel::default();
@@ -1681,8 +1701,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
- let controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let ingester_id_0 = NodeId::from("test-ingester-0");
let mut mock_ingester_0 = MockIngesterService::new();
@@ -1895,8 +1919,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
- let mut controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let mut controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let index_uid = IndexUid::for_test("test-index", 0);
let source_id = "test-source".to_string();
@@ -2023,8 +2051,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
- let mut controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let mut controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let index_uid = IndexUid::for_test("test-index", 0);
let index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index");
@@ -2223,8 +2255,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
- let mut controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let mut controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = INGEST_V2_SOURCE_ID.to_string();
@@ -2330,8 +2366,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
- let controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".to_string();
@@ -2545,8 +2585,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 2;
- let controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".to_string();
@@ -2624,7 +2668,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 2;
- let controller = IngestController::new(metastore, ingester_pool, replication_factor);
+ let controller = IngestController::new(
+ metastore,
+ ingester_pool,
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let mut model = ControlPlaneModel::default();
@@ -2694,8 +2743,12 @@ mod tests {
let metastore = MetastoreServiceClient::mocked();
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
- let controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let closed_shards = controller.close_shards(Vec::new()).await;
assert_eq!(closed_shards.len(), 0);
@@ -2846,8 +2899,12 @@ mod tests {
let metastore = MetastoreServiceClient::from_mock(mock_metastore);
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
- let mut controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
+ let mut controller = IngestController::new(
+ metastore,
+ ingester_pool.clone(),
+ replication_factor,
+ SHARD_THROUGHPUT_LIMIT_MB,
+ );
let mut model = ControlPlaneModel::default();
diff --git a/quickwit/quickwit-ingest/src/lib.rs b/quickwit/quickwit-ingest/src/lib.rs
index 93d605f96a6..099126fa6d6 100644
--- a/quickwit/quickwit-ingest/src/lib.rs
+++ b/quickwit/quickwit-ingest/src/lib.rs
@@ -154,7 +154,6 @@ macro_rules! with_lock_metrics {
#[cfg(test)]
mod tests {
- use bytesize::ByteSize;
use quickwit_actors::AskError;
use super::*;
@@ -244,11 +243,13 @@ mod tests {
let queues_dir_path = temp_dir.path().join("queues-0");
get_ingest_api_service(&queues_dir_path).await.unwrap_err();
- let ingest_api_config = IngestApiConfig {
- max_queue_memory_usage: ByteSize(1200),
- max_queue_disk_usage: ByteSize::mib(256),
- ..Default::default()
- };
+ let ingest_api_config = serde_json::from_str(
+ r#"{
+ "max_queue_memory_usage": "1200b",
+ "max_queue_disk_usage": "256mb"
+ }"#,
+ )
+ .unwrap();
init_ingest_api(&universe, &queues_dir_path, &ingest_api_config)
.await
.unwrap();
diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
index 5522f94ea04..9acadd080a8 100644
--- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
@@ -277,7 +277,6 @@ pub(crate) mod tests {
use std::time::Duration;
use bytes::Bytes;
- use bytesize::ByteSize;
use quickwit_actors::{Mailbox, Universe};
use quickwit_config::IngestApiConfig;
use quickwit_ingest::{
@@ -398,10 +397,8 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_ingest_api_return_429_if_above_limits() {
- let config = IngestApiConfig {
- max_queue_memory_usage: ByteSize(1),
- ..Default::default()
- };
+ let config: IngestApiConfig =
+ serde_json::from_str(r#"{ "max_queue_memory_usage": "1" }"#).unwrap();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &config).await;
let ingest_router = IngestRouterServiceClient::mocked();
@@ -420,10 +417,8 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_ingest_api_return_413_if_above_content_limit() {
- let config = IngestApiConfig {
- content_length_limit: ByteSize(1),
- ..Default::default()
- };
+ let config: IngestApiConfig =
+ serde_json::from_str(r#"{ "content_length_limit": "1" }"#).unwrap();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mocked();
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 47f04c022b6..5fb5f10b26e 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -78,7 +78,7 @@ use quickwit_common::tower::{
use quickwit_common::uri::Uri;
use quickwit_common::{get_bool_from_env, spawn_named_task};
use quickwit_config::service::QuickwitService;
-use quickwit_config::{ClusterConfig, NodeConfig};
+use quickwit_config::{ClusterConfig, IngestApiConfig, NodeConfig};
use quickwit_control_plane::control_plane::{ControlPlane, ControlPlaneEventSubscriber};
use quickwit_control_plane::{IndexerNodeInfo, IndexerPool};
use quickwit_index_management::{IndexService as IndexManager, IndexServiceError};
@@ -313,11 +313,6 @@ async fn start_control_plane_if_needed(
.await?;
let self_node_id: NodeId = cluster.self_node_id().into();
- let replication_factor = node_config
- .ingest_api_config
- .replication_factor()
- .expect("replication factor should have been validated")
- .get();
let control_plane_mailbox = setup_control_plane(
universe,
@@ -328,7 +323,7 @@ async fn start_control_plane_if_needed(
ingester_pool.clone(),
metastore_client.clone(),
node_config.default_index_root_uri.clone(),
- replication_factor,
+ &node_config.ingest_api_config,
)
.await?;
@@ -1041,14 +1036,19 @@ async fn setup_control_plane(
ingester_pool: IngesterPool,
metastore: MetastoreServiceClient,
default_index_root_uri: Uri,
- replication_factor: usize,
+ ingest_api_config: &IngestApiConfig,
) -> anyhow::Result> {
let cluster_id = cluster.cluster_id().to_string();
+ let replication_factor = ingest_api_config
+ .replication_factor()
+ .expect("replication factor should have been validated")
+ .get();
let cluster_config = ClusterConfig {
cluster_id,
auto_create_indexes: true,
default_index_root_uri,
replication_factor,
+ shard_throughput_limit: ingest_api_config.shard_throughput_limit,
};
let (control_plane_mailbox, _control_plane_handle, mut readiness_rx) = ControlPlane::spawn(
universe,