diff --git a/Cargo.lock b/Cargo.lock index 73460105cdd..2abfc3a9db6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2755,13 +2755,18 @@ version = "0.1.0" dependencies = [ "anyhow", "arrow", - "arrow_util", + "async-trait", + "datafusion", + "indexmap 2.6.0", "influxdb3_catalog", "influxdb3_id", "influxdb3_wal", "influxdb3_write", "iox_time", + "parking_lot", "schema", + "thiserror 1.0.69", + "tokio", ] [[package]] diff --git a/influxdb3_cache/Cargo.toml b/influxdb3_cache/Cargo.toml index 4b7fb0f0de9..21c42be9972 100644 --- a/influxdb3_cache/Cargo.toml +++ b/influxdb3_cache/Cargo.toml @@ -18,10 +18,14 @@ influxdb3_wal = { path = "../influxdb3_wal" } # crates.io dependencies anyhow.workspace = true arrow.workspace = true +async-trait.workspace = true +datafusion.workspace = true +indexmap.workspace = true +parking_lot.workspace = true +thiserror.workspace = true +tokio.workspace = true [dev-dependencies] -# Core Crates -arrow_util.workspace = true # Local deps influxdb3_write = { path = "../influxdb3_write" } diff --git a/influxdb3_cache/src/meta_cache/cache.rs b/influxdb3_cache/src/meta_cache/cache.rs new file mode 100644 index 00000000000..5a5f17ca754 --- /dev/null +++ b/influxdb3_cache/src/meta_cache/cache.rs @@ -0,0 +1,513 @@ +use std::{ + collections::{BTreeMap, BTreeSet, BinaryHeap}, + num::NonZeroUsize, + sync::Arc, + time::Duration, +}; + +use anyhow::{bail, Context}; +use arrow::{ + array::{ArrayRef, RecordBatch, StringViewBuilder}, + datatypes::{DataType, Field, SchemaBuilder, SchemaRef}, + error::ArrowError, +}; +use indexmap::IndexMap; +use influxdb3_catalog::catalog::TableDefinition; +use influxdb3_id::ColumnId; +use influxdb3_wal::{FieldData, Row}; +use iox_time::TimeProvider; +use schema::{InfluxColumnType, InfluxFieldType}; + +/// A metadata cache for storing distinct values for a set of columns in a table +#[derive(Debug)] +pub(crate) struct MetaCache { + time_provider: Arc, + /// The maximum number of unique value combinations in the cache + max_cardinality: usize, + /// The maximum age for entries in the cache + max_age: Duration, + /// The fixed Arrow schema used to produce record batches from the cache + schema: SchemaRef, + /// Holds current state of the cache + state: MetaCacheState, + /// The identifiers of the columns used in the cache + column_ids: Vec, + /// The cache data, stored in a tree + data: Node, +} + +/// Type for tracking the current state of a [`MetaCache`] +#[derive(Debug, Default)] +struct MetaCacheState { + /// The current number of unique value combinations in the cache + cardinality: usize, +} + +/// Arguments to create a new [`MetaCache`] +#[derive(Debug)] +pub struct CreateMetaCacheArgs { + pub table_def: Arc, + pub max_cardinality: MaxCardinality, + pub max_age: MaxAge, + pub column_ids: Vec, +} + +#[derive(Debug, Clone, Copy)] +pub struct MaxCardinality(NonZeroUsize); + +impl TryFrom for MaxCardinality { + type Error = anyhow::Error; + + fn try_from(value: usize) -> Result { + Ok(Self( + NonZeroUsize::try_from(value).context("invalid size provided")?, + )) + } +} + +impl Default for MaxCardinality { + fn default() -> Self { + Self(NonZeroUsize::new(100_000).unwrap()) + } +} + +impl From for usize { + fn from(value: MaxCardinality) -> Self { + value.0.into() + } +} + +#[derive(Debug, Clone, Copy)] +pub struct MaxAge(Duration); + +impl Default for MaxAge { + fn default() -> Self { + Self(Duration::from_secs(24 * 60 * 60)) + } +} + +impl From for Duration { + fn from(value: MaxAge) -> Self { + value.0 + } +} + +impl From for MaxAge { + fn 
from(duration: Duration) -> Self { + Self(duration) + } +} + +impl MaxAge { + pub(crate) fn as_seconds(&self) -> u64 { + self.0.as_secs() + } +} + +impl MetaCache { + /// Create a new [`MetaCache`] + /// + /// Must pass a non-empty set of [`ColumnId`]s which correspond to valid columns in the provided + /// [`TableDefinition`]. + pub(crate) fn new( + time_provider: Arc, + CreateMetaCacheArgs { + table_def, + max_cardinality, + max_age, + column_ids, + }: CreateMetaCacheArgs, + ) -> Result { + if column_ids.is_empty() { + bail!("must pass a non-empty set of column ids"); + } + let mut builder = SchemaBuilder::new(); + for id in &column_ids { + let col = table_def.columns.get(id).with_context(|| { + format!("invalid column id ({id}) encountered while creating metadata cache") + })?; + let data_type = match col.data_type { + InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => { + DataType::Utf8View + } + other => bail!( + "cannot use a column of type {other} in a metadata cache, only \ + tags and string fields can be used" + ), + }; + + builder.push(Arc::new(Field::new(col.name.as_ref(), data_type, false))); + } + Ok(Self { + time_provider, + max_cardinality: max_cardinality.into(), + max_age: max_age.into(), + state: MetaCacheState::default(), + schema: Arc::new(builder.finish()), + column_ids, + data: Node::default(), + }) + } + + /// Push a [`Row`] from the WAL into the cache, if the row contains all of the cached columns. + pub(crate) fn push(&mut self, row: &Row) { + let mut values = Vec::with_capacity(self.column_ids.len()); + for id in &self.column_ids { + let Some(value) = row + .fields + .iter() + .find(|f| &f.id == id) + .map(|f| Value::from(&f.value)) + else { + // ignore the row if it does not contain all columns in the cache: + return; + }; + values.push(value); + } + let mut target = &mut self.data; + let mut val_iter = values.into_iter().peekable(); + let mut is_new = false; + while let (Some(value), peek) = (val_iter.next(), val_iter.peek()) { + let (last_seen, node) = target.0.entry(value).or_insert_with(|| { + is_new = true; + (row.time, peek.is_some().then(Node::default)) + }); + *last_seen = row.time; + if let Some(node) = node { + target = node; + } else { + break; + } + } + if is_new { + self.state.cardinality += 1; + } + } + + /// Gather a record batch from a cache given the set of predicates + /// + /// This assumes the predicates are well behaved, and validated before being passed in. For example, + /// there cannot be multiple predicates on a single column; the caller needs to validate/transform + /// the incoming predicates from Datafusion before calling. + /// + /// Entries in the cache that have not been seen since before the `max_age` of the cache will be + /// filtered out of the result. + pub(crate) fn to_record_batch( + &self, + predicates: &IndexMap, + ) -> Result { + // predicates may not be provided for all columns in the cache, or not be provided in the + // order of columns in the cache. This re-orders them to the correct order, and fills in any + // gaps with None. + let predicates: Vec> = self + .column_ids + .iter() + .map(|id| predicates.get(id)) + .collect(); + + // Uses a [`StringViewBuilder`] to compose the set of [`RecordBatch`]es. This is convenient for + // the sake of nested caches, where a predicate on a higher branch in the cache will need to have + // its value in the outputted batches duplicated. 
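+        //
+        // For example (illustrative): in a cache on `(region, host)` holding the pairs
+        // ("us-east", "a") and ("us-east", "b"), the view for "us-east" is appended twice
+        // to the `region` builder so that it stays aligned with the two `host` values.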
+ let mut builders: Vec = (0..self.column_ids.len()) + .map(|_| StringViewBuilder::new()) + .collect(); + + let expired_time_ns = self.expired_time_ns(); + let _ = + self.data + .evaluate_predicates(expired_time_ns, predicates.as_slice(), &mut builders); + + RecordBatch::try_new( + Arc::clone(&self.schema), + builders + .into_iter() + .map(|mut builder| Arc::new(builder.finish()) as ArrayRef) + .collect(), + ) + } + + /// Prune nodes from within the cache + /// + /// This first prunes entries that are older than the `max_age` of the cache. If the cardinality + /// of the cache is still over its `max_cardinality`, it will do another pass to bring the cache + /// size down. + pub(crate) fn prune(&mut self) { + let before_time_ns = self.expired_time_ns(); + let _ = self.data.remove_before(before_time_ns); + self.state.cardinality = self.data.cardinality(); + if self.state.cardinality > self.max_cardinality { + let n_to_remove = self.state.cardinality - self.max_cardinality; + self.data.remove_n_oldest(n_to_remove); + self.state.cardinality = self.data.cardinality(); + } + } + + /// Get the nanosecond timestamp as an `i64`, before which, entries that have not been seen + /// since are considered expired. + fn expired_time_ns(&self) -> i64 { + self.time_provider + .now() + .checked_sub(self.max_age) + .expect("max age on cache should not cause an overflow") + .timestamp_nanos() + } + + /// Get the arrow [`SchemaRef`] for this cache + pub(crate) fn arrow_schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + /// Compare the configuration of a given cache, producing a helpful error message if they differ + pub(crate) fn compare_config(&self, other: &Self) -> Result<(), anyhow::Error> { + if self.max_cardinality != other.max_cardinality { + bail!( + "incompatible `max_cardinality`, expected {}, got {}", + self.max_cardinality, + other.max_cardinality + ); + } + if self.max_age != other.max_age { + bail!( + "incompatible `max_age`, expected {}, got {}", + self.max_age.as_secs(), + other.max_age.as_secs() + ) + } + if self.column_ids != other.column_ids { + bail!( + "incompatible column id selection, expected {}, got {}", + self.column_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(","), + other + .column_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(","), + ) + } + + Ok(()) + } +} + +/// A node in the `data` tree of a [`MetaCache`] +/// +/// Recursive struct holding a [`BTreeMap`] whose keys are the values nested under this node, and +/// whose values hold the last seen time as an [`i64`] of each value, and an optional reference to +/// the node in the next level of the tree. +#[derive(Debug, Default)] +struct Node(BTreeMap)>); + +impl Node { + /// Remove all elements before the given nanosecond timestamp returning `true` if the resulting + /// node is empty. 
+    fn remove_before(&mut self, time_ns: i64) -> bool {
+        self.0.retain(|_, (last_seen, node)| {
+            // Note that for a branch node, the `last_seen` time will be the greatest of
+            // all its nodes, so an entire branch can be removed if its value has not been seen since
+            // before `time_ns`, hence the short-circuit here:
+            *last_seen > time_ns
+                || node
+                    .as_mut()
+                    .is_some_and(|node| node.remove_before(time_ns))
+        });
+        self.0.is_empty()
+    }
+
+    /// Remove the `n_to_remove` oldest entries from the cache
+    fn remove_n_oldest(&mut self, n_to_remove: usize) {
+        let mut times = BinaryHeap::with_capacity(n_to_remove);
+        self.find_n_oldest(n_to_remove, &mut times);
+        self.remove_before(*times.peek().unwrap());
+    }
+
+    /// Use a binary heap to find the time before which all nodes should be removed
+    fn find_n_oldest(&self, n_to_remove: usize, times_heap: &mut BinaryHeap<i64>) {
+        self.0
+            .values()
+            // do not need to add the last_seen time for a branch node to the
+            // heap since it will be equal to the greatest of that of its leaves
+            .for_each(|(last_seen, node)| {
+                if let Some(node) = node {
+                    node.find_n_oldest(n_to_remove, times_heap)
+                } else if times_heap.len() < n_to_remove {
+                    times_heap.push(*last_seen);
+                } else if times_heap.peek().is_some_and(|newest| last_seen < newest) {
+                    times_heap.pop();
+                    times_heap.push(*last_seen);
+                }
+            });
+    }
+
+    /// Get the total count of unique value combinations nested under this node
+    ///
+    /// Note that this includes expired elements, which still contribute to the total size of the
+    /// cache until they are pruned.
+    fn cardinality(&self) -> usize {
+        self.0
+            .values()
+            .map(|(_, node)| node.as_ref().map_or(1, |node| node.cardinality()))
+            .sum()
+    }
+
+    /// Evaluate the set of provided predicates against this node, adding values to the provided
+    /// [`StringViewBuilder`]s. Predicates and builders are provided as slices, as this is called
+    /// recursively down the cache tree.
+    ///
+    /// Returns the number of values that were added to the arrow builders.
+    ///
+    /// # Panics
+    ///
+    /// This will panic if invalidly sized `predicates` and `builders` slices are passed in. When
+    /// called from the root [`Node`], their size should be that of the depth of the cache, i.e.,
+    /// the number of columns in the cache.
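+    ///
+    /// For example (illustrative): for a cache on `(region, host)`, both slices have length
+    /// two, and passing `[Some(&region_predicate), None]` constrains only the `region` level
+    /// while emitting every non-expired `host` beneath the matching regions.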
+ fn evaluate_predicates( + &self, + expired_time_ns: i64, + predicates: &[Option<&Predicate>], + builders: &mut [StringViewBuilder], + ) -> usize { + let mut total_count = 0; + let (predicate, next_predicates) = predicates + .split_first() + .expect("predicates should not be empty"); + // if there is a predicate, evaluate it, otherwise, just grab everything from the node: + let values_and_nodes = if let Some(predicate) = predicate { + self.evaluate_predicate(expired_time_ns, predicate) + } else { + self.0 + .iter() + .filter(|&(_, (t, _))| (t > &expired_time_ns)) + .map(|(v, (_, n))| (v.clone(), n.as_ref())) + .collect() + }; + let (builder, next_builders) = builders + .split_first_mut() + .expect("builders should not be empty"); + // iterate over the resulting set of values and next nodes (if this is a branch), and add + // the values to the arrow builders: + for (value, node) in values_and_nodes { + if let Some(node) = node { + let count = + node.evaluate_predicates(expired_time_ns, next_predicates, next_builders); + if count > 0 { + // we are not on a terminal node in the cache, so create a block, as this value + // repeated `count` times, i.e., depending on how many values come out of + // subsequent nodes: + let block = builder.append_block(value.0.as_bytes().into()); + for _ in 0..count { + builder + .try_append_view(block, 0u32, value.0.as_bytes().len() as u32) + .expect("append view for known valid block, offset and length"); + } + total_count += count; + } + } else { + builder.append_value(value.0); + total_count += 1; + } + } + total_count + } + + /// Evaluate a predicate against a [`Node`], producing a list of [`Value`]s and, if this is a + /// branch node in the cache tree, a reference to the next [`Node`]. + fn evaluate_predicate( + &self, + expired_time_ns: i64, + predicate: &Predicate, + ) -> Vec<(Value, Option<&Node>)> { + match &predicate { + Predicate::In(in_list) => in_list + .iter() + .filter_map(|v| { + self.0.get_key_value(v).and_then(|(v, (t, n))| { + (t > &expired_time_ns).then(|| (v.clone(), n.as_ref())) + }) + }) + .collect(), + Predicate::NotIn(not_in_set) => self + .0 + .iter() + .filter(|(v, (t, _))| t > &expired_time_ns && !not_in_set.contains(v)) + .map(|(v, (_, n))| (v.clone(), n.as_ref())) + .collect(), + } + } +} + +/// A cache value, which for now, only holds strings +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash)] +pub(crate) struct Value(Arc); + +impl From<&FieldData> for Value { + fn from(field: &FieldData) -> Self { + match field { + FieldData::Key(s) => Self(Arc::from(s.as_str())), + FieldData::Tag(s) => Self(Arc::from(s.as_str())), + FieldData::String(s) => Self(Arc::from(s.as_str())), + FieldData::Timestamp(_) + | FieldData::Integer(_) + | FieldData::UInteger(_) + | FieldData::Float(_) + | FieldData::Boolean(_) => panic!("unexpected field type for metadata cache"), + } + } +} + +impl From for Value { + fn from(value: String) -> Self { + Self(Arc::from(value.as_str())) + } +} + +/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`MetaCache`] +/// +/// This is intended to be derived from a set of filter expressions in Datafusion by analyzing +/// them with a `LiteralGuarantee`. +/// +/// This uses a `BTreeSet` to store the values so that they are iterated over in sorted order. 
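+///
+/// # Example (illustrative)
+///
+/// ```ignore
+/// let predicate = Predicate::new_in(["us-east", "us-west"]);
+/// // values render in sorted order, comma-separated:
+/// assert_eq!(predicate.to_string(), "IN (us-east,us-west)");
+/// ```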
+#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum Predicate { + In(BTreeSet), + NotIn(BTreeSet), +} + +impl std::fmt::Display for Predicate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Predicate::In(_) => write!(f, "IN (")?, + Predicate::NotIn(_) => write!(f, "NOT IN (")?, + } + let mut values = self.values(); + while let Some(v) = values.next() { + write!(f, "{}", v.0)?; + if values.size_hint().0 > 0 { + write!(f, ",")?; + } + } + + write!(f, ")") + } +} + +impl Predicate { + pub(crate) fn new_in(in_vals: impl IntoIterator>>) -> Self { + Self::In(in_vals.into_iter().map(Into::into).map(Value).collect()) + } + + pub(crate) fn new_not_in(in_vals: impl IntoIterator>>) -> Self { + Self::NotIn(in_vals.into_iter().map(Into::into).map(Value).collect()) + } + + fn values(&self) -> impl Iterator { + match self { + Predicate::In(vals) => vals.iter(), + Predicate::NotIn(vals) => vals.iter(), + } + } +} diff --git a/influxdb3_cache/src/meta_cache/mod.rs b/influxdb3_cache/src/meta_cache/mod.rs index 67f9eeba9cd..ea50ab66683 100644 --- a/influxdb3_cache/src/meta_cache/mod.rs +++ b/influxdb3_cache/src/meta_cache/mod.rs @@ -1,493 +1,28 @@ //! The Metadata Cache holds the distinct values for a column or set of columns on a table -use std::{ - cmp::Eq, - collections::{BTreeMap, BinaryHeap, HashSet}, - num::NonZeroUsize, - sync::Arc, - time::Duration, -}; - -use anyhow::{bail, Context}; -use arrow::{ - array::{ArrayRef, RecordBatch, StringViewBuilder}, - datatypes::{DataType, Field, SchemaBuilder, SchemaRef}, - error::ArrowError, -}; -use influxdb3_catalog::catalog::TableDefinition; -use influxdb3_id::ColumnId; -use influxdb3_wal::{FieldData, Row}; -use iox_time::TimeProvider; -use schema::{InfluxColumnType, InfluxFieldType}; - -/// A metadata cache for storing distinct values for a set of columns in a table -#[derive(Debug)] -pub struct MetaCache { - time_provider: Arc, - /// The maximum number of unique value combinations in the cache - max_cardinality: usize, - /// The maximum age for entries in the cache - max_age: Duration, - /// The fixed Arrow schema used to produce record batches from the cache - schema: SchemaRef, - /// Holds current state of the cache - state: MetaCacheState, - /// The identifiers of the columns used in the cache - column_ids: Vec, - /// The cache data, stored in a tree - data: Node, -} - -/// Type for tracking the current state of a [`MetaCache`] -#[derive(Debug, Default)] -struct MetaCacheState { - /// The current number of unique value combinations in the cache - cardinality: usize, -} - -/// Arguments to create a new [`MetaCache`] -#[derive(Debug)] -pub struct CreateMetaCacheArgs { - pub time_provider: Arc, - pub table_def: Arc, - pub max_cardinality: MaxCardinality, - pub max_age: MaxAge, - pub column_ids: Vec, -} - -#[derive(Debug, Clone, Copy)] -pub struct MaxCardinality(NonZeroUsize); - -#[cfg(test)] -impl MaxCardinality { - fn try_new(size: usize) -> Result { - Ok(Self( - NonZeroUsize::try_from(size).context("invalid size provided")?, - )) - } -} - -impl Default for MaxCardinality { - fn default() -> Self { - Self(NonZeroUsize::new(100_000).unwrap()) - } -} - -impl From for usize { - fn from(value: MaxCardinality) -> Self { - value.0.into() - } -} - -#[derive(Debug, Clone, Copy)] -pub struct MaxAge(Duration); - -impl Default for MaxAge { - fn default() -> Self { - Self(Duration::from_secs(24 * 60 * 60)) - } -} - -impl From for Duration { - fn from(value: MaxAge) -> Self { - value.0 - } -} - -impl From for MaxAge { - fn 
from(duration: Duration) -> Self { - Self(duration) - } -} - -impl MetaCache { - /// Create a new [`MetaCache`] - /// - /// Must pass a non-empty set of [`ColumnId`]s which correspond to valid columns in the provided - /// [`TableDefinition`]. - pub fn new( - CreateMetaCacheArgs { - time_provider, - table_def, - max_cardinality, - max_age, - column_ids, - }: CreateMetaCacheArgs, - ) -> Result { - if column_ids.is_empty() { - bail!("must pass a non-empty set of column ids"); - } - let mut builder = SchemaBuilder::new(); - for id in &column_ids { - let col = table_def.columns.get(id).with_context(|| { - format!("invalid column id ({id}) encountered while creating metadata cache") - })?; - let data_type = match col.data_type { - InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => { - DataType::Utf8View - } - other => bail!( - "cannot use a column of type {other} in a metadata cache, only \ - tags and string fields can be used" - ), - }; - - builder.push(Arc::new(Field::new(col.name.as_ref(), data_type, false))); - } - Ok(Self { - time_provider, - max_cardinality: max_cardinality.into(), - max_age: max_age.into(), - state: MetaCacheState::default(), - schema: Arc::new(builder.finish()), - column_ids, - data: Node::default(), - }) - } - - /// Push a [`Row`] from the WAL into the cache, if the row contains all of the cached columns. - pub fn push(&mut self, row: &Row) { - let mut values = Vec::with_capacity(self.column_ids.len()); - for id in &self.column_ids { - let Some(value) = row - .fields - .iter() - .find(|f| &f.id == id) - .map(|f| Value::from(&f.value)) - else { - // ignore the row if it does not contain all columns in the cache: - return; - }; - values.push(value); - } - let mut target = &mut self.data; - let mut val_iter = values.into_iter().peekable(); - let mut is_new = false; - while let (Some(value), peek) = (val_iter.next(), val_iter.peek()) { - let (last_seen, node) = target.0.entry(value).or_insert_with(|| { - is_new = true; - (row.time, peek.is_some().then(Node::default)) - }); - *last_seen = row.time; - if let Some(node) = node { - target = node; - } else { - break; - } - } - if is_new { - self.state.cardinality += 1; - } - } - - /// Gather a record batch from a cache given the set of predicates - /// - /// This assumes the predicates are well behaved, and validated before being passed in. For example, - /// there cannot be multiple predicates on a single column; the caller needs to validate/transform - /// the incoming predicates from Datafusion before calling. - /// - /// Entries in the cache that have not been seen since before the `max_age` of the cache will be - /// filtered out of the result. - pub fn to_record_batch(&self, predicates: &[Predicate]) -> Result { - // predicates may not be provided for all columns in the cache, or not be provided in the - // order of columns in the cache. This re-orders them to the correct order, and fills in any - // gaps with None. - let predicates: Vec> = self - .column_ids - .iter() - .map(|id| predicates.iter().find(|p| &p.column_id == id)) - .collect(); - - // Uses a [`StringViewBuilder`] to compose the set of [`RecordBatch`]es. This is convenient for - // the sake of nested caches, where a predicate on a higher branch in the cache will need to have - // its value in the outputted batches duplicated. 
- let mut builders: Vec = (0..self.column_ids.len()) - .map(|_| StringViewBuilder::new()) - .collect(); - - let expired_time_ns = self.expired_time_ns(); - let _ = self - .data - .evaluate_predicates(expired_time_ns, &predicates, &mut builders); - - RecordBatch::try_new( - Arc::clone(&self.schema), - builders - .into_iter() - .map(|mut builder| Arc::new(builder.finish()) as ArrayRef) - .collect(), - ) - } - - /// Prune nodes from within the cache - /// - /// This first prunes entries that are older than the `max_age` of the cache. If the cardinality - /// of the cache is still over its `max_cardinality`, it will do another pass to bring the cache - /// size down. - pub fn prune(&mut self) { - let before_time_ns = self.expired_time_ns(); - let _ = self.data.remove_before(before_time_ns); - self.state.cardinality = self.data.cardinality(); - if self.state.cardinality > self.max_cardinality { - let n_to_remove = self.state.cardinality - self.max_cardinality; - self.data.remove_n_oldest(n_to_remove); - self.state.cardinality = self.data.cardinality(); - } - } - - /// Get the nanosecond timestamp as an `i64`, before which, entries that have not been seen - /// since are considered expired. - fn expired_time_ns(&self) -> i64 { - self.time_provider - .now() - .checked_sub(self.max_age) - .expect("max age on cache should not cause an overflow") - .timestamp_nanos() - } -} - -/// A node in the `data` tree of a [`MetaCache`] -/// -/// Recursive struct holding a [`BTreeMap`] whose keys are the values nested under this node, and -/// whose values hold the last seen time as an [`i64`] of each value, and an optional reference to -/// the node in the next level of the tree. -#[derive(Debug, Default)] -struct Node(BTreeMap)>); - -impl Node { - /// Remove all elements before the given nanosecond timestamp returning `true` if the resulting - /// node is empty. - fn remove_before(&mut self, time_ns: i64) -> bool { - self.0.retain(|_, (last_seen, node)| { - // Note that for a branch node, the `last_seen` time will be the greatest of - // all its nodes, so an entire branch can be removed if its value has not been seen since - // before `time_ns`, hence the short-curcuit here: - *last_seen > time_ns - || node - .as_mut() - .is_some_and(|node| node.remove_before(time_ns)) - }); - self.0.is_empty() - } - - /// Remove the `n_to_remove` oldest entries from the cache - fn remove_n_oldest(&mut self, n_to_remove: usize) { - let mut times = BinaryHeap::with_capacity(n_to_remove); - self.find_n_oldest(n_to_remove, &mut times); - self.remove_before(*times.peek().unwrap()); - } - - /// Use a binary heap to find the time before which all nodes should be removed - fn find_n_oldest(&self, n_to_remove: usize, times_heap: &mut BinaryHeap) { - self.0 - .values() - // do not need to add the last_seen time for a branch node to the - // heap since it will be equal to the greatest of that of its leaves - .for_each(|(last_seen, node)| { - if let Some(node) = node { - node.find_n_oldest(n_to_remove, times_heap) - } else if times_heap.len() < n_to_remove { - times_heap.push(*last_seen); - } else if times_heap.peek().is_some_and(|newest| last_seen < newest) { - times_heap.pop(); - times_heap.push(*last_seen); - } - }); - } - - /// Get the total count of unique value combinations nested under this node - /// - /// Note that this includes expired elements, which still contribute to the total size of the - /// cache until they are pruned. 
- fn cardinality(&self) -> usize { - self.0 - .values() - .map(|(_, node)| node.as_ref().map_or(1, |node| node.cardinality())) - .sum() - } - - /// Evaluate the set of provided predicates against this node, adding values to the provided - /// [`StringViewBuilder`]s. Predicates and builders are provided as slices, as this is called - /// recursively down the cache tree. - /// - /// Returns the number of values that were added to the arrow builders. - /// - /// # Panics - /// - /// This will panic if invalid sized `predicates` and `builders` slices are passed in. When - /// called from the root [`Node`], their size should be that of the depth of the cache, i.e., - /// the number of columns in the cache. - fn evaluate_predicates( - &self, - expired_time_ns: i64, - predicates: &[Option<&Predicate>], - builders: &mut [StringViewBuilder], - ) -> usize { - let mut total_count = 0; - let (predicate, next_predicates) = predicates - .split_first() - .expect("predicates should not be empty"); - // if there is a predicate, evaluate it, otherwise, just grab everything from the node: - let values_and_nodes = if let Some(predicate) = predicate { - self.evaluate_predicate(expired_time_ns, predicate) - } else { - self.0 - .iter() - .filter(|&(_, (t, _))| (t > &expired_time_ns)) - .map(|(v, (_, n))| (v.clone(), n.as_ref())) - .collect() - }; - let (builder, next_builders) = builders - .split_first_mut() - .expect("builders should not be empty"); - // iterate over the resulting set of values and next nodes (if this is a branch), and add - // the values to the arrow builders: - for (value, node) in values_and_nodes { - if let Some(node) = node { - let count = - node.evaluate_predicates(expired_time_ns, next_predicates, next_builders); - if count > 0 { - // we are not on a terminal node in the cache, so create a block, as this value - // repeated `count` times, i.e., depending on how many values come out of - // subsequent nodes: - let block = builder.append_block(value.0.as_bytes().into()); - for _ in 0..count { - builder - .try_append_view(block, 0u32, value.0.as_bytes().len() as u32) - .expect("append view for known valid block, offset and length"); - } - total_count += count; - } - } else { - builder.append_value(value.0); - total_count += 1; - } - } - total_count - } - - /// Evaluate a predicate against a [`Node`], producing a list of [`Value`]s and, if this is a - /// branch node in the cache tree, a reference to the next [`Node`]. 
- fn evaluate_predicate( - &self, - expired_time_ns: i64, - predicate: &Predicate, - ) -> Vec<(Value, Option<&Node>)> { - match &predicate.kind { - PredicateKind::Eq(rhs) => self - .0 - .get_key_value(rhs) - .and_then(|(v, (t, n))| { - (t > &expired_time_ns).then(|| vec![(v.clone(), n.as_ref())]) - }) - .unwrap_or_default(), - PredicateKind::NotEq(rhs) => self - .0 - .iter() - .filter(|(v, (t, _))| t > &expired_time_ns && *v != rhs) - .map(|(v, (_, n))| (v.clone(), n.as_ref())) - .collect(), - PredicateKind::In(in_list) => in_list - .iter() - .filter_map(|v| { - self.0.get_key_value(v).and_then(|(v, (t, n))| { - (t > &expired_time_ns).then(|| (v.clone(), n.as_ref())) - }) - }) - .collect(), - PredicateKind::NotIn(not_in_set) => self - .0 - .iter() - .filter(|(v, (t, _))| t > &expired_time_ns && !not_in_set.contains(v)) - .map(|(v, (_, n))| (v.clone(), n.as_ref())) - .collect(), - } - } -} - -/// A cache value, which for now, only holds strings -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash)] -struct Value(Arc); - -impl From<&FieldData> for Value { - fn from(field: &FieldData) -> Self { - match field { - FieldData::Key(s) => Self(Arc::from(s.as_str())), - FieldData::Tag(s) => Self(Arc::from(s.as_str())), - FieldData::String(s) => Self(Arc::from(s.as_str())), - FieldData::Timestamp(_) - | FieldData::Integer(_) - | FieldData::UInteger(_) - | FieldData::Float(_) - | FieldData::Boolean(_) => panic!("unexpected field type for metadata cache"), - } - } -} - -/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`MetaCache`] -/// -/// This is intended to be derived from a set of filter expressions in Datafusion -#[derive(Debug)] -pub struct Predicate { - column_id: ColumnId, - kind: PredicateKind, -} - -#[cfg(test)] -impl Predicate { - fn new_eq(column_id: ColumnId, rhs: impl Into>) -> Self { - Self { - column_id, - kind: PredicateKind::Eq(Value(rhs.into())), - } - } - - fn new_not_eq(column_id: ColumnId, rhs: impl Into>) -> Self { - Self { - column_id, - kind: PredicateKind::NotEq(Value(rhs.into())), - } - } - - fn new_in(column_id: ColumnId, in_vals: impl IntoIterator>>) -> Self { - Self { - column_id, - kind: PredicateKind::In(in_vals.into_iter().map(Into::into).map(Value).collect()), - } - } - - fn new_not_in(column_id: ColumnId, in_vals: impl IntoIterator>>) -> Self { - Self { - column_id, - kind: PredicateKind::NotIn(in_vals.into_iter().map(Into::into).map(Value).collect()), - } - } -} - -// TODO: remove this when fully implemented, for now just suppressing compiler warnings -#[allow(dead_code)] -#[derive(Debug)] -enum PredicateKind { - Eq(Value), - NotEq(Value), - In(Vec), - NotIn(HashSet), -} +mod cache; +mod provider; +pub use provider::MetaCacheProvider; +mod table_function; +pub use table_function::MetaCacheFunction; +pub use table_function::META_CACHE_UDTF_NAME; #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; - - use arrow_util::assert_batches_sorted_eq; + use arrow::array::AsArray; + use datafusion::{assert_batches_eq, assert_batches_sorted_eq, prelude::SessionContext}; + use indexmap::IndexMap; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::ColumnId; - use influxdb3_wal::Row; + use influxdb3_wal::{Gen1Duration, Row, WriteBatch}; use influxdb3_write::write_buffer::validator::WriteValidator; use iox_time::{MockProvider, Time, TimeProvider}; + use std::{sync::Arc, time::Duration}; - use crate::meta_cache::Predicate; - - use super::{CreateMetaCacheArgs, MaxAge, MaxCardinality, MetaCache}; + use 
crate::meta_cache::{ + cache::{CreateMetaCacheArgs, MaxAge, MaxCardinality, MetaCache, Predicate}, + MetaCacheFunction, MetaCacheProvider, META_CACHE_UDTF_NAME, + }; struct TestWriter { catalog: Arc, @@ -501,7 +36,8 @@ mod tests { catalog: Arc::new(Catalog::new("test-host".into(), "test-instance".into())), } } - fn write_lp(&self, lp: impl AsRef, time_ns: i64) -> Vec { + + fn write_lp_to_rows(&self, lp: impl AsRef, time_ns: i64) -> Vec { let lines_parsed = WriteValidator::initialize( Self::DB_NAME.try_into().unwrap(), Arc::clone(&self.catalog), @@ -519,6 +55,28 @@ mod tests { lines_parsed.into_inner().to_rows() } + fn write_lp_to_write_batch(&self, lp: impl AsRef, time_ns: i64) -> WriteBatch { + WriteValidator::initialize( + Self::DB_NAME.try_into().unwrap(), + Arc::clone(&self.catalog), + time_ns, + ) + .expect("initialize write validator") + .v1_parse_lines_and_update_schema( + lp.as_ref(), + false, + Time::from_timestamp_nanos(time_ns), + influxdb3_write::Precision::Nanosecond, + ) + .expect("parse and validate v1 line protocol") + .convert_lines_to_buffer(Gen1Duration::new_1m()) + .into() + } + + fn catalog(&self) -> Arc { + Arc::clone(&self.catalog) + } + fn db_schema(&self) -> Arc { self.catalog .db_schema(Self::DB_NAME) @@ -531,7 +89,7 @@ mod tests { let writer = TestWriter::new(); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); // write some data to get a set of rows destined for the WAL, and an updated catalog: - let rows = writer.write_lp( + let rows = writer.write_lp_to_rows( "\ cpu,region=us-east,host=a usage=100\n\ cpu,region=us-east,host=b usage=100\n\ @@ -558,13 +116,15 @@ mod tests { let region_col_id = column_ids[0]; let host_col_id = column_ids[1]; // create the cache: - let mut cache = MetaCache::new(CreateMetaCacheArgs { + let mut cache = MetaCache::new( time_provider, - table_def, - max_cardinality: MaxCardinality::default(), - max_age: MaxAge::default(), - column_ids, - }) + CreateMetaCacheArgs { + table_def, + max_cardinality: MaxCardinality::default(), + max_age: MaxAge::default(), + column_ids, + }, + ) .expect("create cache"); // push the row data into the cache: for row in rows { @@ -574,14 +134,20 @@ mod tests { // run a series of test cases with varying predicates: struct TestCase<'a> { desc: &'a str, - predicates: &'a [Predicate], + predicates: IndexMap, expected: &'a [&'a str], } + fn create_predicate_map( + predicates: &[(ColumnId, Predicate)], + ) -> IndexMap { + predicates.iter().cloned().collect() + } + let test_cases = [ TestCase { desc: "no predicates", - predicates: &[], + predicates: create_predicate_map(&[]), expected: &[ "+---------+------+", "| region | host |", @@ -601,88 +167,12 @@ mod tests { "+---------+------+", ], }, - TestCase { - desc: "eq predicate on region", - predicates: &[Predicate::new_eq(region_col_id, "us-east")], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| us-east | a |", - "| us-east | b |", - "+---------+------+", - ], - }, - TestCase { - desc: "eq predicate on host", - predicates: &[Predicate::new_eq(host_col_id, "h")], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| ca-cent | h |", - "+---------+------+", - ], - }, - TestCase { - desc: "eq predicate on region and host", - predicates: &[ - Predicate::new_eq(region_col_id, "ca-cent"), - Predicate::new_eq(host_col_id, "h"), - ], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| ca-cent | h |", - "+---------+------+", - 
], - }, - TestCase { - desc: "not eq predicate on region", - predicates: &[Predicate::new_not_eq(region_col_id, "ca-cent")], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| ca-east | e |", - "| ca-east | f |", - "| eu-cent | k |", - "| eu-cent | l |", - "| eu-east | i |", - "| eu-east | j |", - "| us-east | a |", - "| us-east | b |", - "| us-west | c |", - "| us-west | d |", - "+---------+------+", - ], - }, - TestCase { - desc: "not eq predicate on region and host", - predicates: &[ - Predicate::new_not_eq(region_col_id, "ca-cent"), - Predicate::new_not_eq(host_col_id, "a"), - ], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| ca-east | e |", - "| ca-east | f |", - "| eu-cent | k |", - "| eu-cent | l |", - "| eu-east | i |", - "| eu-east | j |", - "| us-east | b |", - "| us-west | c |", - "| us-west | d |", - "+---------+------+", - ], - }, TestCase { desc: "in predicate on region", - predicates: &[Predicate::new_in(region_col_id, ["ca-cent", "ca-east"])], + predicates: create_predicate_map(&[( + region_col_id, + Predicate::new_in(["ca-cent", "ca-east"]), + )]), expected: &[ "+---------+------+", "| region | host |", @@ -696,10 +186,10 @@ mod tests { }, TestCase { desc: "in predicate on region and host", - predicates: &[ - Predicate::new_in(region_col_id, ["ca-cent", "ca-east"]), - Predicate::new_in(host_col_id, ["g", "e"]), - ], + predicates: create_predicate_map(&[ + (region_col_id, Predicate::new_in(["ca-cent", "ca-east"])), + (host_col_id, Predicate::new_in(["g", "e"])), + ]), expected: &[ "+---------+------+", "| region | host |", @@ -711,7 +201,10 @@ mod tests { }, TestCase { desc: "not in predicate on region", - predicates: &[Predicate::new_not_in(region_col_id, ["ca-cent", "ca-east"])], + predicates: create_predicate_map(&[( + region_col_id, + Predicate::new_not_in(["ca-cent", "ca-east"]), + )]), expected: &[ "+---------+------+", "| region | host |", @@ -729,10 +222,10 @@ mod tests { }, TestCase { desc: "not in predicate on region and host", - predicates: &[ - Predicate::new_not_in(region_col_id, ["ca-cent", "ca-east"]), - Predicate::new_not_in(host_col_id, ["j", "k"]), - ], + predicates: create_predicate_map(&[ + (region_col_id, Predicate::new_not_in(["ca-cent", "ca-east"])), + (host_col_id, Predicate::new_not_in(["j", "k"])), + ]), expected: &[ "+---------+------+", "| region | host |", @@ -750,7 +243,7 @@ mod tests { for tc in test_cases { let records = cache - .to_record_batch(tc.predicates) + .to_record_batch(&tc.predicates) .expect("get record batches"); println!("{}", tc.desc); assert_batches_sorted_eq!(tc.expected, &[records]); @@ -762,7 +255,7 @@ mod tests { let writer = TestWriter::new(); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); // write some data to update the catalog: - let _ = writer.write_lp( + let _ = writer.write_lp_to_rows( "\ cpu,region=us-east,host=a usage=100\n\ ", @@ -776,18 +269,20 @@ mod tests { .map(|name| table_def.column_name_to_id_unchecked(name)) .collect(); // create a cache with some cardinality and age limits: - let mut cache = MetaCache::new(CreateMetaCacheArgs { - time_provider: Arc::clone(&time_provider) as _, - table_def, - max_cardinality: MaxCardinality::try_new(10).unwrap(), - max_age: MaxAge::from(Duration::from_nanos(100)), - column_ids, - }) + let mut cache = MetaCache::new( + Arc::clone(&time_provider) as _, + CreateMetaCacheArgs { + table_def, + max_cardinality: MaxCardinality::try_from(10).unwrap(), + max_age: 
MaxAge::from(Duration::from_nanos(100)), + column_ids, + }, + ) .expect("create cache"); // push a bunch of rows with incrementing times and varying tag values: (0..10).for_each(|mult| { time_provider.set(Time::from_timestamp_nanos(mult * 20)); - let rows = writer.write_lp( + let rows = writer.write_lp_to_rows( format!( "\ cpu,region=us-east-{mult},host=a-{mult} usage=100\n\ @@ -805,7 +300,7 @@ mod tests { // check the cache before prune: // NOTE: this does not include entries that have surpassed the max_age of the cache, though, // there are still more than the cache's max cardinality, as it has not yet been pruned. - let records = cache.to_record_batch(&[]).unwrap(); + let records = cache.to_record_batch(&Default::default()).unwrap(); assert_batches_sorted_eq!( [ "+-----------+------+", @@ -831,7 +326,7 @@ mod tests { &[records] ); cache.prune(); - let records = cache.to_record_batch(&[]).unwrap(); + let records = cache.to_record_batch(&Default::default()).unwrap(); assert_batches_sorted_eq!( [ "+-----------+------+", @@ -851,4 +346,448 @@ mod tests { &[records] ); } + + /// This test sets up a [`MetaCacheProvider`], creates a [`MetaCache`] using the `region` and + /// `host` column, and then writes several different unique combinations of values into it. + /// It then sets up a DataFusion [`SessionContext`], registers our [`MetaCacheFunction`] as a + /// UDTF, and then runs a series of test cases to verify queries against the function. + /// + /// The purpose of this is to see that the cache works as intended, and importantly, that the + /// predicate pushdown is happening and being leveraged by the underlying [`MetaCache`], vs. + /// DataFusion doing it for us with a higher level FilterExec. + /// + /// Each test case verifies the `RecordBatch` output of the query as well as the output for + /// EXPLAIN on the same query. The EXPLAIN output contains a line for the MetaCacheExec, which + /// is the custom execution plan impl for the metadata cache that captures the predicates that + /// are pushed down to the underlying [`MetaCacahe::to_record_batch`] method, if any. 
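+    ///
+    /// A matching line looks like the following (drawn from the cases below):
+    ///
+    /// ```text
+    /// MetaCacheExec: predicates=[[0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]
+    /// ```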
+ #[tokio::test] + async fn test_datafusion_meta_cache_udtf() { + // create a test writer and do a write in to populate the catalog with a db/table: + let writer = TestWriter::new(); + let _ = writer.write_lp_to_write_batch( + "\ + cpu,region=us-east,host=a usage=100\n\ + ", + 0, + ); + + // create a meta provider and a cache on tag columns 'region' and 'host': + let db_schema = writer.db_schema(); + let table_def = db_schema.table_definition("cpu").unwrap(); + let column_ids: Vec = ["region", "host"] + .into_iter() + .map(|name| table_def.column_name_to_id_unchecked(name)) + .collect(); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let meta_provider = + MetaCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap(); + meta_provider + .create_cache( + db_schema.id, + None, + CreateMetaCacheArgs { + table_def, + max_cardinality: MaxCardinality::default(), + max_age: MaxAge::default(), + column_ids, + }, + ) + .unwrap(); + + // do some writes to generate a write batch and send it into the cache: + let write_batch = writer.write_lp_to_write_batch( + "\ + cpu,region=us-east,host=a usage=100\n\ + cpu,region=us-east,host=b usage=100\n\ + cpu,region=us-west,host=c usage=100\n\ + cpu,region=us-west,host=d usage=100\n\ + cpu,region=ca-east,host=e usage=100\n\ + cpu,region=ca-cent,host=f usage=100\n\ + cpu,region=ca-west,host=g usage=100\n\ + cpu,region=ca-west,host=h usage=100\n\ + cpu,region=eu-cent,host=i usage=100\n\ + cpu,region=eu-cent,host=j usage=100\n\ + cpu,region=eu-west,host=k usage=100\n\ + cpu,region=eu-west,host=l usage=100\n\ + ", + 0, + ); + let wal_contents = influxdb3_wal::create::wal_contents( + (0, 1, 0), + [influxdb3_wal::create::write_batch_op(write_batch)], + ); + meta_provider.write_wal_contents_to_cache(&wal_contents); + + // Spin up a DataFusion session context and add the meta_cache function to it so we + // can query for data in the cache we created and populated above: + let ctx = SessionContext::new(); + let meta_func = MetaCacheFunction::new(db_schema.id, Arc::clone(&meta_provider)); + ctx.register_udtf(META_CACHE_UDTF_NAME, Arc::new(meta_func)); + + struct TestCase<'a> { + /// A short description of the test + _desc: &'a str, + /// A SQL expression to evaluate using the datafusion session context, should be of + /// the form: + /// ```sql + /// SELECT * FROM meta_cache('cpu') ... + /// ``` + sql: &'a str, + /// Expected record batch output + expected: &'a [&'a str], + /// Expected EXPLAIN output contains this. + /// + /// For checking the `MetaCacheExec` portion of the EXPLAIN output for the given `sql` + /// query. A "contains" is used instead of matching the whole EXPLAIN output to prevent + /// flakyness from upstream changes to other parts of the query plan. + explain_contains: &'a str, + /// Use `assert_batches_sorted_eq!` to check the outputted record batches instead of + /// `assert_batches_eq!`. + /// + /// # Note + /// + /// The cache should produce results in a sorted order as-is, however, some queries + /// that process the results after they are emitted from the cache may have their order + /// altered by DataFusion, e.g., `SELECT DISTINCT(column_name) FROM meta_cache('table')` + /// or queries that project columns that are not at the top level of the cache. 
+ use_sorted_assert: bool, + } + + let test_cases = [ + TestCase { + _desc: "no predicates", + sql: "SELECT * FROM meta_cache('cpu')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-cent | f |", + "| ca-east | e |", + "| ca-west | g |", + "| ca-west | h |", + "| eu-cent | i |", + "| eu-cent | j |", + "| eu-west | k |", + "| eu-west | l |", + "| us-east | a |", + "| us-east | b |", + "| us-west | c |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "eq predicate on region", + sql: "SELECT * FROM meta_cache('cpu') WHERE region = 'us-east'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "eq predicate on region and host", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND host = 'a'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "eq predicate on region; in predicate on host", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND host IN ('a', 'b')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "eq predicate on region; not in predicate on host", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND host != 'a'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)], [1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "in predicate on region", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region IN ('ca-cent', 'ca-east', 'us-east', 'us-west')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-cent | f |", + "| ca-east | e |", + "| us-east | a |", + "| us-east | b |", + "| us-west | c |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "not in predicate on region", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region NOT IN ('ca-cent', 'ca-east', 'us-east', 'us-west')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-west | g |", + "| ca-west | h |", + "| eu-cent | i |", + "| eu-cent | j |", + "| eu-west | k |", + "| eu-west | l |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, 
partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "or eq predicates on region", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' OR region = 'ca-east'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-east | e |", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "or eq predicate on host", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE host = 'd' OR host = 'e'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-east | e |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "un-grouped host conditions are not handled in predicate pushdown", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND host = 'a' OR host = 'b'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "grouped host conditions are handled in predicate pushdown", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND (host = 'a' OR host = 'b')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "project region column", + sql: "SELECT region FROM meta_cache('cpu')", + expected: &[ + "+---------+", + "| region |", + "+---------+", + "| ca-cent |", + "| ca-east |", + "| ca-west |", + "| ca-west |", + "| eu-cent |", + "| eu-cent |", + "| eu-west |", + "| eu-west |", + "| us-east |", + "| us-east |", + "| us-west |", + "| us-west |", + "+---------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "project region column taking distinct", + sql: "SELECT DISTINCT(region) FROM meta_cache('cpu')", + expected: &[ + "+---------+", + "| region |", + "+---------+", + "| ca-cent |", + "| ca-east |", + "| ca-west |", + "| eu-cent |", + "| eu-west |", + "| us-east |", + "| us-west |", + "+---------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + // it seems that DISTINCT changes around the order of results + use_sorted_assert: true, + }, + TestCase { + _desc: "project host column", + sql: "SELECT host FROM meta_cache('cpu')", + expected: &[ + "+------+", // commenting for no new line + "| host |", // commenting for no new line + "+------+", // commenting for no new line + "| a |", // commenting for no new line + "| b |", // commenting for no new line + "| c |", // commenting for no new line + "| d |", // commenting for no new line + "| e |", // commenting for no new line + "| f |", // commenting for no new line + "| g |", // commenting for no new line + "| h |", // commenting for no new line + "| i |", // 
commenting for no new line + "| j |", // commenting for no new line + "| k |", // commenting for no new line + "| l |", // commenting for no new line + "+------+", // commenting for no new line + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + // this column will not be sorted since the order of elements depends on the next level + // up in the cache, so the `region` column is iterated over in order, but the nested + // `host` values, although sorted within `region`s, will not be globally sorted. + use_sorted_assert: true, + }, + TestCase { + _desc: "limit clause", + sql: "SELECT * FROM meta_cache('cpu') LIMIT 8", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-cent | f |", + "| ca-east | e |", + "| ca-west | g |", + "| ca-west | h |", + "| eu-cent | i |", + "| eu-cent | j |", + "| eu-west | k |", + "| eu-west | l |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "limit and offset", + sql: "SELECT * FROM meta_cache('cpu') LIMIT 8 OFFSET 8", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "| us-west | c |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "like clause", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region LIKE 'u%'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "| us-west | c |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + ]; + + for tc in test_cases { + let results = ctx.sql(tc.sql).await.unwrap().collect().await.unwrap(); + // Uncommenting may make figuring out which test case is failing easier: + println!("test case: {}", tc._desc); + if tc.use_sorted_assert { + assert_batches_sorted_eq!(tc.expected, &results); + } else { + assert_batches_eq!(tc.expected, &results); + } + let explain = ctx + .sql(format!("EXPLAIN {sql}", sql = tc.sql).as_str()) + .await + .unwrap() + .collect() + .await + .unwrap() + .pop() + .unwrap(); + + // NOTE(hiltontj): this probably can be done a better way? + // The EXPLAIN output will have two columns, the one we are interested in that contains + // the details of the MetaCacheExec is called `plan`... 
+            assert!(explain
+                .column_by_name("plan")
+                .unwrap()
+                .as_string::<i32>()
+                .iter()
+                .any(|plan| plan.is_some_and(|plan| plan.contains(tc.explain_contains))),);
+        }
+    }
 }
diff --git a/influxdb3_cache/src/meta_cache/provider.rs b/influxdb3_cache/src/meta_cache/provider.rs
new file mode 100644
index 00000000000..99269fbaf2b
--- /dev/null
+++ b/influxdb3_cache/src/meta_cache/provider.rs
@@ -0,0 +1,310 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use anyhow::Context;
+use arrow::datatypes::SchemaRef;
+use influxdb3_catalog::catalog::{Catalog, TableDefinition};
+use influxdb3_id::{DbId, TableId};
+use influxdb3_wal::{MetaCacheDefinition, WalContents, WalOp};
+use iox_time::TimeProvider;
+use parking_lot::RwLock;
+
+use crate::meta_cache::cache::{MaxAge, MaxCardinality};
+
+use super::cache::{CreateMetaCacheArgs, MetaCache};
+
+#[derive(Debug, thiserror::Error)]
+#[error("metadata cache provider error: {0:#}")]
+pub struct Error(#[from] anyhow::Error);
+
+/// Triple-nested map for storing multiple metadata caches per table.
+///
+/// That is, the map nesting is `database -> table -> cache name`
+type CacheMap = RwLock<HashMap<DbId, HashMap<TableId, HashMap<Arc<str>, MetaCache>>>>;
+
+/// Provides the metadata caches for the running instance of InfluxDB
+#[derive(Debug)]
+pub struct MetaCacheProvider {
+    pub(crate) time_provider: Arc<dyn TimeProvider>,
+    pub(crate) catalog: Arc<Catalog>,
+    pub(crate) cache_map: CacheMap,
+}
+
+impl MetaCacheProvider {
+    /// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's
+    /// `cache_map` from the definitions in the catalog.
+    pub fn new_from_catalog(
+        time_provider: Arc<dyn TimeProvider>,
+        catalog: Arc<Catalog>,
+    ) -> Result<Arc<Self>, Error> {
+        let provider = Arc::new(MetaCacheProvider {
+            time_provider,
+            catalog: Arc::clone(&catalog),
+            cache_map: Default::default(),
+        });
+        for db_schema in catalog.list_db_schema() {
+            for table_def in db_schema.tables() {
+                for (cache_name, cache_def) in table_def.meta_caches() {
+                    assert!(
+                        provider
+                            .create_cache(
+                                db_schema.id,
+                                Some(cache_name),
+                                CreateMetaCacheArgs {
+                                    table_def: Arc::clone(&table_def),
+                                    max_cardinality: MaxCardinality::try_from(
+                                        cache_def.max_cardinality
+                                    )?,
+                                    max_age: MaxAge::from(Duration::from_secs(
+                                        cache_def.max_age_seconds
+                                    )),
+                                    column_ids: cache_def.column_ids.to_vec()
+                                }
+                            )?
+                            .is_some(),
+                        "there should not be duplicated cache definitions in the catalog"
+                    )
+                }
+            }
+        }
+        Ok(provider)
+    }
+
+    /// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's
+    /// `cache_map` from the definitions in the catalog. This starts a background process that
+    /// runs on the provided `eviction_interval` to perform eviction on all of the caches
+    /// in the created [`MetaCacheProvider`]'s `cache_map`.
+    pub fn new_from_catalog_with_background_eviction(
+        time_provider: Arc<dyn TimeProvider>,
+        catalog: Arc<Catalog>,
+        eviction_interval: Duration,
+    ) -> Result<Arc<Self>, Error> {
+        let provider = Self::new_from_catalog(time_provider, catalog)?;
+
+        background_eviction_process(Arc::clone(&provider), eviction_interval);
+
+        Ok(provider)
+    }
+
+    /// Get a particular cache's name and arrow schema
+    ///
+    /// This is used for the implementation of DataFusion's `TableFunctionImpl` and
+    /// `TableProvider` traits as a convenience method for the scenario where there is only a
+    /// single cache on a table, and therefore one does not need to specify the cache name
+    /// in addition to the db/table identifiers.
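+    ///
+    /// For example (illustrative): when a table has exactly one cache, calling this with
+    /// `cache_name: None` resolves that cache; when it has several, `None` is returned unless
+    /// an explicit name is provided.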
+    pub(crate) fn get_cache_name_and_schema(
+        &self,
+        db_id: DbId,
+        table_id: TableId,
+        cache_name: Option<&str>,
+    ) -> Option<(Arc<str>, SchemaRef)> {
+        self.cache_map
+            .read()
+            .get(&db_id)
+            .and_then(|db| db.get(&table_id))
+            .and_then(|table| {
+                if let Some(cache_name) = cache_name {
+                    table
+                        .get_key_value(cache_name)
+                        .map(|(name, mc)| (Arc::clone(name), mc.arrow_schema()))
+                } else if table.len() == 1 {
+                    table
+                        .iter()
+                        .map(|(name, mc)| (Arc::clone(name), mc.arrow_schema()))
+                        .next()
+                } else {
+                    None
+                }
+            })
+    }
+
+    /// Create a new entry in the metadata cache for a given database and set of parameters.
+    ///
+    /// If a new cache is created, this will return the [`MetaCacheDefinition`] for the created
+    /// cache; otherwise, if the provided arguments are identical to an existing cache, along with
+    /// any defaults, then `None` will be returned. It is an error to attempt to create a cache
+    /// that would overwrite an existing one with different parameters.
+    ///
+    /// The cache name is optional; if not provided, it will be of the form:
+    /// ```text
+    /// <table_name>_<column_names>_meta_cache
+    /// ```
+    /// Where `<column_names>` is an `_`-separated list of the column names used in the cache.
+    pub fn create_cache(
+        &self,
+        db_id: DbId,
+        cache_name: Option<Arc<str>>,
+        CreateMetaCacheArgs {
+            table_def,
+            max_cardinality,
+            max_age,
+            column_ids,
+        }: CreateMetaCacheArgs,
+    ) -> Result<Option<MetaCacheDefinition>, Error> {
+        let cache_name = if let Some(cache_name) = cache_name {
+            cache_name
+        } else {
+            format!(
+                "{table_name}_{cols}_meta_cache",
+                table_name = table_def.table_name,
+                cols = column_ids
+                    .iter()
+                    .map(
+                        |id| table_def.column_id_to_name(id).with_context(|| format!(
+                            "invalid column id ({id}) encountered in cache creation arguments"
+                        ))
+                    )
+                    .collect::<Result<Vec<_>, anyhow::Error>>()?
+                    .join("_")
+            )
+            .into()
+        };
+
+        let new_cache = MetaCache::new(
+            Arc::clone(&self.time_provider),
+            CreateMetaCacheArgs {
+                table_def: Arc::clone(&table_def),
+                max_cardinality,
+                max_age,
+                column_ids: column_ids.clone(),
+            },
+        )?;
+
+        let mut lock = self.cache_map.write();
+        if let Some(cache) = lock
+            .get(&db_id)
+            .and_then(|db| db.get(&table_def.table_id))
+            .and_then(|table| table.get(&cache_name))
+        {
+            return cache
+                .compare_config(&new_cache)
+                .map(|_| None)
+                .map_err(Into::into);
+        }
+
+        lock.entry(db_id)
+            .or_default()
+            .entry(table_def.table_id)
+            .or_default()
+            .insert(Arc::clone(&cache_name), new_cache);
+
+        Ok(Some(MetaCacheDefinition {
+            table_id: table_def.table_id,
+            table_name: Arc::clone(&table_def.table_name),
+            cache_name,
+            column_ids,
+            max_cardinality: max_cardinality.into(),
+            max_age_seconds: max_age.as_seconds(),
+        }))
+    }
+
+    /// Create a new cache given the database schema and WAL definition. This is useful during WAL
+    /// replay.
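+    /// Unlike [`create_cache`][Self::create_cache], this panics rather than returning an error if
+    /// the definition is invalid, since definitions coming from the WAL are expected to be valid.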
+ pub fn create_from_definition( + &self, + db_id: DbId, + table_def: Arc, + definition: &MetaCacheDefinition, + ) { + let meta_cache = MetaCache::new( + Arc::clone(&self.time_provider), + CreateMetaCacheArgs { + table_def, + max_cardinality: definition + .max_cardinality + .try_into() + .expect("definition should have a valid max_cardinality"), + max_age: MaxAge::from(Duration::from_secs(definition.max_age_seconds)), + column_ids: definition.column_ids.to_vec(), + }, + ) + .expect("definition should be valid coming from the WAL"); + self.cache_map + .write() + .entry(db_id) + .or_default() + .entry(definition.table_id) + .or_default() + .insert(Arc::clone(&definition.cache_name), meta_cache); + } + + /// Delete a cache from the provider + /// + /// This also cleans up the provider hierarchy, so if the delete leaves a branch for a given + /// table or its parent database empty, this will remove that branch. + pub fn delete_cache( + &self, + db_id: DbId, + table_id: TableId, + cache_name: &str, + ) -> Result<(), Error> { + let mut lock = self.cache_map.write(); + let db = lock.get_mut(&db_id).context("database does not exist")?; + let table = db.get_mut(&table_id).context("table does not exist")?; + table.remove(cache_name).context("cache does not exist")?; + if table.is_empty() { + db.remove(&table_id); + } + if db.is_empty() { + lock.remove(&db_id); + } + Ok(()) + } + + /// Write the contents of a WAL file to the cache by iterating over its database and table + /// batches to find entries that belong in the cache. + pub fn write_wal_contents_to_cache(&self, wal_contents: &WalContents) { + let mut lock = self.cache_map.write(); + for op in &wal_contents.ops { + let WalOp::Write(write_batch) = op else { + continue; + }; + let Some(db_caches) = lock.get_mut(&write_batch.database_id) else { + continue; + }; + if db_caches.is_empty() { + continue; + } + for (table_id, table_chunks) in &write_batch.table_chunks { + let Some(table_caches) = db_caches.get_mut(table_id) else { + continue; + }; + if table_caches.is_empty() { + continue; + } + for (_, cache) in table_caches.iter_mut() { + for chunk in table_chunks.chunk_time_to_chunk.values() { + for row in &chunk.rows { + cache.push(row); + } + } + } + } + } + } + + /// Run eviction across all caches in the provider. 
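+    /// This prunes entries that have not been seen for longer than each cache's configured
+    /// `max_age`.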
+    pub fn evict_cache_entries(&self) {
+        let mut lock = self.cache_map.write();
+        lock.iter_mut().for_each(|(_, db_caches)| {
+            db_caches.iter_mut().for_each(|(_, table_caches)| {
+                table_caches.iter_mut().for_each(|(_, cache)| cache.prune())
+            })
+        });
+    }
+}
+
+fn background_eviction_process(
+    provider: Arc<MetaCacheProvider>,
+    eviction_interval: Duration,
+) -> tokio::task::JoinHandle<()> {
+    tokio::spawn(async move {
+        let mut interval = tokio::time::interval(eviction_interval);
+        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+        loop {
+            interval.tick().await;
+
+            provider.evict_cache_entries();
+        }
+    })
+}
diff --git a/influxdb3_cache/src/meta_cache/table_function.rs b/influxdb3_cache/src/meta_cache/table_function.rs
new file mode 100644
index 00000000000..7150695a4ba
--- /dev/null
+++ b/influxdb3_cache/src/meta_cache/table_function.rs
@@ -0,0 +1,359 @@
+use std::{any::Any, sync::Arc};
+
+use arrow::{array::RecordBatch, datatypes::SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+    catalog::{Session, TableProvider},
+    common::{internal_err, plan_err, DFSchema, Result},
+    datasource::{function::TableFunctionImpl, TableType},
+    execution::context::ExecutionProps,
+    logical_expr::TableProviderFilterPushDown,
+    physical_expr::{
+        create_physical_expr,
+        utils::{Guarantee, LiteralGuarantee},
+    },
+    physical_plan::{memory::MemoryExec, DisplayAs, DisplayFormatType, ExecutionPlan},
+    prelude::Expr,
+    scalar::ScalarValue,
+};
+use indexmap::IndexMap;
+use influxdb3_catalog::catalog::TableDefinition;
+use influxdb3_id::{ColumnId, DbId};
+
+use super::{cache::Predicate, MetaCacheProvider};
+
+/// The name used to call the metadata cache in SQL queries
+pub const META_CACHE_UDTF_NAME: &str = "meta_cache";
+
+/// Implementor of the [`TableProvider`] trait that is produced by a call to the [`MetaCacheFunction`]
+struct MetaCacheFunctionProvider {
+    /// Reference to the schema of the [`MetaCache`][super::cache::MetaCache] being queried
+    schema: SchemaRef,
+    /// Forwarded ref to the [`MetaCacheProvider`] which is used to get the
+    /// [`MetaCache`][super::cache::MetaCache] for the query, along with the `db_id` and
+    /// `table_def`. This is done instead of passing forward a reference to the `MetaCache`
+    /// directly because doing so is not feasible with the Rust borrow checker.
+ provider: Arc, + /// The database ID that the called cache is related to + db_id: DbId, + /// The table definition that the called cache is related to + table_def: Arc, + /// The name of the cache, which is determined when calling the `meta_cache` function + cache_name: Arc, +} + +#[async_trait] +impl TableProvider for MetaCacheFunctionProvider { + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Temporary + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } + + async fn scan( + &self, + ctx: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + _limit: Option, + ) -> Result> { + let read = self.provider.cache_map.read(); + let (batches, predicates) = if let Some(cache) = read + .get(&self.db_id) + .and_then(|db| db.get(&self.table_def.table_id)) + .and_then(|tbl| tbl.get(&self.cache_name)) + { + let predicates = convert_filter_exprs(&self.table_def, self.schema(), filters)?; + ( + cache + .to_record_batch(&predicates) + .map(|batch| vec![batch])?, + (!predicates.is_empty()).then_some(predicates), + ) + } else { + (vec![], None) + }; + let mut exec = + MetaCacheExec::try_new(predicates, &[batches], self.schema(), projection.cloned())?; + + let show_sizes = ctx.config_options().explain.show_sizes; + exec = exec.with_show_sizes(show_sizes); + + Ok(Arc::new(exec)) + } +} + +/// Convert the given list of filter expressions to a map of [`ColumnId`] to [`Predicate`] +/// +/// The resulting map uses [`IndexMap`] to ensure consistent ordering of the map. This makes testing +/// the filter conversion significantly easier using EXPLAIN queries. +fn convert_filter_exprs( + table_def: &TableDefinition, + cache_schema: SchemaRef, + filters: &[Expr], +) -> Result> { + let mut predicate_map: IndexMap> = IndexMap::new(); + + // for create_physical_expr: + let schema: DFSchema = cache_schema.try_into()?; + let props = ExecutionProps::new(); + + // The set of `filters` that are passed in from DataFusion varies: 1) based on how they are + // defined in the query, and 2) based on some decisions that DataFusion makes when parsing the + // query into the `Expr` syntax tree. For example, the predicate: + // + // WHERE foo IN ('bar', 'baz') + // + // instead of being expressed as an `InList`, would be simplified to the following `Expr` tree: + // + // [ + // BinaryExpr { + // left: BinaryExpr { left: "foo", op: Eq, right: "bar" }, + // op: Or, + // right: BinaryExpr { left: "foo", op: Eq, right: "baz" } + // } + // ] + // + // while the predicate: + // + // WHERE foo = 'bar' OR foo = 'baz' OR foo = 'bop' OR foo = 'bla' + // + // instead of being expressed as a tree of `BinaryExpr`s, is expressed as an `InList` with four + // entries: + // + // [ + // InList { col: "foo", values: ["bar", "baz", "bop", "bla"], negated: false } + // ] + // + // Instead of handling all the combinations of `Expr`s that may be passed by the caller of + // `TableProider::scan`, we can use the cache's schema to convert each `Expr` to a `PhysicalExpr` + // and analyze it using DataFusion's `LiteralGuarantee`. + // + // This will distill the provided set of `Expr`s down to either an IN list, or a NOT IN list + // which we can convert to the `Predicate` type for the metadata cache. 
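+    //
+    // For example, a filter like `region IN ('us-east', 'us-west')` analyzes to a single
+    // `LiteralGuarantee` on the `region` column with `Guarantee::In` and the literals
+    // `us-east`/`us-west`, which maps directly to `Predicate::new_in` below.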
+ // + // The main caveat is that if for some reason there are multiple `Expr`s that apply predicates + // on a given column, i.e., leading to multiple `LiteralGuarantee`s on a specific column, we + // discard those predicates and have DataFusion handle the filtering. + // + // This is a conservative approach; it may be that we can combine multiple literal guarantees on + // a single column, but thusfar, from testing in the parent module, this does not seem necessary. + + for expr in filters { + let physical_expr = create_physical_expr(expr, &schema, &props)?; + let literal_guarantees = LiteralGuarantee::analyze(&physical_expr); + for LiteralGuarantee { + column, + guarantee, + literals, + } in literal_guarantees + { + let Some(column_id) = table_def.column_name_to_id(column.name()) else { + return plan_err!( + "invalid column name in filter expression: {}", + column.name() + ); + }; + let value_iter = literals.into_iter().filter_map(|l| match l { + ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => Some(s), + _ => None, + }); + + let predicate = match guarantee { + Guarantee::In => Predicate::new_in(value_iter), + Guarantee::NotIn => Predicate::new_not_in(value_iter), + }; + predicate_map + .entry(column_id) + .and_modify(|e| { + // We do not currently support multiple literal guarantees per column. + // + // In this case we replace the predicate with None so that it does not filter + // any records from the cache downstream. Datafusion will still do filtering at + // a higher level, once _all_ records are produced from the cache. + e.take(); + }) + .or_insert_with(|| Some(predicate)); + } + } + + Ok(predicate_map + .into_iter() + .filter_map(|(column_id, predicate)| predicate.map(|predicate| (column_id, predicate))) + .collect()) +} + +/// Implementor of the [`TableFunctionImpl`] trait, to be registered as a user-defined table function +/// in the Datafusion `SessionContext`. +#[derive(Debug)] +pub struct MetaCacheFunction { + db_id: DbId, + provider: Arc, +} + +impl MetaCacheFunction { + pub fn new(db_id: DbId, provider: Arc) -> Self { + Self { db_id, provider } + } +} + +impl TableFunctionImpl for MetaCacheFunction { + fn call(&self, args: &[Expr]) -> Result> { + let Some(Expr::Literal(ScalarValue::Utf8(Some(table_name)))) = args.first() else { + return plan_err!("first argument must be the table name as a string"); + }; + let cache_name = match args.get(1) { + Some(Expr::Literal(ScalarValue::Utf8(Some(name)))) => Some(name), + Some(_) => { + return plan_err!("second argument, if passed, must be the cache name as a string") + } + None => None, + }; + + let Some(table_def) = self + .provider + .catalog + .db_schema_by_id(&self.db_id) + .and_then(|db| db.table_definition(table_name.as_str())) + else { + return plan_err!("provided table name ({}) is invalid", table_name); + }; + let Some((cache_name, schema)) = self.provider.get_cache_name_and_schema( + self.db_id, + table_def.table_id, + cache_name.map(|n| n.as_str()), + ) else { + return plan_err!("could not find meta cache for the given arguments"); + }; + Ok(Arc::new(MetaCacheFunctionProvider { + schema, + provider: Arc::clone(&self.provider), + db_id: self.db_id, + table_def, + cache_name, + })) + } +} + +/// Custom implementor of the [`ExecutionPlan`] trait for use by the metadata cache +/// +/// Wraps a [`MemoryExec`] from DataFusion, and mostly re-uses that. The special functionality +/// provided by this type is to track the predicates that are pushed down to the underlying cache +/// during query planning/execution. 
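+/// The tracked predicates are rendered in the `EXPLAIN` output via the `DisplayAs` implementation
+/// below, which is what the tests in the parent module assert against.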
+/// +/// # Example +/// +/// For a query that does not provide any predicates, or one that does provide predicates, but they +/// do no get pushed down, the `EXPLAIN` for said query will contain a line for the `MetaCacheExec` +/// with no predicates, including what is emitted by the inner `MemoryExec`: +/// +/// ```text +/// MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1] +/// ``` +/// +/// For queries that do have predicates that get pushed down, the output will include them, e.g.: +/// +/// ```text +/// MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1] +/// ``` +#[derive(Debug)] +struct MetaCacheExec { + inner: MemoryExec, + predicates: Option>, +} + +impl MetaCacheExec { + fn try_new( + predicates: Option>, + partitions: &[Vec], + schema: SchemaRef, + projection: Option>, + ) -> Result { + Ok(Self { + inner: MemoryExec::try_new(partitions, schema, projection)?, + predicates, + }) + } + + fn with_show_sizes(self, show_sizes: bool) -> Self { + Self { + inner: self.inner.with_show_sizes(show_sizes), + ..self + } + } +} + +impl DisplayAs for MetaCacheExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "MetaCacheExec:")?; + if let Some(predicates) = self.predicates.as_ref() { + write!(f, " predicates=[")?; + let mut p_iter = predicates.iter(); + while let Some((col_id, predicate)) = p_iter.next() { + write!(f, "[{col_id} {predicate}]")?; + if p_iter.size_hint().0 > 0 { + write!(f, ", ")?; + } + } + write!(f, "]")?; + } + write!(f, " inner=")?; + self.inner.fmt_as(t, f) + } + } + } +} + +impl ExecutionPlan for MetaCacheExec { + fn name(&self) -> &str { + "MetaCacheExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc> { + self.inner.children() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + // (copied from MemoryExec): + // MemoryExec has no children + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.inner.execute(partition, context) + } +} diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 64092b810a3..e94f3d44bbe 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -7,7 +7,7 @@ use indexmap::IndexMap; use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use influxdb3_wal::{ CatalogBatch, CatalogOp, DeleteTableDefinition, FieldAdditions, LastCacheDefinition, - LastCacheDelete, + LastCacheDelete, MetaCacheDefinition, }; use influxdb_line_protocol::FieldValue; use iox_time::Time; @@ -709,6 +709,7 @@ pub struct TableDefinition { pub column_map: BiHashMap>, pub series_key: Option>, pub last_caches: HashMap, LastCacheDefinition>, + pub meta_caches: HashMap, MetaCacheDefinition>, pub deleted: bool, } @@ -764,6 +765,7 @@ impl TableDefinition { column_map, series_key, last_caches: HashMap::new(), + meta_caches: HashMap::new(), deleted: false, }) } @@ -993,6 +995,12 @@ impl TableDefinition { .map(|(name, def)| (Arc::clone(name), def)) } + pub fn meta_caches(&self) -> impl Iterator, &MetaCacheDefinition)> { + self.meta_caches + .iter() + .map(|(name, def)| (Arc::clone(name), def)) + } + pub fn 
column_name_to_id(&self, name: impl Into<Arc<str>>) -> Option<ColumnId> {
         self.column_map.get_by_right(&name.into()).copied()
     }
diff --git a/influxdb3_wal/src/create.rs b/influxdb3_wal/src/create.rs
index c91cf2ddfe8..915afaebfa6 100644
--- a/influxdb3_wal/src/create.rs
+++ b/influxdb3_wal/src/create.rs
@@ -19,6 +19,10 @@ pub fn wal_contents(
     }
 }
 
+pub fn write_batch_op(write_batch: WriteBatch) -> WalOp {
+    WalOp::Write(write_batch)
+}
+
 pub fn catalog_batch_op(
     db_id: DbId,
     db_name: impl Into<Arc<str>>,
diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs
index f70ce9133d2..3bb59d10cb5 100644
--- a/influxdb3_wal/src/lib.rs
+++ b/influxdb3_wal/src/lib.rs
@@ -493,6 +493,23 @@ pub struct LastCacheDelete {
     pub name: Arc<str>,
 }
 
+/// Defines a metadata cache in a given table and database
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+pub struct MetaCacheDefinition {
+    /// The id of the associated table
+    pub table_id: TableId,
+    /// The name of the associated table
+    pub table_name: Arc<str>,
+    /// The name of the cache, which is unique within the associated table
+    pub cache_name: Arc<str>,
+    /// The ids of columns tracked by this metadata cache, in the defined order
+    pub column_ids: Vec<ColumnId>,
+    /// The maximum number of distinct value combinations the cache will hold
+    pub max_cardinality: usize,
+    /// The maximum age in seconds, similar to a time-to-live (TTL), for entries in the cache
+    pub max_age_seconds: u64,
+}
+
 #[serde_as]
 #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub struct WriteBatch {
diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs
index e0632781612..845ef8cd5c0 100644
--- a/influxdb3_write/src/write_buffer/validator.rs
+++ b/influxdb3_write/src/write_buffer/validator.rs
@@ -723,7 +723,7 @@ fn validate_and_qualify_v1_line(
 /// Result of conversion from line protocol to valid chunked data
 /// for the buffer.
 #[derive(Debug)]
-pub(crate) struct ValidatedLines {
+pub struct ValidatedLines {
     /// Number of lines passed in
     pub(crate) line_count: usize,
     /// Number of fields passed in
@@ -738,6 +738,12 @@ pub(crate) struct ValidatedLines {
     pub(crate) catalog_updates: Option,
 }
 
+impl From<ValidatedLines> for WriteBatch {
+    fn from(value: ValidatedLines) -> Self {
+        value.valid_data
+    }
+}
+
 impl WriteValidator {
     /// Convert this into the inner [`LinesParsed`]
     ///
@@ -752,7 +758,7 @@ impl WriteValidator {
     /// This involves splitting out the writes into different batches for each chunk, which will
     /// map to the `Gen1Duration`. This function should be infallible, because
     /// the schema for incoming writes has been fully validated.
-    pub(crate) fn convert_lines_to_buffer(self, gen1_duration: Gen1Duration) -> ValidatedLines {
+    pub fn convert_lines_to_buffer(self, gen1_duration: Gen1Duration) -> ValidatedLines {
         let mut table_chunks = IndexMap::new();
         let line_count = self.state.lines.len();
         let mut field_count = 0;
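Taken together, the pieces in this diff are wired up roughly as follows. This is a hedged sketch, not code from the PR: it assumes `MetaCacheProvider`, `MetaCacheFunction`, `META_CACHE_UDTF_NAME`, `CreateMetaCacheArgs`, `MaxAge`, and `MaxCardinality` are re-exported from `influxdb3_cache::meta_cache` (the module's `mod.rs` is not shown here), that the caller has already looked up the `DatabaseSchema` from the catalog, and that a DataFusion `SessionContext` is available; the `cpu`/`region`/`host` identifiers and the 10-second eviction interval are illustrative.

```rust
use std::{sync::Arc, time::Duration};

use datafusion::prelude::SessionContext;
use influxdb3_cache::meta_cache::{
    CreateMetaCacheArgs, MaxAge, MaxCardinality, MetaCacheFunction, MetaCacheProvider,
    META_CACHE_UDTF_NAME,
};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use iox_time::SystemProvider;

async fn meta_cache_end_to_end(
    catalog: Arc<Catalog>,
    db_schema: Arc<DatabaseSchema>,
    ctx: &SessionContext,
) -> Result<(), Box<dyn std::error::Error>> {
    // Build the provider from the catalog and start background eviction
    // (requires a running tokio runtime).
    let provider = MetaCacheProvider::new_from_catalog_with_background_eviction(
        Arc::new(SystemProvider::new()),
        Arc::clone(&catalog),
        Duration::from_secs(10),
    )?;

    // Create a cache over the `region` and `host` columns of the `cpu` table.
    // Passing `None` for the name yields the default `cpu_region_host_meta_cache`.
    let table_def = db_schema
        .table_definition("cpu")
        .expect("table should exist");
    let column_ids = vec![
        table_def
            .column_name_to_id("region")
            .expect("column should exist"),
        table_def
            .column_name_to_id("host")
            .expect("column should exist"),
    ];
    provider.create_cache(
        db_schema.id,
        None,
        CreateMetaCacheArgs {
            table_def,
            max_cardinality: MaxCardinality::default(),
            max_age: MaxAge::default(),
            column_ids,
        },
    )?;

    // Expose the cache to SQL by registering the user-defined table function,
    // then query it; predicates on cached columns are pushed down and show up
    // in the `EXPLAIN` output asserted by the tests above.
    ctx.register_udtf(
        META_CACHE_UDTF_NAME,
        Arc::new(MetaCacheFunction::new(db_schema.id, Arc::clone(&provider))),
    );
    let batches = ctx
        .sql("SELECT * FROM meta_cache('cpu') WHERE region = 'us-east'")
        .await?
        .collect()
        .await?;
    println!("{} batches returned", batches.len());
    Ok(())
}
```

With only one cache on the table, the single-argument `meta_cache('cpu')` form resolves it automatically; passing a second argument selects a specific cache by name.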