Skip to content

Commit

Permalink
Plugging the shard throughput limit configuration to the ingest
Browse files Browse the repository at this point in the history
controller.
  • Loading branch information
fulmicoton committed Jul 5, 2024
1 parent cb63d32 commit 458b619
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 61 deletions.
4 changes: 4 additions & 0 deletions quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

use std::time::Duration;

use bytesize::ByteSize;

/// Field name reserved for storing the dynamically indexed fields.
pub const FIELD_PRESENCE_FIELD_NAME: &str = "_field_presence";

Expand All @@ -41,3 +43,5 @@ pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";

/// File name for the encoded list of fields in the split
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";

pub const DEFAULT_SHARD_THROUGHPUT_LIMIT: ByteSize = ByteSize::mb(5);
3 changes: 3 additions & 0 deletions quickwit/quickwit-config/src/cluster_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use bytesize::ByteSize;
use quickwit_common::uri::Uri;

/// An embryo of a cluster config.
Expand All @@ -27,6 +28,7 @@ pub struct ClusterConfig {
pub auto_create_indexes: bool,
pub default_index_root_uri: Uri,
pub replication_factor: usize,
pub shard_throughput_limit: ByteSize,
}

impl ClusterConfig {
Expand All @@ -37,6 +39,7 @@ impl ClusterConfig {
auto_create_indexes: false,
default_index_root_uri: Uri::for_test("ram:///indexes"),
replication_factor: 1,
shard_throughput_limit: ByteSize::mb(5),
}
}
}
7 changes: 5 additions & 2 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use anyhow::{bail, ensure};
use bytesize::ByteSize;
use http::HeaderMap;
use quickwit_common::net::HostAddr;
use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT;
use quickwit_common::uri::Uri;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::types::NodeId;
Expand Down Expand Up @@ -270,7 +271,7 @@ impl SearcherConfig {
pub struct IngestApiConfig {
pub max_queue_memory_usage: ByteSize,
pub max_queue_disk_usage: ByteSize,
pub replication_factor: usize,
replication_factor: usize,
pub content_length_limit: ByteSize,
pub shard_throughput_limit: ByteSize,
}
Expand All @@ -282,12 +283,14 @@ impl Default for IngestApiConfig {
max_queue_disk_usage: ByteSize::gib(4),
replication_factor: 1,
content_length_limit: ByteSize::mib(10),
shard_throughput_limit: ByteSize::mib(5),
shard_throughput_limit: DEFAULT_SHARD_THROUGHPUT_LIMIT,
}
}
}

