diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 39928fecb3..b4c4ea28e5 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -29,6 +29,8 @@ tokio = { workspace = true, features = ["rt", "time"], optional = true } tokio-stream = { workspace = true, optional = true } http = { workspace = true, optional = true } tracing = {workspace = true, optional = true} +crossbeam-utils = "0.8.20" +rustc-hash = "2.0.0" [package.metadata.docs.rs] all-features = true diff --git a/opentelemetry-sdk/src/metrics/internal/hashed.rs b/opentelemetry-sdk/src/metrics/internal/hashed.rs new file mode 100644 index 0000000000..77c712d6c6 --- /dev/null +++ b/opentelemetry-sdk/src/metrics/internal/hashed.rs @@ -0,0 +1,139 @@ +use std::{ + borrow::{Borrow, Cow}, + hash::{BuildHasher, Hash, Hasher}, + ops::Deref, +}; + +use rustc_hash::FxHasher; + +/// Hash value only once, works with references and owned types. +pub(crate) struct Hashed<'a, T> +where + T: ToOwned + ?Sized, +{ + value: Cow<'a, T>, + hash: u64, +} + +impl<'a, T> Hashed<'a, T> +where + T: ToOwned + Hash + ?Sized, +{ + pub(crate) fn from_borrowed(value: &'a T) -> Self { + let hash = calc_hash(&value); + Self { + value: Cow::Borrowed(value), + hash, + } + } + + pub(crate) fn from_owned(value: ::Owned) -> Self { + let hash = calc_hash(value.borrow()); + Self { + value: Cow::Owned(value), + hash, + } + } + + pub(crate) fn into_owned(self) -> Hashed<'static, T> { + let value = self.value.into_owned(); + Hashed { + value: Cow::Owned(value), + hash: self.hash, + } + } + + pub(crate) fn into_inner_owned(self) -> T::Owned { + self.value.into_owned() + } + + pub(crate) fn hash_value(&self) -> u64 { + self.hash + } +} + +fn calc_hash(value: T) -> u64 +where + T: Hash, +{ + let mut hasher = FxHasher::default(); + value.hash(&mut hasher); + hasher.finish() +} + +impl Clone for Hashed<'_, T> +where + T: ToOwned + ?Sized, +{ + fn clone(&self) -> Self { + Self { + value: self.value.clone(), + hash: self.hash, + } + } + + fn clone_from(&mut self, source: &Self) { + self.value.clone_from(&source.value); + self.hash = source.hash; + } +} + +impl Hash for Hashed<'_, T> +where + T: ToOwned + Hash + ?Sized, +{ + fn hash(&self, state: &mut H) { + state.write_u64(self.hash); + } +} + +impl PartialEq for Hashed<'_, T> +where + T: ToOwned + PartialEq + ?Sized, +{ + fn eq(&self, other: &Self) -> bool { + self.value.as_ref() == other.value.as_ref() + } +} + +impl Eq for Hashed<'_, T> where T: ToOwned + Eq + ?Sized {} + +impl Deref for Hashed<'_, T> +where + T: ToOwned + ?Sized, +{ + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value.deref() + } +} + +/// Used to make [`Hashed`] values no-op in [`HashMap`](std::collections::HashMap) or [`HashSet`](std::collections::HashSet). +/// For all other keys types (except for [`u64`]) it will panic. +#[derive(Default, Clone)] +pub(crate) struct HashedNoOpBuilder { + hashed: u64, +} + +impl Hasher for HashedNoOpBuilder { + fn finish(&self) -> u64 { + self.hashed + } + + fn write(&mut self, _bytes: &[u8]) { + panic!("Only works with `Hashed` value") + } + + fn write_u64(&mut self, i: u64) { + self.hashed = i; + } +} + +impl BuildHasher for HashedNoOpBuilder { + type Hasher = HashedNoOpBuilder; + + fn build_hasher(&self) -> Self::Hasher { + HashedNoOpBuilder::default() + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 8b6136d7ce..49cf3b5356 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -1,27 +1,31 @@ mod aggregate; mod exponential_histogram; +mod hashed; mod histogram; mod last_value; mod precomputed_sum; mod sum; use core::fmt; -use std::collections::{HashMap, HashSet}; -use std::mem::take; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::mem::replace; use std::ops::{Add, AddAssign, DerefMut, Sub}; -use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; use aggregate::is_under_cardinality_limit; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; +use crossbeam_utils::CachePadded; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; +use hashed::{Hashed, HashedNoOpBuilder}; use once_cell::sync::Lazy; use opentelemetry::{otel_warn, KeyValue}; -use crate::metrics::AttributeSet; +use super::sort_and_dedup; -pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = - Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); +pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = + Lazy::new(|| Hashed::from_owned(vec![KeyValue::new("otel.metric.overflow", "true")])); pub(crate) trait Aggregator { /// A static configuration that is needed in order to initialize aggregator. @@ -43,6 +47,11 @@ pub(crate) trait Aggregator { fn clone_and_reset(&self, init: &Self::InitConfig) -> Self; } +struct NoAttribs { + tracker: A, + is_set: AtomicBool, +} + /// The storage for sums. /// /// This structure is parametrized by an `Operation` that indicates how @@ -51,14 +60,16 @@ pub(crate) struct ValueMap where A: Aggregator, { - /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, - /// Number of different attribute set stored in the `trackers` map. - count: AtomicUsize, - /// Indicates whether a value with no attributes has been stored. - has_no_attribute_value: AtomicBool, - /// Tracker for values with no attributes attached. - no_attribute_tracker: A, + shards_count: usize, + shift_by: usize, + // for performance reasons, no_attribs tracker + no_attribs: NoAttribs, + // for performance reasons, to handle attributes in the provided order + all_attribs: + Vec, Arc, HashedNoOpBuilder>>>>, + // different order of attribute keys should still map to same tracker instance + // this helps to achieve that and also enables implementing collection efficiently + sorted_attribs: Mutex, Arc, HashedNoOpBuilder>>, /// Configuration for an Aggregator config: A::InitConfig, } @@ -68,71 +79,87 @@ where A: Aggregator, { fn new(config: A::InitConfig) -> Self { + // mostly inspired by dashmap crate + let shards_count = (std::thread::available_parallelism() + .map(|v| v.get()) + .unwrap_or(1) + * 4) + .next_power_of_two(); + let shift_by = + (std::mem::size_of::() * 8) - (shards_count.trailing_zeros() as usize); + let mut all_attribs = Vec::with_capacity(shards_count); + all_attribs.resize_with(shards_count, || { + CachePadded::new(RwLock::new(HashMap::default())) + }); ValueMap { - trackers: RwLock::new(HashMap::new()), - has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: A::create(&config), - count: AtomicUsize::new(0), + shift_by, + shards_count, + no_attribs: NoAttribs { + tracker: A::create(&config), + is_set: AtomicBool::new(false), + }, + all_attribs, + sorted_attribs: Mutex::new(HashMap::default()), config, } } fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) { if attributes.is_empty() { - self.no_attribute_tracker.update(value); - self.has_no_attribute_value.store(true, Ordering::Release); + self.no_attribs.tracker.update(value); + self.no_attribs.is_set.store(true, Ordering::Release); return; } - let Ok(trackers) = self.trackers.read() else { - return; - }; + let attributes = Hashed::from_borrowed(attributes); + + let shard = self.determine_shard(attributes.hash_value()); // Try to retrieve and update the tracker with the attributes in the provided order first - if let Some(tracker) = trackers.get(attributes) { - tracker.update(value); - return; - } + match self.all_attribs[shard].read() { + Ok(trackers) => { + if let Some(tracker) = trackers.get(&attributes) { + tracker.update(value); + return; + } + } + Err(_) => return, + }; - // Try to retrieve and update the tracker with the attributes sorted. - let sorted_attrs = AttributeSet::from(attributes).into_vec(); - if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - tracker.update(value); + // Get or create a tracker + let sorted_attrs = Hashed::from_owned(sort_and_dedup(&attributes)); + let Ok(mut sorted_trackers) = self.sorted_attribs.lock() else { return; - } + }; - // Give up the read lock before acquiring the write lock. - drop(trackers); + let sorted_count = sorted_trackers.len(); + let new_tracker = match sorted_trackers.entry(sorted_attrs) { + Entry::Occupied(occupied_entry) => occupied_entry.get().clone(), + Entry::Vacant(vacant_entry) => { + if !is_under_cardinality_limit(sorted_count) { + sorted_trackers.entry(STREAM_OVERFLOW_ATTRIBUTES.clone()) + .or_insert_with(|| { + otel_warn!( name: "ValueMap.measure", + message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged." + ); + Arc::new(A::create(&self.config)) + }) + .update(value); + return; + } + let new_tracker = Arc::new(A::create(&self.config)); + vacant_entry.insert(new_tracker).clone() + } + }; + drop(sorted_trackers); - let Ok(mut trackers) = self.trackers.write() else { + new_tracker.update(value); + + // Insert new tracker, so we could find it next time + let Ok(mut all_trackers) = self.all_attribs[shard].write() else { return; }; - - // Recheck both the provided and sorted orders after acquiring the write lock - // in case another thread has pushed an update in the meantime. - if let Some(tracker) = trackers.get(attributes) { - tracker.update(value); - } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - tracker.update(value); - } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { - let new_tracker = Arc::new(A::create(&self.config)); - new_tracker.update(value); - - // Insert tracker with the attributes in the provided and sorted orders - trackers.insert(attributes.to_vec(), new_tracker.clone()); - trackers.insert(sorted_attrs, new_tracker); - - self.count.fetch_add(1, Ordering::SeqCst); - } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { - overflow_value.update(value); - } else { - let new_tracker = A::create(&self.config); - new_tracker.update(value); - trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); - otel_warn!( name: "ValueMap.measure", - message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged." - ); - } + all_trackers.insert(attributes.into_owned(), new_tracker); } /// Iterate through all attribute sets and populate `DataPoints` in readonly mode. @@ -141,20 +168,23 @@ where where MapFn: FnMut(Vec, &A) -> Res, { - prepare_data(dest, self.count.load(Ordering::SeqCst)); - if self.has_no_attribute_value.load(Ordering::Acquire) { - dest.push(map_fn(vec![], &self.no_attribute_tracker)); - } - - let Ok(trackers) = self.trackers.read() else { - return; + let trackers = match self.sorted_attribs.lock() { + Ok(trackers) => { + // it's important to release lock as fast as possible, + // so we don't block insertion of new attribute sets + trackers.clone() + } + Err(_) => return, }; - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - dest.push(map_fn(attrs.clone(), tracker)); - } + prepare_data(dest, trackers.len()); + + if self.no_attribs.is_set.load(Ordering::Acquire) { + dest.push(map_fn(vec![], &self.no_attribs.tracker)); + } + + for (attrs, tracker) in trackers.into_iter() { + dest.push(map_fn(attrs.clone().into_inner_owned(), &tracker)); } } @@ -164,35 +194,70 @@ where where MapFn: FnMut(Vec, A) -> Res, { - prepare_data(dest, self.count.load(Ordering::SeqCst)); - if self.has_no_attribute_value.swap(false, Ordering::AcqRel) { + // reset sorted trackers so new attributes set will be written into new hashmap + let trackers = match self.sorted_attribs.lock() { + Ok(mut trackers) => { + let new = + HashMap::with_capacity_and_hasher(trackers.len(), HashedNoOpBuilder::default()); + replace(trackers.deref_mut(), new) + } + Err(_) => return, + }; + // reset all trackers, so all attribute sets will start using new hashmap + for shard in 0..self.shards_count { + match self.all_attribs[shard].write() { + Ok(mut all_trackers) => all_trackers.clear(), + Err(_) => return, + }; + } + + prepare_data(dest, trackers.len()); + + if self.no_attribs.is_set.swap(false, Ordering::AcqRel) { dest.push(map_fn( vec![], - self.no_attribute_tracker.clone_and_reset(&self.config), + self.no_attribs.tracker.clone_and_reset(&self.config), )); } - let trackers = match self.trackers.write() { - Ok(mut trackers) => { - self.count.store(0, Ordering::SeqCst); - take(trackers.deref_mut()) - } - Err(_) => todo!(), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.into_iter() { - if seen.insert(Arc::as_ptr(&tracker)) { - dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); - } + for (attrs, mut tracker) in trackers.into_iter() { + // Handles special case: + // measure-thread: get inserted tracker from `sorted_attribs` (holds tracker) + // collect-thread: replace sorted_attribs (clears sorted_attribs) + // collect-thread: clear all_attribs + // collect_thread: THIS-LOOP: loop until measure-thread still holds a tracker + // measure-thread: insert tracker into `all_attribs`` + // collect_thread: exits this loop after clearing trackers + let tracker = loop { + match Arc::try_unwrap(tracker) { + Ok(inner) => { + break inner; + } + Err(reinserted) => { + tracker = reinserted; + for shard in 0..self.shards_count { + match self.all_attribs[shard].write() { + Ok(mut all_trackers) => all_trackers.clear(), + Err(_) => return, + }; + } + } + }; + }; + dest.push(map_fn(attrs.into_inner_owned(), tracker)); } } + + fn determine_shard(&self, hash: u64) -> usize { + // Leave the high 7 bits for the HashBrown SIMD tag. + ((hash as usize) << 7) >> self.shift_by + } } /// Clear and allocate exactly required amount of space for all attribute-sets fn prepare_data(data: &mut Vec, list_len: usize) { data.clear(); - let total_len = list_len + 2; // to account for no_attributes case + overflow state + let total_len = list_len + 1; // to account for no_attributes case if total_len > data.capacity() { data.reserve_exact(total_len - data.capacity()); } @@ -392,6 +457,7 @@ impl AtomicallyUpdate for f64 { #[cfg(test)] mod tests { + use super::*; #[test] diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 7db8a63ec2..54a606243b 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -115,23 +115,27 @@ pub(crate) struct AttributeSet(Vec, u64); impl From<&[KeyValue]> for AttributeSet { fn from(values: &[KeyValue]) -> Self { - let mut seen_keys = HashSet::with_capacity(values.len()); - let vec = values - .iter() - .rev() - .filter_map(|kv| { - if seen_keys.insert(kv.key.clone()) { - Some(kv.clone()) - } else { - None - } - }) - .collect::>(); - - AttributeSet::new(vec) + AttributeSet::new(sort_and_dedup(values)) } } +pub(crate) fn sort_and_dedup(values: &[KeyValue]) -> Vec { + let mut seen_keys = HashSet::with_capacity(values.len()); + let mut vec = values + .iter() + .rev() + .filter_map(|kv| { + if seen_keys.insert(kv.key.clone()) { + Some(kv.clone()) + } else { + None + } + }) + .collect::>(); + vec.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + vec +} + fn calculate_hash(values: &[KeyValue]) -> u64 { let mut hasher = DefaultHasher::new(); values.iter().fold(&mut hasher, |mut hasher, item| { @@ -142,8 +146,7 @@ fn calculate_hash(values: &[KeyValue]) -> u64 { } impl AttributeSet { - fn new(mut values: Vec) -> Self { - values.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + fn new(values: Vec) -> Self { let hash = calculate_hash(&values); AttributeSet(values, hash) } @@ -152,11 +155,6 @@ impl AttributeSet { pub(crate) fn iter(&self) -> impl Iterator { self.0.iter().map(|kv| (&kv.key, &kv.value)) } - - /// Returns the underlying Vec of KeyValue pairs - pub(crate) fn into_vec(self) -> Vec { - self.0 - } } impl Hash for AttributeSet {