improve ingest lock #5296

Open · wants to merge 4 commits into base: main
Changes from 2 commits
11 changes: 8 additions & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;

use bytes::{BufMut, BytesMut};
use bytesize::ByteSize;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use mrecordlog::Record;
use quickwit_common::metrics::MEMORY_METRICS;
use quickwit_common::retry::RetryParams;
@@ -133,8 +133,13 @@ impl FetchStreamTask {
let mut mrecord_buffer = BytesMut::with_capacity(self.batch_num_bytes);
let mut mrecord_lengths = Vec::new();

let mrecordlog_guard =
with_lock_metrics!(self.mrecordlog.read().await, "fetch", "read");
let Some(mrecordlog_guard) =
with_lock_metrics!(self.mrecordlog.read().map(Some), "fetch", "read")
else {
// We always get Some here: the extra `.map(Some)` layer is only there to satisfy
// with_lock_metrics, which needs a Future<Item = Result | Option>
unreachable!();
};

let Ok(mrecords) = mrecordlog_guard
.as_ref()
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/idle.rs
@@ -64,7 +64,7 @@ impl CloseIdleShardsTask {
return;
};
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write").await
with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write")
else {
return;
};
21 changes: 10 additions & 11 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -292,7 +292,7 @@ impl Ingester {

let mut per_source_shard_ids: HashMap<(IndexUid, SourceId), Vec<ShardId>> = HashMap::new();

let state_guard = with_lock_metrics!(self.state.lock_fully().await, "reset_shards", "read")
let state_guard = with_lock_metrics!(self.state.lock_fully(), "reset_shards", "read")
.expect("ingester should be ready");

for queue_id in state_guard.mrecordlog.list_queues() {
@@ -325,7 +325,7 @@
match advise_reset_shards_result {
Ok(Ok(advise_reset_shards_response)) => {
let mut state_guard =
with_lock_metrics!(self.state.lock_fully().await, "reset_shards", "write")
with_lock_metrics!(self.state.lock_fully(), "reset_shards", "write")
.expect("ingester should be ready");

state_guard
@@ -458,8 +458,7 @@ impl Ingester {
let force_commit = commit_type == CommitTypeV2::Force;
let leader_id: NodeId = persist_request.leader_id.into();

let mut state_guard =
with_lock_metrics!(self.state.lock_fully().await, "persist", "write")?;
let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "persist", "write")?;

if state_guard.status() != IngesterStatus::Ready {
persist_failures.reserve_exact(persist_request.subrequests.len());
@@ -929,8 +928,9 @@ impl Ingester {
&self,
init_shards_request: InitShardsRequest,
) -> IngestV2Result<InitShardsResponse> {
let mut state_guard =
with_lock_metrics!(self.state.lock_fully().await, "init_shards", "write")?;
let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "init_shards", "write")?;
// we do this to allow simultaneous reborrow of multiple fields.
let state_guard = &mut *state_guard;

if state_guard.status() != IngesterStatus::Ready {
return Err(IngestV2Error::Internal("node decommissioned".to_string()));
@@ -984,7 +984,7 @@
)));
}
let mut state_guard =
with_lock_metrics!(self.state.lock_fully().await, "truncate_shards", "write")?;
with_lock_metrics!(self.state.lock_fully(), "truncate_shards", "write")?;

for subrequest in truncate_shards_request.subrequests {
let queue_id = subrequest.queue_id();
@@ -1011,7 +1011,7 @@
close_shards_request: CloseShardsRequest,
) -> IngestV2Result<CloseShardsResponse> {
let mut state_guard =
with_lock_metrics!(self.state.lock_partially().await, "close_shards", "write")?;
with_lock_metrics!(self.state.lock_partially(), "close_shards", "write")?;

let mut successes = Vec::with_capacity(close_shards_request.shard_pkeys.len());

@@ -1166,7 +1166,7 @@ impl IngesterService for Ingester {
})
.collect();
let mut state_guard =
with_lock_metrics!(self.state.lock_fully(), "retain_shards", "write").await?;
with_lock_metrics!(self.state.lock_fully(), "retain_shards", "write")?;
let remove_queue_ids: HashSet<QueueId> = state_guard
.shards
.keys()
@@ -1210,8 +1210,7 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
warn!("ingester state update failed");
return;
};
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
let Ok(mut state_guard) = with_lock_metrics!(state.lock_fully(), "gc_shards", "write")
else {
error!("failed to lock the ingester state");
return;
13 changes: 11 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
@@ -76,14 +76,15 @@ impl Default for IngestResultMetrics {
}
}

pub(super) struct IngestV2Metrics {
pub(crate) struct IngestV2Metrics {
pub reset_shards_operations_total: IntCounterVec<1>,
pub open_shards: IntGauge,
pub closed_shards: IntGauge,
pub shard_lt_throughput_mib: Histogram,
pub shard_st_throughput_mib: Histogram,
pub wal_acquire_lock_requests_in_flight: IntGaugeVec<2>,
pub wal_acquire_lock_request_duration_secs: HistogramVec<2>,
pub wal_hold_lock_duration_secs: HistogramVec<2>,
pub wal_disk_used_bytes: IntGauge,
pub wal_memory_used_bytes: IntGauge,
pub ingest_results: IngestResultMetrics,
@@ -139,6 +140,14 @@ impl Default for IngestV2Metrics {
["operation", "type"],
exponential_buckets(0.001, 2.0, 12).unwrap(),
),
wal_hold_lock_duration_secs: new_histogram_vec(
"wal_hold_lock_duration_secs",
"Duration for which a lock was held in seconds.",
"ingest",
&[],
["operation", "type"],
exponential_buckets(0.001, 2.0, 12).unwrap(),
),
wal_disk_used_bytes: new_gauge(
"wal_disk_used_bytes",
"WAL disk space used in bytes.",
@@ -168,4 +177,4 @@ pub(super) fn report_wal_usage(wal_usage: ResourceUsage) {
.set(wal_usage.memory_used_bytes as i64);
}

pub(super) static INGEST_V2_METRICS: Lazy<IngestV2Metrics> = Lazy::new(IngestV2Metrics::default);
pub(crate) static INGEST_V2_METRICS: Lazy<IngestV2Metrics> = Lazy::new(IngestV2Metrics::default);
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -23,7 +23,7 @@ mod doc_mapper;
mod fetch;
mod idle;
mod ingester;
mod metrics;
pub(crate) mod metrics;
mod models;
mod mrecord;
mod mrecordlog_utils;
6 changes: 2 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -454,8 +454,7 @@ impl ReplicationTask {
};
let queue_id = replica_shard.queue_id();

let mut state_guard =
with_lock_metrics!(self.state.lock_fully(), "init_replica", "write").await?;
let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "init_replica", "write")?;

match state_guard.mrecordlog.create_queue(&queue_id).await {
Ok(_) => {}
@@ -527,8 +526,7 @@
// queue in the WAL and should be deleted.
let mut shards_to_delete: HashSet<QueueId> = HashSet::new();

let mut state_guard =
with_lock_metrics!(self.state.lock_fully(), "replicate", "write").await?;
let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "replicate", "write")?;

if state_guard.status() != IngesterStatus::Ready {
replicate_failures.reserve_exact(replicate_request.subrequests.len());
65 changes: 60 additions & 5 deletions quickwit/quickwit-ingest/src/lib.rs
@@ -125,6 +125,49 @@ impl CommitType {
}
}

struct TimedMutexGuard<T> {
guard: T,
acquired_at: std::time::Instant,
purpose: [&'static str; 2],
@fulmicoton (Contributor) commented on Sep 18, 2024:
One of the sad things about relying on dynamic labels like this is that you end up doing one hashmap lookup every time you need to access the counter (here, once per lock).

It is probably not catastrophic in this case, but here is the alternative: have a histogram field directly on the TimedMutexGuard:

histogram: &'static Histogram,

You can then cache the Histogram handle in different ways. In ingest v2 you can see the pattern done here, where the values are stored in metrics.rs directly: https://github.com/quickwit-oss/quickwit/blob/main/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs#L59-L74

You could also cache it closer to the usage site if you prefer.

I kind of like to keep the MetricsVec definitions in metrics.rs, because it really helps maintain some consistency in the naming, help strings, etc., and it makes it easier to search for the relevant metric names...

But it is less critical for metrics labels...
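For illustration, a minimal sketch of that pattern, assuming quickwit_common::metrics re-exports Histogram and that HistogramVec::with_label_values returns an owned child handle (as the linked metrics.rs and the surrounding code suggest); the struct and field names below are hypothetical, not part of this PR:

```rust
use std::time::Instant;

use quickwit_common::metrics::{Histogram, HistogramVec};

// Label values resolved once, when the metrics struct is built, instead of on
// every lock acquisition. Names are invented for the example.
pub(crate) struct WalHoldLockMetrics {
    pub persist_write: Histogram,
    pub fetch_read: Histogram,
}

impl WalHoldLockMetrics {
    fn new(histogram_vec: &HistogramVec<2>) -> Self {
        Self {
            // One label lookup here, none on the hot path.
            persist_write: histogram_vec.with_label_values(["persist", "write"]),
            fetch_read: histogram_vec.with_label_values(["fetch", "read"]),
        }
    }
}

// The guard would then carry a pre-resolved handle instead of label strings.
struct TimedMutexGuard<T> {
    guard: T,
    acquired_at: Instant,
    histogram: &'static Histogram,
}

impl<T> Drop for TimedMutexGuard<T> {
    fn drop(&mut self) {
        // Direct observe on the cached child: no per-drop hashmap lookup.
        self.histogram
            .observe(self.acquired_at.elapsed().as_secs_f64());
    }
}
```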

The PR author (Contributor) replied:
I like this pattern of statically defining things; however, it's kind of a pain and a large typo hazard to implement. Having some sort of macro to derive a static (histogram|gauge|counter) vec from a dynamic vec would be nice, and would likely make us use this pattern more.
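A rough sketch of what such a macro could look like — purely hypothetical, nothing like it exists in Quickwit today, and every name below is invented:

```rust
use quickwit_common::metrics::{Histogram, HistogramVec};

// Generates a struct of pre-resolved histogram children next to the dynamic
// vec, so adding a label combination stays a one-line change.
macro_rules! static_histogram_children {
    ($struct_name:ident, $($field:ident => [$($label:expr),+]),+ $(,)?) => {
        pub(crate) struct $struct_name {
            $(pub $field: Histogram,)+
        }

        impl $struct_name {
            pub(crate) fn new(histogram_vec: &HistogramVec<2>) -> Self {
                Self {
                    // Each label combination is resolved exactly once here.
                    $($field: histogram_vec.with_label_values([$($label),+]),)+
                }
            }
        }
    };
}

// Usage (illustrative): two pre-resolved children of the hold-lock histogram.
static_histogram_children!(
    WalHoldLockChildren,
    persist_write => ["persist", "write"],
    fetch_read => ["fetch", "read"],
);
```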

}

use std::ops::{Deref, DerefMut};

impl<T> Deref for TimedMutexGuard<T> {
type Target = T;

fn deref(&self) -> &T {
&self.guard
}
}

impl<T> DerefMut for TimedMutexGuard<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.guard
}
}

impl<T> Drop for TimedMutexGuard<T> {
fn drop(&mut self) {
let elapsed = self.acquired_at.elapsed();

crate::ingest_v2::metrics::INGEST_V2_METRICS
.wal_hold_lock_duration_secs
.with_label_values(self.purpose)
.observe(elapsed.as_secs_f64());

if elapsed > std::time::Duration::from_secs(1) {
let purpose = self.purpose.join("::");
quickwit_common::rate_limited_warn!(
limit_per_min = 6,
"hold mutext for {}ms for {}",
elapsed.as_millis(),
purpose,
);
}
}
}

#[macro_export]
macro_rules! with_lock_metrics {
($future:expr, $($label:tt),*) => {
@@ -134,14 +177,19 @@
.with_label_values([$($label),*])
.inc();


let now = std::time::Instant::now();
let guard = $future;
let guard = $future.await;

let elapsed = now.elapsed();
let now_after = std::time::Instant::now();
let elapsed = now_after.duration_since(now);
if elapsed > std::time::Duration::from_secs(1) {
let text_label = with_lock_metrics!(@concat $($label,)*);
quickwit_common::rate_limited_warn!(
limit_per_min=6,
"lock acquisition took {}ms", elapsed.as_millis()
"lock acquisition took {}ms for {}",
elapsed.as_millis(),
text_label,
);
}
$crate::ingest_v2::metrics::INGEST_V2_METRICS
@@ -153,9 +201,16 @@
.with_label_values([$($label),*])
.observe(elapsed.as_secs_f64());

guard
guard.map(|guard| $crate::TimedMutexGuard {
guard,
acquired_at: now_after,
purpose: [$($label),*],
})
}
}
};
(@concat $label1:tt, $($label:tt,)*) => {
concat!($label1, $("::", $label),*)
};
}

#[cfg(test)]