impl IngestApiConfig {
/// Returns the replication factor, as defined in environment variable or in the configuration
/// in that order (the environment variable can overrrides the configuration).
pub fn replication_factor(&self) -> anyhow::Result<NonZeroUsize> {
if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") {
let replication_factor = match replication_factor_str.trim() {
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ impl ControlPlane {
watch::Receiver<bool>,
) {
let disable_control_loop = false;

Self::spawn_inner(
universe,
cluster_config,
Expand Down Expand Up @@ -156,12 +155,15 @@ impl ControlPlane {
universe.spawn_builder().supervise_fn(move || {
let cluster_id = cluster_config.cluster_id.clone();
let replication_factor = cluster_config.replication_factor;
let shard_throughput_limit_mb: f32 =
cluster_config.shard_throughput_limit.as_u64() as f32 / 1_000_000f32;
let indexing_scheduler =
IndexingScheduler::new(cluster_id, self_node_id.clone(), indexer_pool.clone());
let ingest_controller = IngestController::new(
metastore.clone(),
ingester_pool.clone(),
replication_factor,
shard_throughput_limit_mb,
);

let readiness_tx = readiness_tx.clone();
Expand Down
127 changes: 92 additions & 35 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,6 @@ use crate::control_plane::ControlPlane;
use crate::ingest::wait_handle::WaitHandle;
use crate::model::{ControlPlaneModel, ScalingMode, ShardEntry, ShardStats};

const MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC: f32 = 5.;

/// Threshold in MiB/s above which we increase the number of shards.
const SCALE_UP_SHARDS_THRESHOLD_MIB_PER_SEC: f32 =
MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC * 8. / 10.;

/// Threshold in MiB/s below which we decrease the number of shards.
const SCALE_DOWN_SHARDS_THRESHOLD_MIB_PER_SEC: f32 =
MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC * 2. / 10.;

const CLOSE_SHARDS_REQUEST_TIMEOUT: Duration = if cfg!(test) {
Duration::from_millis(50)
} else {
Expand Down Expand Up @@ -112,6 +102,10 @@ pub struct IngestController {
// This lock ensures that only one rebalance operation is performed at a time.
rebalance_lock: Arc<Mutex<()>>,
pub stats: IngestControllerStats,
// Threshold in MiB/s below which we decrease the number of shards.
scale_down_shards_threshold_mib_per_sec: f32,
// Threshold in MiB/s above which we increase the number of shards.
scale_up_shards_threshold_mib_per_sec: f32,
}

impl fmt::Debug for IngestController {
Expand Down Expand Up @@ -190,13 +184,17 @@ impl IngestController {
metastore: MetastoreServiceClient,
ingester_pool: IngesterPool,
replication_factor: usize,
max_shard_ingestion_throughput_mib_per_sec: f32,
) -> Self {
IngestController {
metastore,
ingester_pool,
replication_factor,
rebalance_lock: Arc::new(Mutex::new(())),
stats: IngestControllerStats::default(),
scale_up_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec * 0.8,
scale_down_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec
* 0.2,
}
}

Expand Down Expand Up @@ -293,10 +291,10 @@ impl IngestController {
&local_shards_update.source_uid,
&local_shards_update.shard_infos,
);
if shard_stats.avg_ingestion_rate >= SCALE_UP_SHARDS_THRESHOLD_MIB_PER_SEC {
if shard_stats.avg_ingestion_rate >= self.scale_up_shards_threshold_mib_per_sec {
self.try_scale_up_shards(local_shards_update.source_uid, shard_stats, model, progress)
.await?;
} else if shard_stats.avg_ingestion_rate <= SCALE_DOWN_SHARDS_THRESHOLD_MIB_PER_SEC
} else if shard_stats.avg_ingestion_rate <= self.scale_down_shards_threshold_mib_per_sec
&& shard_stats.num_open_shards > 1
{
self.try_scale_down_shards(
Expand Down Expand Up @@ -1142,6 +1140,7 @@ mod tests {

use quickwit_actors::Universe;
use quickwit_common::setup_logging_for_tests;
use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT;
use quickwit_common::tower::DelayLayer;
use quickwit_config::{DocMapping, SourceConfig, INGEST_V2_SOURCE_ID};
use quickwit_ingest::{RateMibPerSec, ShardInfo};
Expand All @@ -1159,6 +1158,9 @@ mod tests {

use super::*;

const SHARD_THROUGHPUT_LIMIT_MB: f32 =
DEFAULT_SHARD_THROUGHPUT_LIMIT.as_u64() as f32 / 1_000_000f32;

#[tokio::test]
async fn test_ingest_controller_get_or_create_open_shards() {
let source_id: &'static str = "test-source";
Expand Down Expand Up @@ -1244,8 +1246,12 @@ mod tests {
ingester_pool.insert(NodeId::from("test-ingester-2"), ingester.clone());

let replication_factor = 2;
let mut controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let mut controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let mut model = ControlPlaneModel::default();
model.add_index(index_metadata_0.clone());
Expand Down Expand Up @@ -1425,7 +1431,12 @@ mod tests {
ingester_pool.insert(NodeId::from("test-ingester-1"), ingester.clone());

let replication_factor = 1;
let mut controller = IngestController::new(metastore, ingester_pool, replication_factor);
let mut controller = IngestController::new(
metastore,
ingester_pool,
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let mut model = ControlPlaneModel::default();
model.add_index(index_metadata_0.clone());
Expand Down Expand Up @@ -1462,7 +1473,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 2;

let mut controller = IngestController::new(metastore, ingester_pool, replication_factor);
let mut controller = IngestController::new(
metastore,
ingester_pool,
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);
let mut model = ControlPlaneModel::default();

let index_uid = IndexUid::for_test("test-index-0", 0);
Expand Down Expand Up @@ -1507,8 +1523,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 2;

let controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let mut model = ControlPlaneModel::default();

Expand Down Expand Up @@ -1681,8 +1701,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;

let controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let ingester_id_0 = NodeId::from("test-ingester-0");
let mut mock_ingester_0 = MockIngesterService::new();
Expand Down Expand Up @@ -1895,8 +1919,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;

let mut controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let mut controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let index_uid = IndexUid::for_test("test-index", 0);
let source_id = "test-source".to_string();
Expand Down Expand Up @@ -2023,8 +2051,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;

let mut controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let mut controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let index_uid = IndexUid::for_test("test-index", 0);
let index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index");
Expand Down Expand Up @@ -2223,8 +2255,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;

let mut controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let mut controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = INGEST_V2_SOURCE_ID.to_string();
Expand Down Expand Up @@ -2330,8 +2366,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 1;

let controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".to_string();
Expand Down Expand Up @@ -2545,8 +2585,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 2;

let controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".to_string();
Expand Down Expand Up @@ -2624,7 +2668,12 @@ mod tests {
let ingester_pool = IngesterPool::default();
let replication_factor = 2;

let controller = IngestController::new(metastore, ingester_pool, replication_factor);
let controller = IngestController::new(
metastore,
ingester_pool,
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let mut model = ControlPlaneModel::default();

Expand Down Expand Up @@ -2694,8 +2743,12 @@ mod tests {
let metastore = MetastoreServiceClient::mocked();
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
let controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let closed_shards = controller.close_shards(Vec::new()).await;
assert_eq!(closed_shards.len(), 0);
Expand Down Expand Up @@ -2846,8 +2899,12 @@ mod tests {
let metastore = MetastoreServiceClient::from_mock(mock_metastore);
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
let mut controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let mut controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
SHARD_THROUGHPUT_LIMIT_MB,
);

let mut model = ControlPlaneModel::default();

Expand Down
13 changes: 7 additions & 6 deletions quickwit/quickwit-ingest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ macro_rules! with_lock_metrics {
#[cfg(test)]
mod tests {

use bytesize::ByteSize;
use quickwit_actors::AskError;

use super::*;
Expand Down Expand Up @@ -244,11 +243,13 @@ mod tests {
let queues_dir_path = temp_dir.path().join("queues-0");
get_ingest_api_service(&queues_dir_path).await.unwrap_err();

let ingest_api_config = IngestApiConfig {
max_queue_memory_usage: ByteSize(1200),
max_queue_disk_usage: ByteSize::mib(256),
..Default::default()
};
let ingest_api_config = serde_json::from_str(
r#"{
"max_queue_memory_usage": "1200b",
"max_queue_disk_usage": "256mb"
}"#,
)
.unwrap();
init_ingest_api(&universe, &queues_dir_path, &ingest_api_config)
.await
.unwrap();
Expand Down
Loading

0 comments on commit 458b619

Please sign in to comment.