Skip to content

Commit

Permalink
lower contention on AggregationLimits (#2394)
Browse files Browse the repository at this point in the history
PR quickwit-oss/quickwit#4962 fixes an issue
where the AggregationLimits are not passed correctly. Since the
AggregationLimits are shared properly we run into contention issues.

This PR includes some straightforward improvement to reduce contention,
by only calling if the memory changed and avoiding the second read.

We probably need some sharding with multiple counters or local caching before updating the
global after some threshold.
  • Loading branch information
PSeitz authored May 15, 2024
1 parent a795904 commit 5b7cca1
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
22 changes: 12 additions & 10 deletions src/aggregation/agg_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ impl AggregationLimits {
}
}

pub(crate) fn add_memory_consumed(&self, num_bytes: u64) -> crate::Result<()> {
self.memory_consumption
.fetch_add(num_bytes, Ordering::Relaxed);
validate_memory_consumption(&self.memory_consumption, self.memory_limit)?;
pub(crate) fn add_memory_consumed(&self, add_num_bytes: u64) -> crate::Result<()> {
let prev_value = self
.memory_consumption
.fetch_add(add_num_bytes, Ordering::Relaxed);
validate_memory_consumption(prev_value + add_num_bytes, self.memory_limit)?;
Ok(())
}

Expand All @@ -94,11 +95,11 @@ impl AggregationLimits {
}

fn validate_memory_consumption(
memory_consumption: &AtomicU64,
memory_consumption: u64,
memory_limit: ByteCount,
) -> Result<(), AggregationError> {
// Load the estimated memory consumed by the aggregations
let memory_consumed: ByteCount = memory_consumption.load(Ordering::Relaxed).into();
let memory_consumed: ByteCount = memory_consumption.into();
if memory_consumed > memory_limit {
return Err(AggregationError::MemoryExceeded {
limit: memory_limit,
Expand All @@ -118,10 +119,11 @@ pub struct ResourceLimitGuard {
}

impl ResourceLimitGuard {
pub(crate) fn add_memory_consumed(&self, num_bytes: u64) -> crate::Result<()> {
self.memory_consumption
.fetch_add(num_bytes, Ordering::Relaxed);
validate_memory_consumption(&self.memory_consumption, self.memory_limit)?;
pub(crate) fn add_memory_consumed(&self, add_num_bytes: u64) -> crate::Result<()> {
let prev_value = self
.memory_consumption
.fetch_add(add_num_bytes, Ordering::Relaxed);
validate_memory_consumption(prev_value + add_num_bytes, self.memory_limit)?;
Ok(())
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/aggregation/bucket/histogram/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,11 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
}

let mem_delta = self.get_memory_consumption() - mem_pre;
bucket_agg_accessor
.limits
.add_memory_consumed(mem_delta as u64)?;
if mem_delta > 0 {
bucket_agg_accessor
.limits
.add_memory_consumed(mem_delta as u64)?;
}

Ok(())
}
Expand Down
8 changes: 5 additions & 3 deletions src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,11 @@ impl SegmentAggregationCollector for SegmentTermCollector {
}

let mem_delta = self.get_memory_consumption() - mem_pre;
bucket_agg_accessor
.limits
.add_memory_consumed(mem_delta as u64)?;
if mem_delta > 0 {
bucket_agg_accessor
.limits
.add_memory_consumed(mem_delta as u64)?;
}

Ok(())
}
Expand Down

0 comments on commit 5b7cca1

Please sign in to comment.