From 02737abfb57b94a23e8eb8f2a509154ed12e7c56 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 5 Jul 2024 11:41:21 +0900 Subject: [PATCH] Plugging the shard throughput limit configuration to the ingest controller. --- quickwit/quickwit-common/src/shared_consts.rs | 7 + .../quickwit-config/src/cluster_config/mod.rs | 3 + .../quickwit-config/src/node_config/mod.rs | 7 +- .../src/control_plane.rs | 7 +- .../src/ingest/ingest_controller.rs | 127 +++++++++++++----- quickwit/quickwit-ingest/src/lib.rs | 13 +- .../src/ingest_api/rest_handler.rs | 13 +- quickwit/quickwit-serve/src/lib.rs | 16 +-- 8 files changed, 131 insertions(+), 62 deletions(-) diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs index 3a47ab1effc..f888854b700 100644 --- a/quickwit/quickwit-common/src/shared_consts.rs +++ b/quickwit/quickwit-common/src/shared_consts.rs @@ -19,6 +19,8 @@ use std::time::Duration; +use bytesize::ByteSize; + /// Field name reserved for storing the dynamically indexed fields. pub const FIELD_PRESENCE_FIELD_NAME: &str = "_field_presence"; @@ -41,3 +43,8 @@ pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:"; /// File name for the encoded list of fields in the split pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields"; + +pub const DEFAULT_SHARD_THROUGHPUT_LIMIT: ByteSize = ByteSize::mib(5); + +// (Just a reexport). +pub use bytesize::MIB; diff --git a/quickwit/quickwit-config/src/cluster_config/mod.rs b/quickwit/quickwit-config/src/cluster_config/mod.rs index 6dda71921ee..eac26d7e3b6 100644 --- a/quickwit/quickwit-config/src/cluster_config/mod.rs +++ b/quickwit/quickwit-config/src/cluster_config/mod.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use bytesize::ByteSize; use quickwit_common::uri::Uri; /// An embryo of a cluster config. @@ -27,6 +28,7 @@ pub struct ClusterConfig { pub auto_create_indexes: bool, pub default_index_root_uri: Uri, pub replication_factor: usize, + pub shard_throughput_limit: ByteSize, } impl ClusterConfig { @@ -37,6 +39,7 @@ impl ClusterConfig { auto_create_indexes: false, default_index_root_uri: Uri::for_test("ram:///indexes"), replication_factor: 1, + shard_throughput_limit: ByteSize::mb(5), } } } diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index e120faec57a..86e724e0cc8 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -30,6 +30,7 @@ use anyhow::{bail, ensure}; use bytesize::ByteSize; use http::HeaderMap; use quickwit_common::net::HostAddr; +use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT; use quickwit_common::uri::Uri; use quickwit_proto::indexing::CpuCapacity; use quickwit_proto::types::NodeId; @@ -270,7 +271,7 @@ impl SearcherConfig { pub struct IngestApiConfig { pub max_queue_memory_usage: ByteSize, pub max_queue_disk_usage: ByteSize, - pub replication_factor: usize, + replication_factor: usize, pub content_length_limit: ByteSize, pub shard_throughput_limit: ByteSize, } @@ -282,12 +283,14 @@ impl Default for IngestApiConfig { max_queue_disk_usage: ByteSize::gib(4), replication_factor: 1, content_length_limit: ByteSize::mib(10), - shard_throughput_limit: ByteSize::mib(5), + shard_throughput_limit: DEFAULT_SHARD_THROUGHPUT_LIMIT, } } } impl IngestApiConfig { + /// Returns the replication factor, as defined in environment variable or in the configuration + /// in that order (the environment variable can overrrides the configuration). pub fn replication_factor(&self) -> anyhow::Result { if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") { let replication_factor = match replication_factor_str.trim() { diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index ea16e4ef796..be0511d2364 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -24,6 +24,7 @@ use std::time::Duration; use anyhow::Context; use async_trait::async_trait; +use futures::future::Shared; use futures::stream::FuturesUnordered; use futures::{Future, StreamExt}; use quickwit_actors::{ @@ -35,7 +36,7 @@ use quickwit_cluster::{ }; use quickwit_common::pubsub::EventSubscriber; use quickwit_common::uri::Uri; -use quickwit_common::Progress; +use quickwit_common::{shared_consts, Progress}; use quickwit_config::service::QuickwitService; use quickwit_config::{ClusterConfig, IndexConfig, IndexTemplate, SourceConfig}; use quickwit_ingest::{IngesterPool, LocalShardsUpdate}; @@ -121,7 +122,6 @@ impl ControlPlane { watch::Receiver, ) { let disable_control_loop = false; - Self::spawn_inner( universe, cluster_config, @@ -156,12 +156,15 @@ impl ControlPlane { universe.spawn_builder().supervise_fn(move || { let cluster_id = cluster_config.cluster_id.clone(); let replication_factor = cluster_config.replication_factor; + let shard_throughput_limit_mib: f32 = + cluster_config.shard_throughput_limit.as_u64() as f32 / shared_consts::MIB as f32; let indexing_scheduler = IndexingScheduler::new(cluster_id, self_node_id.clone(), indexer_pool.clone()); let ingest_controller = IngestController::new( metastore.clone(), ingester_pool.clone(), replication_factor, + shard_throughput_limit_mib, ); let readiness_tx = readiness_tx.clone(); diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 9bc8898bf67..ab0e41cb8ce 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -59,16 +59,6 @@ use crate::control_plane::ControlPlane; use crate::ingest::wait_handle::WaitHandle; use crate::model::{ControlPlaneModel, ScalingMode, ShardEntry, ShardStats}; -const MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC: f32 = 5.; - -/// Threshold in MiB/s above which we increase the number of shards. -const SCALE_UP_SHARDS_THRESHOLD_MIB_PER_SEC: f32 = - MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC * 8. / 10.; - -/// Threshold in MiB/s below which we decrease the number of shards. -const SCALE_DOWN_SHARDS_THRESHOLD_MIB_PER_SEC: f32 = - MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC * 2. / 10.; - const CLOSE_SHARDS_REQUEST_TIMEOUT: Duration = if cfg!(test) { Duration::from_millis(50) } else { @@ -112,6 +102,10 @@ pub struct IngestController { // This lock ensures that only one rebalance operation is performed at a time. rebalance_lock: Arc>, pub stats: IngestControllerStats, + // Threshold in MiB/s below which we decrease the number of shards. + scale_down_shards_threshold_mib_per_sec: f32, + // Threshold in MiB/s above which we increase the number of shards. + scale_up_shards_threshold_mib_per_sec: f32, } impl fmt::Debug for IngestController { @@ -190,6 +184,7 @@ impl IngestController { metastore: MetastoreServiceClient, ingester_pool: IngesterPool, replication_factor: usize, + max_shard_ingestion_throughput_mib_per_sec: f32, ) -> Self { IngestController { metastore, @@ -197,6 +192,9 @@ impl IngestController { replication_factor, rebalance_lock: Arc::new(Mutex::new(())), stats: IngestControllerStats::default(), + scale_up_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec * 0.8, + scale_down_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec + * 0.2, } } @@ -293,10 +291,10 @@ impl IngestController { &local_shards_update.source_uid, &local_shards_update.shard_infos, ); - if shard_stats.avg_ingestion_rate >= SCALE_UP_SHARDS_THRESHOLD_MIB_PER_SEC { + if shard_stats.avg_ingestion_rate >= self.scale_up_shards_threshold_mib_per_sec { self.try_scale_up_shards(local_shards_update.source_uid, shard_stats, model, progress) .await?; - } else if shard_stats.avg_ingestion_rate <= SCALE_DOWN_SHARDS_THRESHOLD_MIB_PER_SEC + } else if shard_stats.avg_ingestion_rate <= self.scale_down_shards_threshold_mib_per_sec && shard_stats.num_open_shards > 1 { self.try_scale_down_shards( @@ -1142,6 +1140,7 @@ mod tests { use quickwit_actors::Universe; use quickwit_common::setup_logging_for_tests; + use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT; use quickwit_common::tower::DelayLayer; use quickwit_config::{DocMapping, SourceConfig, INGEST_V2_SOURCE_ID}; use quickwit_ingest::{RateMibPerSec, ShardInfo}; @@ -1159,6 +1158,9 @@ mod tests { use super::*; + const SHARD_THROUGHPUT_LIMIT_MB: f32 = + DEFAULT_SHARD_THROUGHPUT_LIMIT.as_u64() as f32 / 1_000_000f32; + #[tokio::test] async fn test_ingest_controller_get_or_create_open_shards() { let source_id: &'static str = "test-source"; @@ -1244,8 +1246,12 @@ mod tests { ingester_pool.insert(NodeId::from("test-ingester-2"), ingester.clone()); let replication_factor = 2; - let mut controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let mut controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let mut model = ControlPlaneModel::default(); model.add_index(index_metadata_0.clone()); @@ -1425,7 +1431,12 @@ mod tests { ingester_pool.insert(NodeId::from("test-ingester-1"), ingester.clone()); let replication_factor = 1; - let mut controller = IngestController::new(metastore, ingester_pool, replication_factor); + let mut controller = IngestController::new( + metastore, + ingester_pool, + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let mut model = ControlPlaneModel::default(); model.add_index(index_metadata_0.clone()); @@ -1462,7 +1473,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let replication_factor = 2; - let mut controller = IngestController::new(metastore, ingester_pool, replication_factor); + let mut controller = IngestController::new( + metastore, + ingester_pool, + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let mut model = ControlPlaneModel::default(); let index_uid = IndexUid::for_test("test-index-0", 0); @@ -1507,8 +1523,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let replication_factor = 2; - let controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let mut model = ControlPlaneModel::default(); @@ -1681,8 +1701,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let replication_factor = 1; - let controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let ingester_id_0 = NodeId::from("test-ingester-0"); let mut mock_ingester_0 = MockIngesterService::new(); @@ -1895,8 +1919,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let replication_factor = 1; - let mut controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let mut controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let index_uid = IndexUid::for_test("test-index", 0); let source_id = "test-source".to_string(); @@ -2023,8 +2051,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let replication_factor = 1; - let mut controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let mut controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let index_uid = IndexUid::for_test("test-index", 0); let index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index"); @@ -2223,8 +2255,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let replication_factor = 1; - let mut controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let mut controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let index_uid = IndexUid::for_test("test-index", 0); let source_id: SourceId = INGEST_V2_SOURCE_ID.to_string(); @@ -2330,8 +2366,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let replication_factor = 1; - let controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let index_uid = IndexUid::for_test("test-index", 0); let source_id: SourceId = "test-source".to_string(); @@ -2545,8 +2585,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let replication_factor = 2; - let controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let index_uid = IndexUid::for_test("test-index", 0); let source_id: SourceId = "test-source".to_string(); @@ -2624,7 +2668,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let replication_factor = 2; - let controller = IngestController::new(metastore, ingester_pool, replication_factor); + let controller = IngestController::new( + metastore, + ingester_pool, + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let mut model = ControlPlaneModel::default(); @@ -2694,8 +2743,12 @@ mod tests { let metastore = MetastoreServiceClient::mocked(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; - let controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let closed_shards = controller.close_shards(Vec::new()).await; assert_eq!(closed_shards.len(), 0); @@ -2846,8 +2899,12 @@ mod tests { let metastore = MetastoreServiceClient::from_mock(mock_metastore); let ingester_pool = IngesterPool::default(); let replication_factor = 1; - let mut controller = - IngestController::new(metastore, ingester_pool.clone(), replication_factor); + let mut controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + SHARD_THROUGHPUT_LIMIT_MB, + ); let mut model = ControlPlaneModel::default(); diff --git a/quickwit/quickwit-ingest/src/lib.rs b/quickwit/quickwit-ingest/src/lib.rs index 93d605f96a6..099126fa6d6 100644 --- a/quickwit/quickwit-ingest/src/lib.rs +++ b/quickwit/quickwit-ingest/src/lib.rs @@ -154,7 +154,6 @@ macro_rules! with_lock_metrics { #[cfg(test)] mod tests { - use bytesize::ByteSize; use quickwit_actors::AskError; use super::*; @@ -244,11 +243,13 @@ mod tests { let queues_dir_path = temp_dir.path().join("queues-0"); get_ingest_api_service(&queues_dir_path).await.unwrap_err(); - let ingest_api_config = IngestApiConfig { - max_queue_memory_usage: ByteSize(1200), - max_queue_disk_usage: ByteSize::mib(256), - ..Default::default() - }; + let ingest_api_config = serde_json::from_str( + r#"{ + "max_queue_memory_usage": "1200b", + "max_queue_disk_usage": "256mb" + }"#, + ) + .unwrap(); init_ingest_api(&universe, &queues_dir_path, &ingest_api_config) .await .unwrap(); diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index 5522f94ea04..9acadd080a8 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -277,7 +277,6 @@ pub(crate) mod tests { use std::time::Duration; use bytes::Bytes; - use bytesize::ByteSize; use quickwit_actors::{Mailbox, Universe}; use quickwit_config::IngestApiConfig; use quickwit_ingest::{ @@ -398,10 +397,8 @@ pub(crate) mod tests { #[tokio::test] async fn test_ingest_api_return_429_if_above_limits() { - let config = IngestApiConfig { - max_queue_memory_usage: ByteSize(1), - ..Default::default() - }; + let config: IngestApiConfig = + serde_json::from_str(r#"{ "max_queue_memory_usage": "1" }"#).unwrap(); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index"], &config).await; let ingest_router = IngestRouterServiceClient::mocked(); @@ -420,10 +417,8 @@ pub(crate) mod tests { #[tokio::test] async fn test_ingest_api_return_413_if_above_content_limit() { - let config = IngestApiConfig { - content_length_limit: ByteSize(1), - ..Default::default() - }; + let config: IngestApiConfig = + serde_json::from_str(r#"{ "content_length_limit": "1" }"#).unwrap(); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await; let ingest_router = IngestRouterServiceClient::mocked(); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 47f04c022b6..5fb5f10b26e 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -78,7 +78,7 @@ use quickwit_common::tower::{ use quickwit_common::uri::Uri; use quickwit_common::{get_bool_from_env, spawn_named_task}; use quickwit_config::service::QuickwitService; -use quickwit_config::{ClusterConfig, NodeConfig}; +use quickwit_config::{ClusterConfig, IngestApiConfig, NodeConfig}; use quickwit_control_plane::control_plane::{ControlPlane, ControlPlaneEventSubscriber}; use quickwit_control_plane::{IndexerNodeInfo, IndexerPool}; use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}; @@ -313,11 +313,6 @@ async fn start_control_plane_if_needed( .await?; let self_node_id: NodeId = cluster.self_node_id().into(); - let replication_factor = node_config - .ingest_api_config - .replication_factor() - .expect("replication factor should have been validated") - .get(); let control_plane_mailbox = setup_control_plane( universe, @@ -328,7 +323,7 @@ async fn start_control_plane_if_needed( ingester_pool.clone(), metastore_client.clone(), node_config.default_index_root_uri.clone(), - replication_factor, + &node_config.ingest_api_config, ) .await?; @@ -1041,14 +1036,19 @@ async fn setup_control_plane( ingester_pool: IngesterPool, metastore: MetastoreServiceClient, default_index_root_uri: Uri, - replication_factor: usize, + ingest_api_config: &IngestApiConfig, ) -> anyhow::Result> { let cluster_id = cluster.cluster_id().to_string(); + let replication_factor = ingest_api_config + .replication_factor() + .expect("replication factor should have been validated") + .get(); let cluster_config = ClusterConfig { cluster_id, auto_create_indexes: true, default_index_root_uri, replication_factor, + shard_throughput_limit: ingest_api_config.shard_throughput_limit, }; let (control_plane_mailbox, _control_plane_handle, mut readiness_rx) = ControlPlane::spawn( universe,