Make io limits for merges a node-level parameter. (#4440)
This PR also removes the index label from the IO metrics, as well as the IO limits for the delete pipeline.

Closes #4439
fulmicoton authored Jan 23, 2024
1 parent 0fa6348 commit 0c283ab
Showing 14 changed files with 82 additions and 62 deletions.
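Before the file-by-file diff, a minimal sketch of the new flow (hedged: `IndexerConfigSketch` and `merge_io_limiter` are illustrative stand-ins, not the real Quickwit items): the limit is read once from the node-level indexer configuration and turned into a single `Limiter` that every merge pipeline on the node shares.

use bytesize::ByteSize;
use quickwit_common::io::{self, Limiter};

// Hypothetical, trimmed-down stand-in for the node-level `IndexerConfig`.
struct IndexerConfigSketch {
    max_merge_write_throughput: Option<ByteSize>,
}

fn merge_io_limiter(config: &IndexerConfigSketch) -> Option<Limiter> {
    // `None` leaves merges unthrottled, replacing the old per-index
    // `f64::INFINITY` default.
    config.max_merge_write_throughput.map(io::limiter)
}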
48 changes: 34 additions & 14 deletions quickwit/quickwit-common/src/io.rs
@@ -35,7 +35,8 @@ use std::time::Duration;

use async_speed_limit::clock::StandardClock;
use async_speed_limit::limiter::Consume;
use async_speed_limit::Limiter;
pub use async_speed_limit::Limiter;
use bytesize::ByteSize;
use once_cell::sync::Lazy;
use pin_project::pin_project;
use prometheus::IntCounter;
@@ -53,7 +54,7 @@ fn truncate_bytes(bytes: &[u8]) -> &[u8] {
}

struct IoMetrics {
write_bytes: IntCounterVec<2>,
write_bytes: IntCounterVec<1>,
}

impl Default for IoMetrics {
@@ -63,7 +64,7 @@ impl Default for IoMetrics {
"Number of bytes written by a given component in [indexer, merger, deleter, \
split_downloader_{merge,delete}]",
"quickwit",
["index", "component"],
["component"],
);
Self { write_bytes }
}
@@ -85,9 +86,15 @@ const REFILL_DURATION: Duration = if cfg!(test) {
Duration::from_millis(100)
};

pub fn limiter(throughput: ByteSize) -> Limiter {
Limiter::builder(throughput.as_u64() as f64)
.refill(REFILL_DURATION)
.build()
}

#[derive(Clone)]
pub struct IoControls {
throughput_limiter: Limiter,
throughput_limiter_opt: Option<Limiter>,
bytes_counter: IntCounter,
progress: Progress,
kill_switch: KillSwitch,
@@ -98,7 +105,7 @@ impl Default for IoControls {
let default_bytes_counter =
IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap();
IoControls {
throughput_limiter: Limiter::new(f64::INFINITY),
throughput_limiter_opt: None,
progress: Progress::default(),
kill_switch: KillSwitch::default(),
bytes_counter: default_bytes_counter,
@@ -131,13 +138,20 @@ impl IoControls {
Ok(guard)
}

pub fn set_index_and_component(mut self, index: &str, component: &str) -> Self {
self.bytes_counter = IO_METRICS.write_bytes.with_label_values([index, component]);
pub fn set_component(mut self, component: &str) -> Self {
self.bytes_counter = IO_METRICS.write_bytes.with_label_values([component]);
self
}

pub fn set_throughput_limit(mut self, throughput: f64) -> Self {
self.throughput_limiter = Limiter::builder(throughput).refill(REFILL_DURATION).build();
pub fn set_throughput_limit(self, throughput: ByteSize) -> Self {
let throughput_limiter = Limiter::builder(throughput.as_u64() as f64)
.refill(REFILL_DURATION)
.build();
self.set_throughput_limiter_opt(Some(throughput_limiter))
}

pub fn set_throughput_limiter_opt(mut self, throughput_limiter_opt: Option<Limiter>) -> Self {
self.throughput_limiter_opt = throughput_limiter_opt;
self
}

@@ -157,7 +171,9 @@ impl IoControls {
}
fn consume_blocking(&self, num_bytes: usize) -> io::Result<()> {
let _guard = self.check_if_alive()?;
self.throughput_limiter.blocking_consume(num_bytes);
if let Some(throughput_limiter) = &self.throughput_limiter_opt {
throughput_limiter.blocking_consume(num_bytes);
}
self.bytes_counter.inc_by(num_bytes as u64);
Ok(())
}
@@ -212,9 +228,12 @@ impl<A: IoControlsAccess, W: AsyncWrite> ControlledWrite<A, W> {
if len > 0 {
let waiter = this.io_controls_access.apply(|io_controls| {
io_controls.bytes_counter.inc_by(len as u64);
io_controls.throughput_limiter.consume(len)
io_controls
.throughput_limiter_opt
.as_ref()
.map(|limiter| limiter.consume(len))
});
*this.waiter = Some(waiter)
*this.waiter = waiter
}
}
res
@@ -330,14 +349,15 @@ mod tests {
use std::io::{IoSlice, Write};
use std::time::Duration;

use bytesize::ByteSize;
use tokio::io::{sink, AsyncWriteExt};
use tokio::time::Instant;

use crate::io::{IoControls, IoControlsAccess};

#[tokio::test]
async fn test_controlled_writer_limited_async() {
let io_controls = IoControls::default().set_throughput_limit(2_000_000f64);
let io_controls = IoControls::default().set_throughput_limit(ByteSize::mb(2));
let mut controlled_write = io_controls.clone().wrap_write(sink());
let buf = vec![44u8; 1_000];
let start = Instant::now();
@@ -370,7 +390,7 @@

#[test]
fn test_controlled_writer_limited_sync() {
let io_controls = IoControls::default().set_throughput_limit(2_000_000f64);
let io_controls = IoControls::default().set_throughput_limit(ByteSize::mb(2));
let mut controlled_write = io_controls.clone().wrap_write(std::io::sink());
let buf = vec![44u8; 1_000];
let start = Instant::now();
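A usage sketch of the reworked `IoControls` builder, modeled on the tests above (the "merger" label and the 2 MB/s figure are illustrative, not prescribed):

use bytesize::ByteSize;
use quickwit_common::io::IoControls;
use tokio::io::{sink, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Throughput limit plus a component label; the `index` metric label is
    // gone, so only the component is recorded.
    let io_controls = IoControls::default()
        .set_throughput_limit(ByteSize::mb(2))
        .set_component("merger");

    // Every write through the wrapper is counted and rate-limited.
    let mut writer = io_controls.wrap_write(sink());
    writer.write_all(&[44u8; 1_000]).await?;
    writer.flush().await?;
    Ok(())
}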
@@ -47,7 +47,8 @@
"enable_otlp_endpoint": true,
"split_store_max_num_bytes": "1T",
"split_store_max_num_splits": 10000,
"max_concurrent_split_uploads": 8
"max_concurrent_split_uploads": 8,
"max_merge_write_throughput": "100mb"
},
"ingest_api": {
"replication_factor": 2
Expand Down
@@ -38,6 +38,7 @@ enable_otlp_endpoint = true
split_store_max_num_bytes = "1T"
split_store_max_num_splits = 10_000
max_concurrent_split_uploads = 8
max_merge_write_throughput = 100mb

[ingest_api]
replication_factor = 2
@@ -42,6 +42,7 @@ indexer:
split_store_max_num_bytes: 1T
split_store_max_num_splits: 10000
max_concurrent_split_uploads: 8
max_merge_write_throughput: 100mb

ingest_api:
replication_factor: 2
20 changes: 14 additions & 6 deletions quickwit/quickwit-config/src/index_config/mod.rs
@@ -38,6 +38,7 @@ use quickwit_doc_mapper::{
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::load_index_config_from_user_config;
use tracing::warn;

use crate::index_config::serialize::VersionedIndexConfig;
use crate::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig};
@@ -97,14 +98,11 @@ pub struct IndexingResources {
#[schema(value_type = String, default = "2 GB")]
#[serde(default = "IndexingResources::default_heap_size")]
pub heap_size: ByteSize,
/// Sets the maximum write IO throughput in bytes/sec for the merge and delete pipelines.
/// The IO limit is applied both to the downloader and to the merge executor.
/// On hardware where IO is limited, this parameter can help limiting the impact of
/// merges/deletes on indexing.
// DEPRECATED: See #4439
#[schema(value_type = String)]
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_merge_write_throughput: Option<ByteSize>,
#[serde(skip_serializing)]
max_merge_write_throughput: Option<ByteSize>,
}

impl PartialEq for IndexingResources {
@@ -125,6 +123,16 @@ impl IndexingResources {
..Default::default()
}
}

pub fn validate(&self) -> anyhow::Result<()> {
if self.max_merge_write_throughput.is_some() {
warn!(
"`max_merge_write_throughput` is deprecated and will be removed in a future \
version. See #4439. A global limit now exists in indexer configuration."
);
}
Ok(())
}
}

impl Default for IndexingResources {
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/index_config/serialize.rs
@@ -105,6 +105,7 @@ impl IndexConfigForSerialization {
build_doc_mapper(&self.doc_mapping, &self.search_settings)?;

self.indexing_settings.merge_policy.validate()?;
self.indexing_settings.resources.validate()?;

Ok(IndexConfig {
index_id: self.index_id,
7 changes: 7 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
@@ -91,6 +91,11 @@ pub struct IndexerConfig {
pub split_store_max_num_splits: usize,
#[serde(default = "IndexerConfig::default_max_concurrent_split_uploads")]
pub max_concurrent_split_uploads: usize,
/// Limits the IO throughput of the `SplitDownloader` and the `MergeExecutor`.
/// On hardware where IO is constrained, this ensures that merges (a batch
/// operation) do not starve indexing itself (a latency-sensitive operation).
#[serde(default)]
pub max_merge_write_throughput: Option<ByteSize>,
/// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry
/// Protocol (OTLP).
#[serde(default = "IndexerConfig::default_enable_otlp_endpoint")]
@@ -143,6 +148,7 @@ impl IndexerConfig {
split_store_max_num_splits: 3,
max_concurrent_split_uploads: 4,
cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32,
max_merge_write_throughput: None,
};
Ok(indexer_config)
}
@@ -157,6 +163,7 @@ impl Default for IndexerConfig {
split_store_max_num_splits: Self::default_split_store_max_num_splits(),
max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(),
cpu_capacity: Self::default_cpu_capacity(),
max_merge_write_throughput: None,
}
}
}
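On the configuration side, a sketch of how the new optional node-level field deserializes, assuming `bytesize` is built with its serde feature (which accepts human-readable strings like the `100mb` used in the fixtures above); `IndexerConfigSketch` is again a hypothetical stand-in:

use bytesize::ByteSize;
use serde::Deserialize;

#[derive(Deserialize)]
struct IndexerConfigSketch {
    #[serde(default)]
    max_merge_write_throughput: Option<ByteSize>,
}

fn main() {
    let with_limit: IndexerConfigSketch =
        serde_yaml::from_str("max_merge_write_throughput: 100mb").unwrap();
    assert_eq!(with_limit.max_merge_write_throughput, Some(ByteSize::mb(100)));

    // Omitting the field falls back to `None`, i.e. no merge IO limit.
    let without_limit: IndexerConfigSketch = serde_yaml::from_str("{}").unwrap();
    assert!(without_limit.max_merge_write_throughput.is_none());
}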
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/node_config/serialize.rs
@@ -556,6 +556,7 @@ mod tests {
max_concurrent_split_uploads: 8,
cpu_capacity: IndexerConfig::default_cpu_capacity(),
enable_cooperative_indexing: false,
max_merge_write_throughput: None,
}
);
assert_eq!(
5 changes: 1 addition & 4 deletions quickwit/quickwit-indexing/src/actors/index_serializer.rs
@@ -84,10 +84,7 @@ impl Handler<IndexedSplitBatchBuilder> for IndexSerializer {
let io_controls = IoControls::default()
.set_progress(ctx.progress().clone())
.set_kill_switch(ctx.kill_switch().clone())
.set_index_and_component(
split_builder.split_attrs.pipeline_id.index_uid.index_id(),
"index_serializer",
);
.set_component("index_serializer");
controlled_directory.set_io_controls(io_controls);
}
let split = split_builder.finalize()?;
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexer.rs
@@ -125,7 +125,7 @@ impl IndexerState {
let io_controls = IoControls::default()
.set_progress(ctx.progress().clone())
.set_kill_switch(ctx.kill_switch().clone())
.set_index_and_component(self.pipeline_id.index_uid.index_id(), "indexer");
.set_component("indexer");

let indexed_split = IndexedSplitBuilder::new_in_dir(
self.pipeline_id.clone(),
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -878,7 +878,7 @@ mod tests {
split_store: split_store.clone(),
merge_policy: default_merge_policy(),
max_concurrent_split_uploads: 2,
merge_max_io_num_bytes_per_sec: None,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
};
let merge_pipeline = MergePipeline::new(merge_pipeline_params, universe.spawn_ctx());
12 changes: 7 additions & 5 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -33,8 +33,9 @@ use quickwit_actors::{
};
use quickwit_cluster::Cluster;
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::io::Limiter;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir;
use quickwit_common::{io, temp_dir};
use quickwit_config::{
build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
};
@@ -128,6 +129,7 @@ pub struct IndexingService {
max_concurrent_split_uploads: usize,
merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
cooperative_indexing_permits: Option<Arc<Semaphore>>,
merge_io_throughput_limiter_opt: Option<Limiter>,
event_broker: EventBroker,
}

@@ -160,6 +162,8 @@ impl IndexingService {
indexer_config.split_store_max_num_splits,
indexer_config.split_store_max_num_bytes,
);
let merge_io_throughput_limiter_opt =
indexer_config.max_merge_write_throughput.map(io::limiter);
let split_cache_dir_path = get_cache_directory_path(&data_dir_path);
let local_split_store =
LocalSplitStore::open(split_cache_dir_path, split_store_space_quota).await?;
@@ -185,6 +189,7 @@
counters: Default::default(),
max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads,
merge_pipeline_handles: HashMap::new(),
merge_io_throughput_limiter_opt,
cooperative_indexing_permits,
event_broker,
})
@@ -293,10 +298,7 @@ impl IndexingService {
metastore: self.metastore.clone(),
split_store: split_store.clone(),
merge_policy: merge_policy.clone(),
merge_max_io_num_bytes_per_sec: index_config
.indexing_settings
.resources
.max_merge_write_throughput,
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
};
26 changes: 7 additions & 19 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -21,12 +21,11 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use bytesize::ByteSize;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Inbox, Mailbox,
SpawnContext, Supervisable, HEARTBEAT,
};
use quickwit_common::io::IoControls;
use quickwit_common::io::{IoControls, Limiter};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
@@ -270,25 +269,14 @@ impl MergePipeline {
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_packager);

let max_merge_write_throughput: f64 = self
.params
.merge_max_io_num_bytes_per_sec
.as_ref()
.map(|bytes_per_sec| bytes_per_sec.as_u64() as f64)
.unwrap_or(f64::INFINITY);

let split_downloader_io_controls = IoControls::default()
.set_throughput_limit(max_merge_write_throughput)
.set_index_and_component(
self.params.pipeline_id.index_uid.index_id(),
"split_downloader_merge",
);
.set_throughput_limiter_opt(self.params.merge_io_throughput_limiter_opt.clone())
.set_component("split_downloader_merge");

// The merge and split download share the same throughput limiter.
// This is how cloning the `IoControls` works.
let merge_executor_io_controls = split_downloader_io_controls
.clone()
.set_index_and_component(self.params.pipeline_id.index_uid.index_id(), "merger");
let merge_executor_io_controls =
split_downloader_io_controls.clone().set_component("merger");

let merge_executor = MergeExecutor::new(
self.params.pipeline_id.clone(),
@@ -473,7 +461,7 @@ pub struct MergePipelineParams {
pub split_store: IndexingSplitStore,
pub merge_policy: Arc<dyn MergePolicy>,
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
pub merge_max_io_num_bytes_per_sec: Option<ByteSize>,
pub merge_io_throughput_limiter_opt: Option<Limiter>,
pub event_broker: EventBroker,
}

@@ -533,7 +521,7 @@ mod tests {
split_store,
merge_policy: default_merge_policy(),
max_concurrent_split_uploads: 2,
merge_max_io_num_bytes_per_sec: None,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
};
let pipeline = MergePipeline::new(pipeline_params, universe.spawn_ctx());
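The inline comment above ("the merge and split download share the same throughput limiter") relies on `IoControls` being `Clone` with the `Limiter` inside it shared: `async_speed_limit` documents that limiter clones draw from one bucket. A hedged sketch of that sharing:

use bytesize::ByteSize;
use quickwit_common::io::{self, IoControls};

fn main() {
    // One node-level limiter, e.g. built from `max_merge_write_throughput`.
    let limiter = io::limiter(ByteSize::mb(100));

    let downloader_controls = IoControls::default()
        .set_throughput_limiter_opt(Some(limiter))
        .set_component("split_downloader_merge");

    // The clone keeps the same limiter, so downloads and merges compete for
    // a single 100 MB/s budget; only the metric label differs.
    let merger_controls = downloader_controls.clone().set_component("merger");
    let _ = merger_controls;
}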