From 088df3931df5feabb0e00b13a12a3301462d8649 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Mon, 18 Nov 2024 16:19:22 -0500 Subject: [PATCH] feat: add metadata cache provider with APIs for write and query This adds the MetaDataCacheProvider for managing metadata caches in the influxdb3 instance. This includes APIs to create caches through the WAL as well as from a catalog on initialization, to write data into the managed caches, and to query data out of them. The query side is fairly involved, relying on Datafusion's TableFunctionImpl and TableProvider traits to make querying the cache using a user-defined table function (UDTF) possible. The predicate code was modified to only support two kinds of predicates: IN and NOT IN, which simplifies the code, and maps nicely with the DataFusion LiteralGuarantee which we leverage to derive the predicates from the incoming queries. A custom ExecutionPlan implementation was added specifically for the metadata cache that can report the predicates that are pushed down to the cache during query planning/execution. A big set of tests was added to to check that queries are working, and that predicates are being pushed down properly. --- Cargo.lock | 7 +- influxdb3_cache/Cargo.toml | 8 +- influxdb3_cache/src/meta_cache/cache.rs | 513 ++++++++ influxdb3_cache/src/meta_cache/mod.rs | 1121 ++++++++--------- influxdb3_cache/src/meta_cache/provider.rs | 310 +++++ .../src/meta_cache/table_function.rs | 359 ++++++ influxdb3_catalog/src/catalog.rs | 9 + influxdb3_wal/src/create.rs | 4 + influxdb3_wal/src/lib.rs | 17 + influxdb3_write/src/write_buffer/validator.rs | 10 +- 10 files changed, 1762 insertions(+), 596 deletions(-) create mode 100644 influxdb3_cache/src/meta_cache/cache.rs create mode 100644 influxdb3_cache/src/meta_cache/provider.rs create mode 100644 influxdb3_cache/src/meta_cache/table_function.rs diff --git a/Cargo.lock b/Cargo.lock index 7b7ab35a47f..605bc50c8e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2755,13 +2755,18 @@ version = "0.1.0" dependencies = [ "anyhow", "arrow", - "arrow_util", + "async-trait", + "datafusion", + "indexmap 2.6.0", "influxdb3_catalog", "influxdb3_id", "influxdb3_wal", "influxdb3_write", "iox_time", + "parking_lot", "schema", + "thiserror 1.0.69", + "tokio", ] [[package]] diff --git a/influxdb3_cache/Cargo.toml b/influxdb3_cache/Cargo.toml index 4b7fb0f0de9..21c42be9972 100644 --- a/influxdb3_cache/Cargo.toml +++ b/influxdb3_cache/Cargo.toml @@ -18,10 +18,14 @@ influxdb3_wal = { path = "../influxdb3_wal" } # crates.io dependencies anyhow.workspace = true arrow.workspace = true +async-trait.workspace = true +datafusion.workspace = true +indexmap.workspace = true +parking_lot.workspace = true +thiserror.workspace = true +tokio.workspace = true [dev-dependencies] -# Core Crates -arrow_util.workspace = true # Local deps influxdb3_write = { path = "../influxdb3_write" } diff --git a/influxdb3_cache/src/meta_cache/cache.rs b/influxdb3_cache/src/meta_cache/cache.rs new file mode 100644 index 00000000000..5a5f17ca754 --- /dev/null +++ b/influxdb3_cache/src/meta_cache/cache.rs @@ -0,0 +1,513 @@ +use std::{ + collections::{BTreeMap, BTreeSet, BinaryHeap}, + num::NonZeroUsize, + sync::Arc, + time::Duration, +}; + +use anyhow::{bail, Context}; +use arrow::{ + array::{ArrayRef, RecordBatch, StringViewBuilder}, + datatypes::{DataType, Field, SchemaBuilder, SchemaRef}, + error::ArrowError, +}; +use indexmap::IndexMap; +use influxdb3_catalog::catalog::TableDefinition; +use influxdb3_id::ColumnId; +use influxdb3_wal::{FieldData, 
Row}; +use iox_time::TimeProvider; +use schema::{InfluxColumnType, InfluxFieldType}; + +/// A metadata cache for storing distinct values for a set of columns in a table +#[derive(Debug)] +pub(crate) struct MetaCache { + time_provider: Arc, + /// The maximum number of unique value combinations in the cache + max_cardinality: usize, + /// The maximum age for entries in the cache + max_age: Duration, + /// The fixed Arrow schema used to produce record batches from the cache + schema: SchemaRef, + /// Holds current state of the cache + state: MetaCacheState, + /// The identifiers of the columns used in the cache + column_ids: Vec, + /// The cache data, stored in a tree + data: Node, +} + +/// Type for tracking the current state of a [`MetaCache`] +#[derive(Debug, Default)] +struct MetaCacheState { + /// The current number of unique value combinations in the cache + cardinality: usize, +} + +/// Arguments to create a new [`MetaCache`] +#[derive(Debug)] +pub struct CreateMetaCacheArgs { + pub table_def: Arc, + pub max_cardinality: MaxCardinality, + pub max_age: MaxAge, + pub column_ids: Vec, +} + +#[derive(Debug, Clone, Copy)] +pub struct MaxCardinality(NonZeroUsize); + +impl TryFrom for MaxCardinality { + type Error = anyhow::Error; + + fn try_from(value: usize) -> Result { + Ok(Self( + NonZeroUsize::try_from(value).context("invalid size provided")?, + )) + } +} + +impl Default for MaxCardinality { + fn default() -> Self { + Self(NonZeroUsize::new(100_000).unwrap()) + } +} + +impl From for usize { + fn from(value: MaxCardinality) -> Self { + value.0.into() + } +} + +#[derive(Debug, Clone, Copy)] +pub struct MaxAge(Duration); + +impl Default for MaxAge { + fn default() -> Self { + Self(Duration::from_secs(24 * 60 * 60)) + } +} + +impl From for Duration { + fn from(value: MaxAge) -> Self { + value.0 + } +} + +impl From for MaxAge { + fn from(duration: Duration) -> Self { + Self(duration) + } +} + +impl MaxAge { + pub(crate) fn as_seconds(&self) -> u64 { + self.0.as_secs() + } +} + +impl MetaCache { + /// Create a new [`MetaCache`] + /// + /// Must pass a non-empty set of [`ColumnId`]s which correspond to valid columns in the provided + /// [`TableDefinition`]. + pub(crate) fn new( + time_provider: Arc, + CreateMetaCacheArgs { + table_def, + max_cardinality, + max_age, + column_ids, + }: CreateMetaCacheArgs, + ) -> Result { + if column_ids.is_empty() { + bail!("must pass a non-empty set of column ids"); + } + let mut builder = SchemaBuilder::new(); + for id in &column_ids { + let col = table_def.columns.get(id).with_context(|| { + format!("invalid column id ({id}) encountered while creating metadata cache") + })?; + let data_type = match col.data_type { + InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => { + DataType::Utf8View + } + other => bail!( + "cannot use a column of type {other} in a metadata cache, only \ + tags and string fields can be used" + ), + }; + + builder.push(Arc::new(Field::new(col.name.as_ref(), data_type, false))); + } + Ok(Self { + time_provider, + max_cardinality: max_cardinality.into(), + max_age: max_age.into(), + state: MetaCacheState::default(), + schema: Arc::new(builder.finish()), + column_ids, + data: Node::default(), + }) + } + + /// Push a [`Row`] from the WAL into the cache, if the row contains all of the cached columns. 
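+    ///
+    /// As an illustrative sketch (hypothetical values): for a cache on the `region` and `host`
+    /// columns, a row carrying `region=us-east,host=a` at time `t` is stored as the nested path
+    /// `us-east -> a`, and the last-seen time of both entries is set to `t`. If that path did
+    /// not already exist, the cache's cardinality is incremented by one.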
+ pub(crate) fn push(&mut self, row: &Row) { + let mut values = Vec::with_capacity(self.column_ids.len()); + for id in &self.column_ids { + let Some(value) = row + .fields + .iter() + .find(|f| &f.id == id) + .map(|f| Value::from(&f.value)) + else { + // ignore the row if it does not contain all columns in the cache: + return; + }; + values.push(value); + } + let mut target = &mut self.data; + let mut val_iter = values.into_iter().peekable(); + let mut is_new = false; + while let (Some(value), peek) = (val_iter.next(), val_iter.peek()) { + let (last_seen, node) = target.0.entry(value).or_insert_with(|| { + is_new = true; + (row.time, peek.is_some().then(Node::default)) + }); + *last_seen = row.time; + if let Some(node) = node { + target = node; + } else { + break; + } + } + if is_new { + self.state.cardinality += 1; + } + } + + /// Gather a record batch from a cache given the set of predicates + /// + /// This assumes the predicates are well behaved, and validated before being passed in. For example, + /// there cannot be multiple predicates on a single column; the caller needs to validate/transform + /// the incoming predicates from Datafusion before calling. + /// + /// Entries in the cache that have not been seen since before the `max_age` of the cache will be + /// filtered out of the result. + pub(crate) fn to_record_batch( + &self, + predicates: &IndexMap, + ) -> Result { + // predicates may not be provided for all columns in the cache, or not be provided in the + // order of columns in the cache. This re-orders them to the correct order, and fills in any + // gaps with None. + let predicates: Vec> = self + .column_ids + .iter() + .map(|id| predicates.get(id)) + .collect(); + + // Uses a [`StringViewBuilder`] to compose the set of [`RecordBatch`]es. This is convenient for + // the sake of nested caches, where a predicate on a higher branch in the cache will need to have + // its value in the outputted batches duplicated. + let mut builders: Vec = (0..self.column_ids.len()) + .map(|_| StringViewBuilder::new()) + .collect(); + + let expired_time_ns = self.expired_time_ns(); + let _ = + self.data + .evaluate_predicates(expired_time_ns, predicates.as_slice(), &mut builders); + + RecordBatch::try_new( + Arc::clone(&self.schema), + builders + .into_iter() + .map(|mut builder| Arc::new(builder.finish()) as ArrayRef) + .collect(), + ) + } + + /// Prune nodes from within the cache + /// + /// This first prunes entries that are older than the `max_age` of the cache. If the cardinality + /// of the cache is still over its `max_cardinality`, it will do another pass to bring the cache + /// size down. + pub(crate) fn prune(&mut self) { + let before_time_ns = self.expired_time_ns(); + let _ = self.data.remove_before(before_time_ns); + self.state.cardinality = self.data.cardinality(); + if self.state.cardinality > self.max_cardinality { + let n_to_remove = self.state.cardinality - self.max_cardinality; + self.data.remove_n_oldest(n_to_remove); + self.state.cardinality = self.data.cardinality(); + } + } + + /// Get the nanosecond timestamp as an `i64`, before which, entries that have not been seen + /// since are considered expired. 
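+    ///
+    /// In other words, this is roughly `now - max_age` in nanoseconds. For example (hypothetical
+    /// numbers), with `now` at 1_000 ns and a `max_age` of 100 ns, any entry whose last-seen
+    /// time is at or below 900 ns is treated as expired.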
+ fn expired_time_ns(&self) -> i64 { + self.time_provider + .now() + .checked_sub(self.max_age) + .expect("max age on cache should not cause an overflow") + .timestamp_nanos() + } + + /// Get the arrow [`SchemaRef`] for this cache + pub(crate) fn arrow_schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + /// Compare the configuration of a given cache, producing a helpful error message if they differ + pub(crate) fn compare_config(&self, other: &Self) -> Result<(), anyhow::Error> { + if self.max_cardinality != other.max_cardinality { + bail!( + "incompatible `max_cardinality`, expected {}, got {}", + self.max_cardinality, + other.max_cardinality + ); + } + if self.max_age != other.max_age { + bail!( + "incompatible `max_age`, expected {}, got {}", + self.max_age.as_secs(), + other.max_age.as_secs() + ) + } + if self.column_ids != other.column_ids { + bail!( + "incompatible column id selection, expected {}, got {}", + self.column_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(","), + other + .column_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(","), + ) + } + + Ok(()) + } +} + +/// A node in the `data` tree of a [`MetaCache`] +/// +/// Recursive struct holding a [`BTreeMap`] whose keys are the values nested under this node, and +/// whose values hold the last seen time as an [`i64`] of each value, and an optional reference to +/// the node in the next level of the tree. +#[derive(Debug, Default)] +struct Node(BTreeMap)>); + +impl Node { + /// Remove all elements before the given nanosecond timestamp returning `true` if the resulting + /// node is empty. + fn remove_before(&mut self, time_ns: i64) -> bool { + self.0.retain(|_, (last_seen, node)| { + // Note that for a branch node, the `last_seen` time will be the greatest of + // all its nodes, so an entire branch can be removed if its value has not been seen since + // before `time_ns`, hence the short-curcuit here: + *last_seen > time_ns + || node + .as_mut() + .is_some_and(|node| node.remove_before(time_ns)) + }); + self.0.is_empty() + } + + /// Remove the `n_to_remove` oldest entries from the cache + fn remove_n_oldest(&mut self, n_to_remove: usize) { + let mut times = BinaryHeap::with_capacity(n_to_remove); + self.find_n_oldest(n_to_remove, &mut times); + self.remove_before(*times.peek().unwrap()); + } + + /// Use a binary heap to find the time before which all nodes should be removed + fn find_n_oldest(&self, n_to_remove: usize, times_heap: &mut BinaryHeap) { + self.0 + .values() + // do not need to add the last_seen time for a branch node to the + // heap since it will be equal to the greatest of that of its leaves + .for_each(|(last_seen, node)| { + if let Some(node) = node { + node.find_n_oldest(n_to_remove, times_heap) + } else if times_heap.len() < n_to_remove { + times_heap.push(*last_seen); + } else if times_heap.peek().is_some_and(|newest| last_seen < newest) { + times_heap.pop(); + times_heap.push(*last_seen); + } + }); + } + + /// Get the total count of unique value combinations nested under this node + /// + /// Note that this includes expired elements, which still contribute to the total size of the + /// cache until they are pruned. + fn cardinality(&self) -> usize { + self.0 + .values() + .map(|(_, node)| node.as_ref().map_or(1, |node| node.cardinality())) + .sum() + } + + /// Evaluate the set of provided predicates against this node, adding values to the provided + /// [`StringViewBuilder`]s. 
Predicates and builders are provided as slices, as this is called + /// recursively down the cache tree. + /// + /// Returns the number of values that were added to the arrow builders. + /// + /// # Panics + /// + /// This will panic if invalid sized `predicates` and `builders` slices are passed in. When + /// called from the root [`Node`], their size should be that of the depth of the cache, i.e., + /// the number of columns in the cache. + fn evaluate_predicates( + &self, + expired_time_ns: i64, + predicates: &[Option<&Predicate>], + builders: &mut [StringViewBuilder], + ) -> usize { + let mut total_count = 0; + let (predicate, next_predicates) = predicates + .split_first() + .expect("predicates should not be empty"); + // if there is a predicate, evaluate it, otherwise, just grab everything from the node: + let values_and_nodes = if let Some(predicate) = predicate { + self.evaluate_predicate(expired_time_ns, predicate) + } else { + self.0 + .iter() + .filter(|&(_, (t, _))| (t > &expired_time_ns)) + .map(|(v, (_, n))| (v.clone(), n.as_ref())) + .collect() + }; + let (builder, next_builders) = builders + .split_first_mut() + .expect("builders should not be empty"); + // iterate over the resulting set of values and next nodes (if this is a branch), and add + // the values to the arrow builders: + for (value, node) in values_and_nodes { + if let Some(node) = node { + let count = + node.evaluate_predicates(expired_time_ns, next_predicates, next_builders); + if count > 0 { + // we are not on a terminal node in the cache, so create a block, as this value + // repeated `count` times, i.e., depending on how many values come out of + // subsequent nodes: + let block = builder.append_block(value.0.as_bytes().into()); + for _ in 0..count { + builder + .try_append_view(block, 0u32, value.0.as_bytes().len() as u32) + .expect("append view for known valid block, offset and length"); + } + total_count += count; + } + } else { + builder.append_value(value.0); + total_count += 1; + } + } + total_count + } + + /// Evaluate a predicate against a [`Node`], producing a list of [`Value`]s and, if this is a + /// branch node in the cache tree, a reference to the next [`Node`]. 
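+    ///
+    /// For example (hypothetical values), given a node holding `ca-cent`, `us-east`, and
+    /// `us-west`, an `IN (us-east, us-west)` predicate looks up only those two keys, while a
+    /// `NOT IN (ca-cent)` predicate scans the node and keeps everything but `ca-cent`; in both
+    /// cases entries not seen since `expired_time_ns` are dropped.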
+ fn evaluate_predicate( + &self, + expired_time_ns: i64, + predicate: &Predicate, + ) -> Vec<(Value, Option<&Node>)> { + match &predicate { + Predicate::In(in_list) => in_list + .iter() + .filter_map(|v| { + self.0.get_key_value(v).and_then(|(v, (t, n))| { + (t > &expired_time_ns).then(|| (v.clone(), n.as_ref())) + }) + }) + .collect(), + Predicate::NotIn(not_in_set) => self + .0 + .iter() + .filter(|(v, (t, _))| t > &expired_time_ns && !not_in_set.contains(v)) + .map(|(v, (_, n))| (v.clone(), n.as_ref())) + .collect(), + } + } +} + +/// A cache value, which for now, only holds strings +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash)] +pub(crate) struct Value(Arc); + +impl From<&FieldData> for Value { + fn from(field: &FieldData) -> Self { + match field { + FieldData::Key(s) => Self(Arc::from(s.as_str())), + FieldData::Tag(s) => Self(Arc::from(s.as_str())), + FieldData::String(s) => Self(Arc::from(s.as_str())), + FieldData::Timestamp(_) + | FieldData::Integer(_) + | FieldData::UInteger(_) + | FieldData::Float(_) + | FieldData::Boolean(_) => panic!("unexpected field type for metadata cache"), + } + } +} + +impl From for Value { + fn from(value: String) -> Self { + Self(Arc::from(value.as_str())) + } +} + +/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`MetaCache`] +/// +/// This is intended to be derived from a set of filter expressions in Datafusion by analyzing +/// them with a `LiteralGuarantee`. +/// +/// This uses a `BTreeSet` to store the values so that they are iterated over in sorted order. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum Predicate { + In(BTreeSet), + NotIn(BTreeSet), +} + +impl std::fmt::Display for Predicate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Predicate::In(_) => write!(f, "IN (")?, + Predicate::NotIn(_) => write!(f, "NOT IN (")?, + } + let mut values = self.values(); + while let Some(v) = values.next() { + write!(f, "{}", v.0)?; + if values.size_hint().0 > 0 { + write!(f, ",")?; + } + } + + write!(f, ")") + } +} + +impl Predicate { + pub(crate) fn new_in(in_vals: impl IntoIterator>>) -> Self { + Self::In(in_vals.into_iter().map(Into::into).map(Value).collect()) + } + + pub(crate) fn new_not_in(in_vals: impl IntoIterator>>) -> Self { + Self::NotIn(in_vals.into_iter().map(Into::into).map(Value).collect()) + } + + fn values(&self) -> impl Iterator { + match self { + Predicate::In(vals) => vals.iter(), + Predicate::NotIn(vals) => vals.iter(), + } + } +} diff --git a/influxdb3_cache/src/meta_cache/mod.rs b/influxdb3_cache/src/meta_cache/mod.rs index 67f9eeba9cd..ea50ab66683 100644 --- a/influxdb3_cache/src/meta_cache/mod.rs +++ b/influxdb3_cache/src/meta_cache/mod.rs @@ -1,493 +1,28 @@ //! 
The Metadata Cache holds the distinct values for a column or set of columns on a table -use std::{ - cmp::Eq, - collections::{BTreeMap, BinaryHeap, HashSet}, - num::NonZeroUsize, - sync::Arc, - time::Duration, -}; - -use anyhow::{bail, Context}; -use arrow::{ - array::{ArrayRef, RecordBatch, StringViewBuilder}, - datatypes::{DataType, Field, SchemaBuilder, SchemaRef}, - error::ArrowError, -}; -use influxdb3_catalog::catalog::TableDefinition; -use influxdb3_id::ColumnId; -use influxdb3_wal::{FieldData, Row}; -use iox_time::TimeProvider; -use schema::{InfluxColumnType, InfluxFieldType}; - -/// A metadata cache for storing distinct values for a set of columns in a table -#[derive(Debug)] -pub struct MetaCache { - time_provider: Arc, - /// The maximum number of unique value combinations in the cache - max_cardinality: usize, - /// The maximum age for entries in the cache - max_age: Duration, - /// The fixed Arrow schema used to produce record batches from the cache - schema: SchemaRef, - /// Holds current state of the cache - state: MetaCacheState, - /// The identifiers of the columns used in the cache - column_ids: Vec, - /// The cache data, stored in a tree - data: Node, -} - -/// Type for tracking the current state of a [`MetaCache`] -#[derive(Debug, Default)] -struct MetaCacheState { - /// The current number of unique value combinations in the cache - cardinality: usize, -} - -/// Arguments to create a new [`MetaCache`] -#[derive(Debug)] -pub struct CreateMetaCacheArgs { - pub time_provider: Arc, - pub table_def: Arc, - pub max_cardinality: MaxCardinality, - pub max_age: MaxAge, - pub column_ids: Vec, -} - -#[derive(Debug, Clone, Copy)] -pub struct MaxCardinality(NonZeroUsize); - -#[cfg(test)] -impl MaxCardinality { - fn try_new(size: usize) -> Result { - Ok(Self( - NonZeroUsize::try_from(size).context("invalid size provided")?, - )) - } -} - -impl Default for MaxCardinality { - fn default() -> Self { - Self(NonZeroUsize::new(100_000).unwrap()) - } -} - -impl From for usize { - fn from(value: MaxCardinality) -> Self { - value.0.into() - } -} - -#[derive(Debug, Clone, Copy)] -pub struct MaxAge(Duration); - -impl Default for MaxAge { - fn default() -> Self { - Self(Duration::from_secs(24 * 60 * 60)) - } -} - -impl From for Duration { - fn from(value: MaxAge) -> Self { - value.0 - } -} - -impl From for MaxAge { - fn from(duration: Duration) -> Self { - Self(duration) - } -} - -impl MetaCache { - /// Create a new [`MetaCache`] - /// - /// Must pass a non-empty set of [`ColumnId`]s which correspond to valid columns in the provided - /// [`TableDefinition`]. 
- pub fn new( - CreateMetaCacheArgs { - time_provider, - table_def, - max_cardinality, - max_age, - column_ids, - }: CreateMetaCacheArgs, - ) -> Result { - if column_ids.is_empty() { - bail!("must pass a non-empty set of column ids"); - } - let mut builder = SchemaBuilder::new(); - for id in &column_ids { - let col = table_def.columns.get(id).with_context(|| { - format!("invalid column id ({id}) encountered while creating metadata cache") - })?; - let data_type = match col.data_type { - InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => { - DataType::Utf8View - } - other => bail!( - "cannot use a column of type {other} in a metadata cache, only \ - tags and string fields can be used" - ), - }; - - builder.push(Arc::new(Field::new(col.name.as_ref(), data_type, false))); - } - Ok(Self { - time_provider, - max_cardinality: max_cardinality.into(), - max_age: max_age.into(), - state: MetaCacheState::default(), - schema: Arc::new(builder.finish()), - column_ids, - data: Node::default(), - }) - } - - /// Push a [`Row`] from the WAL into the cache, if the row contains all of the cached columns. - pub fn push(&mut self, row: &Row) { - let mut values = Vec::with_capacity(self.column_ids.len()); - for id in &self.column_ids { - let Some(value) = row - .fields - .iter() - .find(|f| &f.id == id) - .map(|f| Value::from(&f.value)) - else { - // ignore the row if it does not contain all columns in the cache: - return; - }; - values.push(value); - } - let mut target = &mut self.data; - let mut val_iter = values.into_iter().peekable(); - let mut is_new = false; - while let (Some(value), peek) = (val_iter.next(), val_iter.peek()) { - let (last_seen, node) = target.0.entry(value).or_insert_with(|| { - is_new = true; - (row.time, peek.is_some().then(Node::default)) - }); - *last_seen = row.time; - if let Some(node) = node { - target = node; - } else { - break; - } - } - if is_new { - self.state.cardinality += 1; - } - } - - /// Gather a record batch from a cache given the set of predicates - /// - /// This assumes the predicates are well behaved, and validated before being passed in. For example, - /// there cannot be multiple predicates on a single column; the caller needs to validate/transform - /// the incoming predicates from Datafusion before calling. - /// - /// Entries in the cache that have not been seen since before the `max_age` of the cache will be - /// filtered out of the result. - pub fn to_record_batch(&self, predicates: &[Predicate]) -> Result { - // predicates may not be provided for all columns in the cache, or not be provided in the - // order of columns in the cache. This re-orders them to the correct order, and fills in any - // gaps with None. - let predicates: Vec> = self - .column_ids - .iter() - .map(|id| predicates.iter().find(|p| &p.column_id == id)) - .collect(); - - // Uses a [`StringViewBuilder`] to compose the set of [`RecordBatch`]es. This is convenient for - // the sake of nested caches, where a predicate on a higher branch in the cache will need to have - // its value in the outputted batches duplicated. 
- let mut builders: Vec = (0..self.column_ids.len()) - .map(|_| StringViewBuilder::new()) - .collect(); - - let expired_time_ns = self.expired_time_ns(); - let _ = self - .data - .evaluate_predicates(expired_time_ns, &predicates, &mut builders); - - RecordBatch::try_new( - Arc::clone(&self.schema), - builders - .into_iter() - .map(|mut builder| Arc::new(builder.finish()) as ArrayRef) - .collect(), - ) - } - - /// Prune nodes from within the cache - /// - /// This first prunes entries that are older than the `max_age` of the cache. If the cardinality - /// of the cache is still over its `max_cardinality`, it will do another pass to bring the cache - /// size down. - pub fn prune(&mut self) { - let before_time_ns = self.expired_time_ns(); - let _ = self.data.remove_before(before_time_ns); - self.state.cardinality = self.data.cardinality(); - if self.state.cardinality > self.max_cardinality { - let n_to_remove = self.state.cardinality - self.max_cardinality; - self.data.remove_n_oldest(n_to_remove); - self.state.cardinality = self.data.cardinality(); - } - } - - /// Get the nanosecond timestamp as an `i64`, before which, entries that have not been seen - /// since are considered expired. - fn expired_time_ns(&self) -> i64 { - self.time_provider - .now() - .checked_sub(self.max_age) - .expect("max age on cache should not cause an overflow") - .timestamp_nanos() - } -} - -/// A node in the `data` tree of a [`MetaCache`] -/// -/// Recursive struct holding a [`BTreeMap`] whose keys are the values nested under this node, and -/// whose values hold the last seen time as an [`i64`] of each value, and an optional reference to -/// the node in the next level of the tree. -#[derive(Debug, Default)] -struct Node(BTreeMap)>); - -impl Node { - /// Remove all elements before the given nanosecond timestamp returning `true` if the resulting - /// node is empty. - fn remove_before(&mut self, time_ns: i64) -> bool { - self.0.retain(|_, (last_seen, node)| { - // Note that for a branch node, the `last_seen` time will be the greatest of - // all its nodes, so an entire branch can be removed if its value has not been seen since - // before `time_ns`, hence the short-curcuit here: - *last_seen > time_ns - || node - .as_mut() - .is_some_and(|node| node.remove_before(time_ns)) - }); - self.0.is_empty() - } - - /// Remove the `n_to_remove` oldest entries from the cache - fn remove_n_oldest(&mut self, n_to_remove: usize) { - let mut times = BinaryHeap::with_capacity(n_to_remove); - self.find_n_oldest(n_to_remove, &mut times); - self.remove_before(*times.peek().unwrap()); - } - - /// Use a binary heap to find the time before which all nodes should be removed - fn find_n_oldest(&self, n_to_remove: usize, times_heap: &mut BinaryHeap) { - self.0 - .values() - // do not need to add the last_seen time for a branch node to the - // heap since it will be equal to the greatest of that of its leaves - .for_each(|(last_seen, node)| { - if let Some(node) = node { - node.find_n_oldest(n_to_remove, times_heap) - } else if times_heap.len() < n_to_remove { - times_heap.push(*last_seen); - } else if times_heap.peek().is_some_and(|newest| last_seen < newest) { - times_heap.pop(); - times_heap.push(*last_seen); - } - }); - } - - /// Get the total count of unique value combinations nested under this node - /// - /// Note that this includes expired elements, which still contribute to the total size of the - /// cache until they are pruned. 
- fn cardinality(&self) -> usize { - self.0 - .values() - .map(|(_, node)| node.as_ref().map_or(1, |node| node.cardinality())) - .sum() - } - - /// Evaluate the set of provided predicates against this node, adding values to the provided - /// [`StringViewBuilder`]s. Predicates and builders are provided as slices, as this is called - /// recursively down the cache tree. - /// - /// Returns the number of values that were added to the arrow builders. - /// - /// # Panics - /// - /// This will panic if invalid sized `predicates` and `builders` slices are passed in. When - /// called from the root [`Node`], their size should be that of the depth of the cache, i.e., - /// the number of columns in the cache. - fn evaluate_predicates( - &self, - expired_time_ns: i64, - predicates: &[Option<&Predicate>], - builders: &mut [StringViewBuilder], - ) -> usize { - let mut total_count = 0; - let (predicate, next_predicates) = predicates - .split_first() - .expect("predicates should not be empty"); - // if there is a predicate, evaluate it, otherwise, just grab everything from the node: - let values_and_nodes = if let Some(predicate) = predicate { - self.evaluate_predicate(expired_time_ns, predicate) - } else { - self.0 - .iter() - .filter(|&(_, (t, _))| (t > &expired_time_ns)) - .map(|(v, (_, n))| (v.clone(), n.as_ref())) - .collect() - }; - let (builder, next_builders) = builders - .split_first_mut() - .expect("builders should not be empty"); - // iterate over the resulting set of values and next nodes (if this is a branch), and add - // the values to the arrow builders: - for (value, node) in values_and_nodes { - if let Some(node) = node { - let count = - node.evaluate_predicates(expired_time_ns, next_predicates, next_builders); - if count > 0 { - // we are not on a terminal node in the cache, so create a block, as this value - // repeated `count` times, i.e., depending on how many values come out of - // subsequent nodes: - let block = builder.append_block(value.0.as_bytes().into()); - for _ in 0..count { - builder - .try_append_view(block, 0u32, value.0.as_bytes().len() as u32) - .expect("append view for known valid block, offset and length"); - } - total_count += count; - } - } else { - builder.append_value(value.0); - total_count += 1; - } - } - total_count - } - - /// Evaluate a predicate against a [`Node`], producing a list of [`Value`]s and, if this is a - /// branch node in the cache tree, a reference to the next [`Node`]. 
- fn evaluate_predicate( - &self, - expired_time_ns: i64, - predicate: &Predicate, - ) -> Vec<(Value, Option<&Node>)> { - match &predicate.kind { - PredicateKind::Eq(rhs) => self - .0 - .get_key_value(rhs) - .and_then(|(v, (t, n))| { - (t > &expired_time_ns).then(|| vec![(v.clone(), n.as_ref())]) - }) - .unwrap_or_default(), - PredicateKind::NotEq(rhs) => self - .0 - .iter() - .filter(|(v, (t, _))| t > &expired_time_ns && *v != rhs) - .map(|(v, (_, n))| (v.clone(), n.as_ref())) - .collect(), - PredicateKind::In(in_list) => in_list - .iter() - .filter_map(|v| { - self.0.get_key_value(v).and_then(|(v, (t, n))| { - (t > &expired_time_ns).then(|| (v.clone(), n.as_ref())) - }) - }) - .collect(), - PredicateKind::NotIn(not_in_set) => self - .0 - .iter() - .filter(|(v, (t, _))| t > &expired_time_ns && !not_in_set.contains(v)) - .map(|(v, (_, n))| (v.clone(), n.as_ref())) - .collect(), - } - } -} - -/// A cache value, which for now, only holds strings -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash)] -struct Value(Arc); - -impl From<&FieldData> for Value { - fn from(field: &FieldData) -> Self { - match field { - FieldData::Key(s) => Self(Arc::from(s.as_str())), - FieldData::Tag(s) => Self(Arc::from(s.as_str())), - FieldData::String(s) => Self(Arc::from(s.as_str())), - FieldData::Timestamp(_) - | FieldData::Integer(_) - | FieldData::UInteger(_) - | FieldData::Float(_) - | FieldData::Boolean(_) => panic!("unexpected field type for metadata cache"), - } - } -} - -/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`MetaCache`] -/// -/// This is intended to be derived from a set of filter expressions in Datafusion -#[derive(Debug)] -pub struct Predicate { - column_id: ColumnId, - kind: PredicateKind, -} - -#[cfg(test)] -impl Predicate { - fn new_eq(column_id: ColumnId, rhs: impl Into>) -> Self { - Self { - column_id, - kind: PredicateKind::Eq(Value(rhs.into())), - } - } - - fn new_not_eq(column_id: ColumnId, rhs: impl Into>) -> Self { - Self { - column_id, - kind: PredicateKind::NotEq(Value(rhs.into())), - } - } - - fn new_in(column_id: ColumnId, in_vals: impl IntoIterator>>) -> Self { - Self { - column_id, - kind: PredicateKind::In(in_vals.into_iter().map(Into::into).map(Value).collect()), - } - } - - fn new_not_in(column_id: ColumnId, in_vals: impl IntoIterator>>) -> Self { - Self { - column_id, - kind: PredicateKind::NotIn(in_vals.into_iter().map(Into::into).map(Value).collect()), - } - } -} - -// TODO: remove this when fully implemented, for now just suppressing compiler warnings -#[allow(dead_code)] -#[derive(Debug)] -enum PredicateKind { - Eq(Value), - NotEq(Value), - In(Vec), - NotIn(HashSet), -} +mod cache; +mod provider; +pub use provider::MetaCacheProvider; +mod table_function; +pub use table_function::MetaCacheFunction; +pub use table_function::META_CACHE_UDTF_NAME; #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; - - use arrow_util::assert_batches_sorted_eq; + use arrow::array::AsArray; + use datafusion::{assert_batches_eq, assert_batches_sorted_eq, prelude::SessionContext}; + use indexmap::IndexMap; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::ColumnId; - use influxdb3_wal::Row; + use influxdb3_wal::{Gen1Duration, Row, WriteBatch}; use influxdb3_write::write_buffer::validator::WriteValidator; use iox_time::{MockProvider, Time, TimeProvider}; + use std::{sync::Arc, time::Duration}; - use crate::meta_cache::Predicate; - - use super::{CreateMetaCacheArgs, MaxAge, MaxCardinality, MetaCache}; + use 
crate::meta_cache::{ + cache::{CreateMetaCacheArgs, MaxAge, MaxCardinality, MetaCache, Predicate}, + MetaCacheFunction, MetaCacheProvider, META_CACHE_UDTF_NAME, + }; struct TestWriter { catalog: Arc, @@ -501,7 +36,8 @@ mod tests { catalog: Arc::new(Catalog::new("test-host".into(), "test-instance".into())), } } - fn write_lp(&self, lp: impl AsRef, time_ns: i64) -> Vec { + + fn write_lp_to_rows(&self, lp: impl AsRef, time_ns: i64) -> Vec { let lines_parsed = WriteValidator::initialize( Self::DB_NAME.try_into().unwrap(), Arc::clone(&self.catalog), @@ -519,6 +55,28 @@ mod tests { lines_parsed.into_inner().to_rows() } + fn write_lp_to_write_batch(&self, lp: impl AsRef, time_ns: i64) -> WriteBatch { + WriteValidator::initialize( + Self::DB_NAME.try_into().unwrap(), + Arc::clone(&self.catalog), + time_ns, + ) + .expect("initialize write validator") + .v1_parse_lines_and_update_schema( + lp.as_ref(), + false, + Time::from_timestamp_nanos(time_ns), + influxdb3_write::Precision::Nanosecond, + ) + .expect("parse and validate v1 line protocol") + .convert_lines_to_buffer(Gen1Duration::new_1m()) + .into() + } + + fn catalog(&self) -> Arc { + Arc::clone(&self.catalog) + } + fn db_schema(&self) -> Arc { self.catalog .db_schema(Self::DB_NAME) @@ -531,7 +89,7 @@ mod tests { let writer = TestWriter::new(); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); // write some data to get a set of rows destined for the WAL, and an updated catalog: - let rows = writer.write_lp( + let rows = writer.write_lp_to_rows( "\ cpu,region=us-east,host=a usage=100\n\ cpu,region=us-east,host=b usage=100\n\ @@ -558,13 +116,15 @@ mod tests { let region_col_id = column_ids[0]; let host_col_id = column_ids[1]; // create the cache: - let mut cache = MetaCache::new(CreateMetaCacheArgs { + let mut cache = MetaCache::new( time_provider, - table_def, - max_cardinality: MaxCardinality::default(), - max_age: MaxAge::default(), - column_ids, - }) + CreateMetaCacheArgs { + table_def, + max_cardinality: MaxCardinality::default(), + max_age: MaxAge::default(), + column_ids, + }, + ) .expect("create cache"); // push the row data into the cache: for row in rows { @@ -574,14 +134,20 @@ mod tests { // run a series of test cases with varying predicates: struct TestCase<'a> { desc: &'a str, - predicates: &'a [Predicate], + predicates: IndexMap, expected: &'a [&'a str], } + fn create_predicate_map( + predicates: &[(ColumnId, Predicate)], + ) -> IndexMap { + predicates.iter().cloned().collect() + } + let test_cases = [ TestCase { desc: "no predicates", - predicates: &[], + predicates: create_predicate_map(&[]), expected: &[ "+---------+------+", "| region | host |", @@ -601,88 +167,12 @@ mod tests { "+---------+------+", ], }, - TestCase { - desc: "eq predicate on region", - predicates: &[Predicate::new_eq(region_col_id, "us-east")], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| us-east | a |", - "| us-east | b |", - "+---------+------+", - ], - }, - TestCase { - desc: "eq predicate on host", - predicates: &[Predicate::new_eq(host_col_id, "h")], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| ca-cent | h |", - "+---------+------+", - ], - }, - TestCase { - desc: "eq predicate on region and host", - predicates: &[ - Predicate::new_eq(region_col_id, "ca-cent"), - Predicate::new_eq(host_col_id, "h"), - ], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| ca-cent | h |", - "+---------+------+", - 
], - }, - TestCase { - desc: "not eq predicate on region", - predicates: &[Predicate::new_not_eq(region_col_id, "ca-cent")], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| ca-east | e |", - "| ca-east | f |", - "| eu-cent | k |", - "| eu-cent | l |", - "| eu-east | i |", - "| eu-east | j |", - "| us-east | a |", - "| us-east | b |", - "| us-west | c |", - "| us-west | d |", - "+---------+------+", - ], - }, - TestCase { - desc: "not eq predicate on region and host", - predicates: &[ - Predicate::new_not_eq(region_col_id, "ca-cent"), - Predicate::new_not_eq(host_col_id, "a"), - ], - expected: &[ - "+---------+------+", - "| region | host |", - "+---------+------+", - "| ca-east | e |", - "| ca-east | f |", - "| eu-cent | k |", - "| eu-cent | l |", - "| eu-east | i |", - "| eu-east | j |", - "| us-east | b |", - "| us-west | c |", - "| us-west | d |", - "+---------+------+", - ], - }, TestCase { desc: "in predicate on region", - predicates: &[Predicate::new_in(region_col_id, ["ca-cent", "ca-east"])], + predicates: create_predicate_map(&[( + region_col_id, + Predicate::new_in(["ca-cent", "ca-east"]), + )]), expected: &[ "+---------+------+", "| region | host |", @@ -696,10 +186,10 @@ mod tests { }, TestCase { desc: "in predicate on region and host", - predicates: &[ - Predicate::new_in(region_col_id, ["ca-cent", "ca-east"]), - Predicate::new_in(host_col_id, ["g", "e"]), - ], + predicates: create_predicate_map(&[ + (region_col_id, Predicate::new_in(["ca-cent", "ca-east"])), + (host_col_id, Predicate::new_in(["g", "e"])), + ]), expected: &[ "+---------+------+", "| region | host |", @@ -711,7 +201,10 @@ mod tests { }, TestCase { desc: "not in predicate on region", - predicates: &[Predicate::new_not_in(region_col_id, ["ca-cent", "ca-east"])], + predicates: create_predicate_map(&[( + region_col_id, + Predicate::new_not_in(["ca-cent", "ca-east"]), + )]), expected: &[ "+---------+------+", "| region | host |", @@ -729,10 +222,10 @@ mod tests { }, TestCase { desc: "not in predicate on region and host", - predicates: &[ - Predicate::new_not_in(region_col_id, ["ca-cent", "ca-east"]), - Predicate::new_not_in(host_col_id, ["j", "k"]), - ], + predicates: create_predicate_map(&[ + (region_col_id, Predicate::new_not_in(["ca-cent", "ca-east"])), + (host_col_id, Predicate::new_not_in(["j", "k"])), + ]), expected: &[ "+---------+------+", "| region | host |", @@ -750,7 +243,7 @@ mod tests { for tc in test_cases { let records = cache - .to_record_batch(tc.predicates) + .to_record_batch(&tc.predicates) .expect("get record batches"); println!("{}", tc.desc); assert_batches_sorted_eq!(tc.expected, &[records]); @@ -762,7 +255,7 @@ mod tests { let writer = TestWriter::new(); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); // write some data to update the catalog: - let _ = writer.write_lp( + let _ = writer.write_lp_to_rows( "\ cpu,region=us-east,host=a usage=100\n\ ", @@ -776,18 +269,20 @@ mod tests { .map(|name| table_def.column_name_to_id_unchecked(name)) .collect(); // create a cache with some cardinality and age limits: - let mut cache = MetaCache::new(CreateMetaCacheArgs { - time_provider: Arc::clone(&time_provider) as _, - table_def, - max_cardinality: MaxCardinality::try_new(10).unwrap(), - max_age: MaxAge::from(Duration::from_nanos(100)), - column_ids, - }) + let mut cache = MetaCache::new( + Arc::clone(&time_provider) as _, + CreateMetaCacheArgs { + table_def, + max_cardinality: MaxCardinality::try_from(10).unwrap(), + max_age: 
MaxAge::from(Duration::from_nanos(100)), + column_ids, + }, + ) .expect("create cache"); // push a bunch of rows with incrementing times and varying tag values: (0..10).for_each(|mult| { time_provider.set(Time::from_timestamp_nanos(mult * 20)); - let rows = writer.write_lp( + let rows = writer.write_lp_to_rows( format!( "\ cpu,region=us-east-{mult},host=a-{mult} usage=100\n\ @@ -805,7 +300,7 @@ mod tests { // check the cache before prune: // NOTE: this does not include entries that have surpassed the max_age of the cache, though, // there are still more than the cache's max cardinality, as it has not yet been pruned. - let records = cache.to_record_batch(&[]).unwrap(); + let records = cache.to_record_batch(&Default::default()).unwrap(); assert_batches_sorted_eq!( [ "+-----------+------+", @@ -831,7 +326,7 @@ mod tests { &[records] ); cache.prune(); - let records = cache.to_record_batch(&[]).unwrap(); + let records = cache.to_record_batch(&Default::default()).unwrap(); assert_batches_sorted_eq!( [ "+-----------+------+", @@ -851,4 +346,448 @@ mod tests { &[records] ); } + + /// This test sets up a [`MetaCacheProvider`], creates a [`MetaCache`] using the `region` and + /// `host` column, and then writes several different unique combinations of values into it. + /// It then sets up a DataFusion [`SessionContext`], registers our [`MetaCacheFunction`] as a + /// UDTF, and then runs a series of test cases to verify queries against the function. + /// + /// The purpose of this is to see that the cache works as intended, and importantly, that the + /// predicate pushdown is happening and being leveraged by the underlying [`MetaCache`], vs. + /// DataFusion doing it for us with a higher level FilterExec. + /// + /// Each test case verifies the `RecordBatch` output of the query as well as the output for + /// EXPLAIN on the same query. The EXPLAIN output contains a line for the MetaCacheExec, which + /// is the custom execution plan impl for the metadata cache that captures the predicates that + /// are pushed down to the underlying [`MetaCacahe::to_record_batch`] method, if any. 
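+    ///
+    /// The queries below are all of the form `SELECT * FROM meta_cache('cpu') WHERE ...`, and
+    /// the EXPLAIN output is checked for fragments such as
+    /// `MetaCacheExec: predicates=[[0 IN (us-east)]]` to confirm that pushdown took place.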
+ #[tokio::test] + async fn test_datafusion_meta_cache_udtf() { + // create a test writer and do a write in to populate the catalog with a db/table: + let writer = TestWriter::new(); + let _ = writer.write_lp_to_write_batch( + "\ + cpu,region=us-east,host=a usage=100\n\ + ", + 0, + ); + + // create a meta provider and a cache on tag columns 'region' and 'host': + let db_schema = writer.db_schema(); + let table_def = db_schema.table_definition("cpu").unwrap(); + let column_ids: Vec = ["region", "host"] + .into_iter() + .map(|name| table_def.column_name_to_id_unchecked(name)) + .collect(); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let meta_provider = + MetaCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap(); + meta_provider + .create_cache( + db_schema.id, + None, + CreateMetaCacheArgs { + table_def, + max_cardinality: MaxCardinality::default(), + max_age: MaxAge::default(), + column_ids, + }, + ) + .unwrap(); + + // do some writes to generate a write batch and send it into the cache: + let write_batch = writer.write_lp_to_write_batch( + "\ + cpu,region=us-east,host=a usage=100\n\ + cpu,region=us-east,host=b usage=100\n\ + cpu,region=us-west,host=c usage=100\n\ + cpu,region=us-west,host=d usage=100\n\ + cpu,region=ca-east,host=e usage=100\n\ + cpu,region=ca-cent,host=f usage=100\n\ + cpu,region=ca-west,host=g usage=100\n\ + cpu,region=ca-west,host=h usage=100\n\ + cpu,region=eu-cent,host=i usage=100\n\ + cpu,region=eu-cent,host=j usage=100\n\ + cpu,region=eu-west,host=k usage=100\n\ + cpu,region=eu-west,host=l usage=100\n\ + ", + 0, + ); + let wal_contents = influxdb3_wal::create::wal_contents( + (0, 1, 0), + [influxdb3_wal::create::write_batch_op(write_batch)], + ); + meta_provider.write_wal_contents_to_cache(&wal_contents); + + // Spin up a DataFusion session context and add the meta_cache function to it so we + // can query for data in the cache we created and populated above: + let ctx = SessionContext::new(); + let meta_func = MetaCacheFunction::new(db_schema.id, Arc::clone(&meta_provider)); + ctx.register_udtf(META_CACHE_UDTF_NAME, Arc::new(meta_func)); + + struct TestCase<'a> { + /// A short description of the test + _desc: &'a str, + /// A SQL expression to evaluate using the datafusion session context, should be of + /// the form: + /// ```sql + /// SELECT * FROM meta_cache('cpu') ... + /// ``` + sql: &'a str, + /// Expected record batch output + expected: &'a [&'a str], + /// Expected EXPLAIN output contains this. + /// + /// For checking the `MetaCacheExec` portion of the EXPLAIN output for the given `sql` + /// query. A "contains" is used instead of matching the whole EXPLAIN output to prevent + /// flakyness from upstream changes to other parts of the query plan. + explain_contains: &'a str, + /// Use `assert_batches_sorted_eq!` to check the outputted record batches instead of + /// `assert_batches_eq!`. + /// + /// # Note + /// + /// The cache should produce results in a sorted order as-is, however, some queries + /// that process the results after they are emitted from the cache may have their order + /// altered by DataFusion, e.g., `SELECT DISTINCT(column_name) FROM meta_cache('table')` + /// or queries that project columns that are not at the top level of the cache. 
+ use_sorted_assert: bool, + } + + let test_cases = [ + TestCase { + _desc: "no predicates", + sql: "SELECT * FROM meta_cache('cpu')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-cent | f |", + "| ca-east | e |", + "| ca-west | g |", + "| ca-west | h |", + "| eu-cent | i |", + "| eu-cent | j |", + "| eu-west | k |", + "| eu-west | l |", + "| us-east | a |", + "| us-east | b |", + "| us-west | c |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "eq predicate on region", + sql: "SELECT * FROM meta_cache('cpu') WHERE region = 'us-east'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "eq predicate on region and host", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND host = 'a'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "eq predicate on region; in predicate on host", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND host IN ('a', 'b')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "eq predicate on region; not in predicate on host", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND host != 'a'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)], [1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "in predicate on region", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region IN ('ca-cent', 'ca-east', 'us-east', 'us-west')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-cent | f |", + "| ca-east | e |", + "| us-east | a |", + "| us-east | b |", + "| us-west | c |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "not in predicate on region", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region NOT IN ('ca-cent', 'ca-east', 'us-east', 'us-west')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-west | g |", + "| ca-west | h |", + "| eu-cent | i |", + "| eu-cent | j |", + "| eu-west | k |", + "| eu-west | l |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, 
partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "or eq predicates on region", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' OR region = 'ca-east'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-east | e |", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "or eq predicate on host", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE host = 'd' OR host = 'e'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-east | e |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "un-grouped host conditions are not handled in predicate pushdown", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND host = 'a' OR host = 'b'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "grouped host conditions are handled in predicate pushdown", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region = 'us-east' AND (host = 'a' OR host = 'b')", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "project region column", + sql: "SELECT region FROM meta_cache('cpu')", + expected: &[ + "+---------+", + "| region |", + "+---------+", + "| ca-cent |", + "| ca-east |", + "| ca-west |", + "| ca-west |", + "| eu-cent |", + "| eu-cent |", + "| eu-west |", + "| eu-west |", + "| us-east |", + "| us-east |", + "| us-west |", + "| us-west |", + "+---------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "project region column taking distinct", + sql: "SELECT DISTINCT(region) FROM meta_cache('cpu')", + expected: &[ + "+---------+", + "| region |", + "+---------+", + "| ca-cent |", + "| ca-east |", + "| ca-west |", + "| eu-cent |", + "| eu-west |", + "| us-east |", + "| us-west |", + "+---------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + // it seems that DISTINCT changes around the order of results + use_sorted_assert: true, + }, + TestCase { + _desc: "project host column", + sql: "SELECT host FROM meta_cache('cpu')", + expected: &[ + "+------+", // commenting for no new line + "| host |", // commenting for no new line + "+------+", // commenting for no new line + "| a |", // commenting for no new line + "| b |", // commenting for no new line + "| c |", // commenting for no new line + "| d |", // commenting for no new line + "| e |", // commenting for no new line + "| f |", // commenting for no new line + "| g |", // commenting for no new line + "| h |", // commenting for no new line + "| i |", // 
commenting for no new line + "| j |", // commenting for no new line + "| k |", // commenting for no new line + "| l |", // commenting for no new line + "+------+", // commenting for no new line + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + // this column will not be sorted since the order of elements depends on the next level + // up in the cache, so the `region` column is iterated over in order, but the nested + // `host` values, although sorted within `region`s, will not be globally sorted. + use_sorted_assert: true, + }, + TestCase { + _desc: "limit clause", + sql: "SELECT * FROM meta_cache('cpu') LIMIT 8", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| ca-cent | f |", + "| ca-east | e |", + "| ca-west | g |", + "| ca-west | h |", + "| eu-cent | i |", + "| eu-cent | j |", + "| eu-west | k |", + "| eu-west | l |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "limit and offset", + sql: "SELECT * FROM meta_cache('cpu') LIMIT 8 OFFSET 8", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "| us-west | c |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + TestCase { + _desc: "like clause", + sql: "SELECT * FROM meta_cache('cpu') \ + WHERE region LIKE 'u%'", + expected: &[ + "+---------+------+", + "| region | host |", + "+---------+------+", + "| us-east | a |", + "| us-east | b |", + "| us-west | c |", + "| us-west | d |", + "+---------+------+", + ], + explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]", + use_sorted_assert: false, + }, + ]; + + for tc in test_cases { + let results = ctx.sql(tc.sql).await.unwrap().collect().await.unwrap(); + // Uncommenting may make figuring out which test case is failing easier: + println!("test case: {}", tc._desc); + if tc.use_sorted_assert { + assert_batches_sorted_eq!(tc.expected, &results); + } else { + assert_batches_eq!(tc.expected, &results); + } + let explain = ctx + .sql(format!("EXPLAIN {sql}", sql = tc.sql).as_str()) + .await + .unwrap() + .collect() + .await + .unwrap() + .pop() + .unwrap(); + + // NOTE(hiltontj): this probably can be done a better way? + // The EXPLAIN output will have two columns, the one we are interested in that contains + // the details of the MetaCacheExec is called `plan`... 
+ assert!(explain + .column_by_name("plan") + .unwrap() + .as_string::() + .iter() + .any(|plan| plan.is_some_and(|plan| plan.contains(tc.explain_contains))),); + } + } } diff --git a/influxdb3_cache/src/meta_cache/provider.rs b/influxdb3_cache/src/meta_cache/provider.rs new file mode 100644 index 00000000000..99269fbaf2b --- /dev/null +++ b/influxdb3_cache/src/meta_cache/provider.rs @@ -0,0 +1,310 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use anyhow::Context; +use arrow::datatypes::SchemaRef; +use influxdb3_catalog::catalog::{Catalog, TableDefinition}; +use influxdb3_id::{DbId, TableId}; +use influxdb3_wal::{MetaCacheDefinition, WalContents, WalOp}; +use iox_time::TimeProvider; +use parking_lot::RwLock; + +use crate::meta_cache::cache::{MaxAge, MaxCardinality}; + +use super::cache::{CreateMetaCacheArgs, MetaCache}; + +#[derive(Debug, thiserror::Error)] +#[error("metadata cache provider error: {0:#}")] +pub struct Error(#[from] anyhow::Error); + +/// Triple nested map for storing a multiple metadata caches per table. +/// +/// That is, the map nesting is `database -> table -> cache name` +type CacheMap = RwLock, MetaCache>>>>; + +/// Provides the metadata caches for the running instance of InfluxDB +#[derive(Debug)] +pub struct MetaCacheProvider { + pub(crate) time_provider: Arc, + pub(crate) catalog: Arc, + pub(crate) cache_map: CacheMap, +} + +impl MetaCacheProvider { + /// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's + /// `cache_map` from the definitions in the catalog. + pub fn new_from_catalog( + time_provider: Arc, + catalog: Arc, + ) -> Result, Error> { + let provider = Arc::new(MetaCacheProvider { + time_provider, + catalog: Arc::clone(&catalog), + cache_map: Default::default(), + }); + for db_schema in catalog.list_db_schema() { + for table_def in db_schema.tables() { + for (cache_name, cache_def) in table_def.meta_caches() { + assert!( + provider + .create_cache( + db_schema.id, + Some(cache_name), + CreateMetaCacheArgs { + table_def: Arc::clone(&table_def), + max_cardinality: MaxCardinality::try_from( + cache_def.max_cardinality + )?, + max_age: MaxAge::from(Duration::from_secs( + cache_def.max_age_seconds + )), + column_ids: cache_def.column_ids.to_vec() + } + )? + .is_some(), + "there should not be duplicated cache definitions in the catalog" + ) + } + } + } + Ok(provider) + } + + /// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's + /// `cache_map` from the definitions in the catalog. This starts a background process that + /// runs on the provided `eviction_interval` to perform eviction on all of the caches + /// in the created [`MetaCacheProvider`]'s `cache_map`. + pub fn new_from_catalog_with_background_eviction( + time_provider: Arc, + catalog: Arc, + eviction_interval: Duration, + ) -> Result, Error> { + let provider = Self::new_from_catalog(time_provider, catalog)?; + + background_eviction_process(Arc::clone(&provider), eviction_interval); + + Ok(provider) + } + + /// Get a particular cache's name and arrow schema + /// + /// This is used for the implementation of DataFusion's `TableFunctionImpl` and + /// `TableProvider` traits as a convenience method for the scenario where there is only a + /// single cache on a table, and therefore one does not need to specify the cache name + /// in addition to the db/table identifiers. 
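+    ///
+    /// Returns `None` when no name is given and the table does not have exactly one cache,
+    /// since the lookup would otherwise be ambiguous.
+    ///
+    /// A minimal usage sketch (identifiers are illustrative, not part of this API):
+    ///
+    /// ```ignore
+    /// // With exactly one cache on the table, the cache name can be omitted:
+    /// let (cache_name, schema) = provider
+    ///     .get_cache_name_and_schema(db_id, table_id, None)
+    ///     .expect("table should have a single metadata cache");
+    /// ```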
+    pub(crate) fn get_cache_name_and_schema(
+        &self,
+        db_id: DbId,
+        table_id: TableId,
+        cache_name: Option<&str>,
+    ) -> Option<(Arc<str>, SchemaRef)> {
+        self.cache_map
+            .read()
+            .get(&db_id)
+            .and_then(|db| db.get(&table_id))
+            .and_then(|table| {
+                if let Some(cache_name) = cache_name {
+                    table
+                        .get_key_value(cache_name)
+                        .map(|(name, mc)| (Arc::clone(name), mc.arrow_schema()))
+                } else if table.len() == 1 {
+                    table
+                        .iter()
+                        .map(|(name, mc)| (Arc::clone(name), mc.arrow_schema()))
+                        .next()
+                } else {
+                    None
+                }
+            })
+    }
+
+    /// Create a new entry in the metadata cache for a given database and parameters.
+    ///
+    /// If a new cache is created, this will return the [`MetaCacheDefinition`] for the created
+    /// cache; otherwise, if the provided arguments are identical to an existing cache, along with
+    /// any defaults, then `None` will be returned. It is an error to attempt to create a cache
+    /// that would overwrite an existing one with different parameters.
+    ///
+    /// The cache name is optional; if not provided, it will be of the form:
+    /// ```text
+    /// <table_name>_<column_names>_meta_cache
+    /// ```
+    /// Where `<column_names>` is an `_`-separated list of the column names used in the cache.
+    pub fn create_cache(
+        &self,
+        db_id: DbId,
+        cache_name: Option<Arc<str>>,
+        CreateMetaCacheArgs {
+            table_def,
+            max_cardinality,
+            max_age,
+            column_ids,
+        }: CreateMetaCacheArgs,
+    ) -> Result<Option<MetaCacheDefinition>, Error> {
+        let cache_name = if let Some(cache_name) = cache_name {
+            cache_name
+        } else {
+            format!(
+                "{table_name}_{cols}_meta_cache",
+                table_name = table_def.table_name,
+                cols = column_ids
+                    .iter()
+                    .map(
+                        |id| table_def.column_id_to_name(id).with_context(|| format!(
+                            "invalid column id ({id}) encountered in cache creation arguments"
+                        ))
+                    )
+                    .collect::<Result<Vec<_>, anyhow::Error>>()?
+                    .join("_")
+            )
+            .into()
+        };
+
+        let new_cache = MetaCache::new(
+            Arc::clone(&self.time_provider),
+            CreateMetaCacheArgs {
+                table_def: Arc::clone(&table_def),
+                max_cardinality,
+                max_age,
+                column_ids: column_ids.clone(),
+            },
+        )?;
+
+        let mut lock = self.cache_map.write();
+        if let Some(cache) = lock
+            .get(&db_id)
+            .and_then(|db| db.get(&table_def.table_id))
+            .and_then(|table| table.get(&cache_name))
+        {
+            return cache
+                .compare_config(&new_cache)
+                .map(|_| None)
+                .map_err(Into::into);
+        }
+
+        lock.entry(db_id)
+            .or_default()
+            .entry(table_def.table_id)
+            .or_default()
+            .insert(Arc::clone(&cache_name), new_cache);
+
+        Ok(Some(MetaCacheDefinition {
+            table_id: table_def.table_id,
+            table_name: Arc::clone(&table_def.table_name),
+            cache_name,
+            column_ids,
+            max_cardinality: max_cardinality.into(),
+            max_age_seconds: max_age.as_seconds(),
+        }))
+    }
+
+    /// Create a new cache given the database schema and WAL definition. This is useful during
+    /// WAL replay.
+ pub fn create_from_definition( + &self, + db_id: DbId, + table_def: Arc, + definition: &MetaCacheDefinition, + ) { + let meta_cache = MetaCache::new( + Arc::clone(&self.time_provider), + CreateMetaCacheArgs { + table_def, + max_cardinality: definition + .max_cardinality + .try_into() + .expect("definition should have a valid max_cardinality"), + max_age: MaxAge::from(Duration::from_secs(definition.max_age_seconds)), + column_ids: definition.column_ids.to_vec(), + }, + ) + .expect("definition should be valid coming from the WAL"); + self.cache_map + .write() + .entry(db_id) + .or_default() + .entry(definition.table_id) + .or_default() + .insert(Arc::clone(&definition.cache_name), meta_cache); + } + + /// Delete a cache from the provider + /// + /// This also cleans up the provider hierarchy, so if the delete leaves a branch for a given + /// table or its parent database empty, this will remove that branch. + pub fn delete_cache( + &self, + db_id: DbId, + table_id: TableId, + cache_name: &str, + ) -> Result<(), Error> { + let mut lock = self.cache_map.write(); + let db = lock.get_mut(&db_id).context("database does not exist")?; + let table = db.get_mut(&table_id).context("table does not exist")?; + table.remove(cache_name).context("cache does not exist")?; + if table.is_empty() { + db.remove(&table_id); + } + if db.is_empty() { + lock.remove(&db_id); + } + Ok(()) + } + + /// Write the contents of a WAL file to the cache by iterating over its database and table + /// batches to find entries that belong in the cache. + pub fn write_wal_contents_to_cache(&self, wal_contents: &WalContents) { + let mut lock = self.cache_map.write(); + for op in &wal_contents.ops { + let WalOp::Write(write_batch) = op else { + continue; + }; + let Some(db_caches) = lock.get_mut(&write_batch.database_id) else { + continue; + }; + if db_caches.is_empty() { + continue; + } + for (table_id, table_chunks) in &write_batch.table_chunks { + let Some(table_caches) = db_caches.get_mut(table_id) else { + continue; + }; + if table_caches.is_empty() { + continue; + } + for (_, cache) in table_caches.iter_mut() { + for chunk in table_chunks.chunk_time_to_chunk.values() { + for row in &chunk.rows { + cache.push(row); + } + } + } + } + } + } + + /// Run eviction across all caches in the provider. 
+    pub fn evict_cache_entries(&self) {
+        let mut lock = self.cache_map.write();
+        lock.iter_mut().for_each(|(_, db_caches)| {
+            db_caches.iter_mut().for_each(|(_, table_caches)| {
+                table_caches.iter_mut().for_each(|(_, cache)| cache.prune())
+            })
+        });
+    }
+}
+
+fn background_eviction_process(
+    provider: Arc<MetaCacheProvider>,
+    eviction_interval: Duration,
+) -> tokio::task::JoinHandle<()> {
+    tokio::spawn(async move {
+        let mut interval = tokio::time::interval(eviction_interval);
+        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+        loop {
+            interval.tick().await;
+
+            provider.evict_cache_entries();
+        }
+    })
+}
diff --git a/influxdb3_cache/src/meta_cache/table_function.rs b/influxdb3_cache/src/meta_cache/table_function.rs
new file mode 100644
index 00000000000..7150695a4ba
--- /dev/null
+++ b/influxdb3_cache/src/meta_cache/table_function.rs
@@ -0,0 +1,359 @@
+use std::{any::Any, sync::Arc};
+
+use arrow::{array::RecordBatch, datatypes::SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+    catalog::{Session, TableProvider},
+    common::{internal_err, plan_err, DFSchema, Result},
+    datasource::{function::TableFunctionImpl, TableType},
+    execution::context::ExecutionProps,
+    logical_expr::TableProviderFilterPushDown,
+    physical_expr::{
+        create_physical_expr,
+        utils::{Guarantee, LiteralGuarantee},
+    },
+    physical_plan::{memory::MemoryExec, DisplayAs, DisplayFormatType, ExecutionPlan},
+    prelude::Expr,
+    scalar::ScalarValue,
+};
+use indexmap::IndexMap;
+use influxdb3_catalog::catalog::TableDefinition;
+use influxdb3_id::{ColumnId, DbId};
+
+use super::{cache::Predicate, MetaCacheProvider};
+
+/// The name used to call the metadata cache in SQL queries
+pub const META_CACHE_UDTF_NAME: &str = "meta_cache";
+
+/// Implementor of the [`TableProvider`] trait that is produced by a call to the
+/// [`MetaCacheFunction`]
+struct MetaCacheFunctionProvider {
+    /// Reference to the schema of the [`MetaCache`][super::cache::MetaCache] being queried
+    schema: SchemaRef,
+    /// Forwarded ref to the [`MetaCacheProvider`] which is used to get the
+    /// [`MetaCache`][super::cache::MetaCache] for the query, along with the `db_id` and
+    /// `table_def`. This is done instead of passing forward a reference to the `MetaCache`
+    /// directly because doing so is not easy or possible with the Rust borrow checker.
+ provider: Arc, + /// The database ID that the called cache is related to + db_id: DbId, + /// The table definition that the called cache is related to + table_def: Arc, + /// The name of the cache, which is determined when calling the `meta_cache` function + cache_name: Arc, +} + +#[async_trait] +impl TableProvider for MetaCacheFunctionProvider { + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Temporary + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } + + async fn scan( + &self, + ctx: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + _limit: Option, + ) -> Result> { + let read = self.provider.cache_map.read(); + let (batches, predicates) = if let Some(cache) = read + .get(&self.db_id) + .and_then(|db| db.get(&self.table_def.table_id)) + .and_then(|tbl| tbl.get(&self.cache_name)) + { + let predicates = convert_filter_exprs(&self.table_def, self.schema(), filters)?; + ( + cache + .to_record_batch(&predicates) + .map(|batch| vec![batch])?, + (!predicates.is_empty()).then_some(predicates), + ) + } else { + (vec![], None) + }; + let mut exec = + MetaCacheExec::try_new(predicates, &[batches], self.schema(), projection.cloned())?; + + let show_sizes = ctx.config_options().explain.show_sizes; + exec = exec.with_show_sizes(show_sizes); + + Ok(Arc::new(exec)) + } +} + +/// Convert the given list of filter expressions to a map of [`ColumnId`] to [`Predicate`] +/// +/// The resulting map uses [`IndexMap`] to ensure consistent ordering of the map. This makes testing +/// the filter conversion significantly easier using EXPLAIN queries. +fn convert_filter_exprs( + table_def: &TableDefinition, + cache_schema: SchemaRef, + filters: &[Expr], +) -> Result> { + let mut predicate_map: IndexMap> = IndexMap::new(); + + // for create_physical_expr: + let schema: DFSchema = cache_schema.try_into()?; + let props = ExecutionProps::new(); + + // The set of `filters` that are passed in from DataFusion varies: 1) based on how they are + // defined in the query, and 2) based on some decisions that DataFusion makes when parsing the + // query into the `Expr` syntax tree. For example, the predicate: + // + // WHERE foo IN ('bar', 'baz') + // + // instead of being expressed as an `InList`, would be simplified to the following `Expr` tree: + // + // [ + // BinaryExpr { + // left: BinaryExpr { left: "foo", op: Eq, right: "bar" }, + // op: Or, + // right: BinaryExpr { left: "foo", op: Eq, right: "baz" } + // } + // ] + // + // while the predicate: + // + // WHERE foo = 'bar' OR foo = 'baz' OR foo = 'bop' OR foo = 'bla' + // + // instead of being expressed as a tree of `BinaryExpr`s, is expressed as an `InList` with four + // entries: + // + // [ + // InList { col: "foo", values: ["bar", "baz", "bop", "bla"], negated: false } + // ] + // + // Instead of handling all the combinations of `Expr`s that may be passed by the caller of + // `TableProider::scan`, we can use the cache's schema to convert each `Expr` to a `PhysicalExpr` + // and analyze it using DataFusion's `LiteralGuarantee`. + // + // This will distill the provided set of `Expr`s down to either an IN list, or a NOT IN list + // which we can convert to the `Predicate` type for the metadata cache. 
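+    //
+    // For example (illustrative), the pair of filter expressions:
+    //
+    //   region IN ('us-east', 'us-west')
+    //   host != 'a'
+    //
+    // would be expected to analyze into literal guarantees roughly like:
+    //
+    //   column: "region", guarantee: In,    literals: {"us-east", "us-west"}
+    //   column: "host",   guarantee: NotIn, literals: {"a"}
+    //
+    // which become an IN predicate on the `region` column and a NOT IN predicate on the `host`
+    // column in the resulting map.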
+    //
+    // The main caveat is that if for some reason there are multiple `Expr`s that apply predicates
+    // on a given column, i.e., leading to multiple `LiteralGuarantee`s on a specific column, we
+    // discard those predicates and have DataFusion handle the filtering.
+    //
+    // This is a conservative approach; it may be that we can combine multiple literal guarantees
+    // on a single column, but thus far, from testing in the parent module, this does not seem
+    // necessary.
+
+    for expr in filters {
+        let physical_expr = create_physical_expr(expr, &schema, &props)?;
+        let literal_guarantees = LiteralGuarantee::analyze(&physical_expr);
+        for LiteralGuarantee {
+            column,
+            guarantee,
+            literals,
+        } in literal_guarantees
+        {
+            let Some(column_id) = table_def.column_name_to_id(column.name()) else {
+                return plan_err!(
+                    "invalid column name in filter expression: {}",
+                    column.name()
+                );
+            };
+            let value_iter = literals.into_iter().filter_map(|l| match l {
+                ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => Some(s),
+                _ => None,
+            });
+
+            let predicate = match guarantee {
+                Guarantee::In => Predicate::new_in(value_iter),
+                Guarantee::NotIn => Predicate::new_not_in(value_iter),
+            };
+            predicate_map
+                .entry(column_id)
+                .and_modify(|e| {
+                    // We do not currently support multiple literal guarantees per column.
+                    //
+                    // In this case we replace the predicate with None so that it does not filter
+                    // any records from the cache downstream. DataFusion will still do filtering at
+                    // a higher level, once _all_ records are produced from the cache.
+                    e.take();
+                })
+                .or_insert_with(|| Some(predicate));
+        }
+    }
+
+    Ok(predicate_map
+        .into_iter()
+        .filter_map(|(column_id, predicate)| predicate.map(|predicate| (column_id, predicate)))
+        .collect())
+}
+
+/// Implementor of the [`TableFunctionImpl`] trait, to be registered as a user-defined table
+/// function in the DataFusion `SessionContext`.
+#[derive(Debug)]
+pub struct MetaCacheFunction {
+    db_id: DbId,
+    provider: Arc<MetaCacheProvider>,
+}
+
+impl MetaCacheFunction {
+    pub fn new(db_id: DbId, provider: Arc<MetaCacheProvider>) -> Self {
+        Self { db_id, provider }
+    }
+}
+
+impl TableFunctionImpl for MetaCacheFunction {
+    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        let Some(Expr::Literal(ScalarValue::Utf8(Some(table_name)))) = args.first() else {
+            return plan_err!("first argument must be the table name as a string");
+        };
+        let cache_name = match args.get(1) {
+            Some(Expr::Literal(ScalarValue::Utf8(Some(name)))) => Some(name),
+            Some(_) => {
+                return plan_err!("second argument, if passed, must be the cache name as a string")
+            }
+            None => None,
+        };
+
+        let Some(table_def) = self
+            .provider
+            .catalog
+            .db_schema_by_id(&self.db_id)
+            .and_then(|db| db.table_definition(table_name.as_str()))
+        else {
+            return plan_err!("provided table name ({}) is invalid", table_name);
+        };
+        let Some((cache_name, schema)) = self.provider.get_cache_name_and_schema(
+            self.db_id,
+            table_def.table_id,
+            cache_name.map(|n| n.as_str()),
+        ) else {
+            return plan_err!("could not find meta cache for the given arguments");
+        };
+        Ok(Arc::new(MetaCacheFunctionProvider {
+            schema,
+            provider: Arc::clone(&self.provider),
+            db_id: self.db_id,
+            table_def,
+            cache_name,
+        }))
+    }
+}
+
+/// Custom implementor of the [`ExecutionPlan`] trait for use by the metadata cache
+///
+/// Wraps a [`MemoryExec`] from DataFusion, and mostly re-uses that. The special functionality
+/// provided by this type is to track the predicates that are pushed down to the underlying cache
+/// during query planning/execution.
+/// +/// # Example +/// +/// For a query that does not provide any predicates, or one that does provide predicates, but they +/// do no get pushed down, the `EXPLAIN` for said query will contain a line for the `MetaCacheExec` +/// with no predicates, including what is emitted by the inner `MemoryExec`: +/// +/// ```text +/// MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1] +/// ``` +/// +/// For queries that do have predicates that get pushed down, the output will include them, e.g.: +/// +/// ```text +/// MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1] +/// ``` +#[derive(Debug)] +struct MetaCacheExec { + inner: MemoryExec, + predicates: Option>, +} + +impl MetaCacheExec { + fn try_new( + predicates: Option>, + partitions: &[Vec], + schema: SchemaRef, + projection: Option>, + ) -> Result { + Ok(Self { + inner: MemoryExec::try_new(partitions, schema, projection)?, + predicates, + }) + } + + fn with_show_sizes(self, show_sizes: bool) -> Self { + Self { + inner: self.inner.with_show_sizes(show_sizes), + ..self + } + } +} + +impl DisplayAs for MetaCacheExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "MetaCacheExec:")?; + if let Some(predicates) = self.predicates.as_ref() { + write!(f, " predicates=[")?; + let mut p_iter = predicates.iter(); + while let Some((col_id, predicate)) = p_iter.next() { + write!(f, "[{col_id} {predicate}]")?; + if p_iter.size_hint().0 > 0 { + write!(f, ", ")?; + } + } + write!(f, "]")?; + } + write!(f, " inner=")?; + self.inner.fmt_as(t, f) + } + } + } +} + +impl ExecutionPlan for MetaCacheExec { + fn name(&self) -> &str { + "MetaCacheExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc> { + self.inner.children() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + // (copied from MemoryExec): + // MemoryExec has no children + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.inner.execute(partition, context) + } +} diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index b0c9f0bfbf8..76dcd5699d0 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -7,6 +7,7 @@ use indexmap::IndexMap; use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use influxdb3_wal::{ CatalogBatch, CatalogOp, FieldAdditions, LastCacheDefinition, LastCacheDelete, + MetaCacheDefinition, }; use influxdb_line_protocol::FieldValue; use observability_deps::tracing::info; @@ -651,6 +652,7 @@ pub struct TableDefinition { pub column_map: BiHashMap>, pub series_key: Option>, pub last_caches: HashMap, LastCacheDefinition>, + pub meta_caches: HashMap, MetaCacheDefinition>, } impl TableDefinition { @@ -705,6 +707,7 @@ impl TableDefinition { column_map, series_key, last_caches: HashMap::new(), + meta_caches: HashMap::new(), }) } @@ -933,6 +936,12 @@ impl TableDefinition { .map(|(name, def)| (Arc::clone(name), def)) } + pub fn meta_caches(&self) -> impl Iterator, &MetaCacheDefinition)> { + self.meta_caches + .iter() + .map(|(name, def)| (Arc::clone(name), def)) + } + pub fn column_name_to_id(&self, name: impl 
Into>) -> Option { self.column_map.get_by_right(&name.into()).copied() } diff --git a/influxdb3_wal/src/create.rs b/influxdb3_wal/src/create.rs index c91cf2ddfe8..915afaebfa6 100644 --- a/influxdb3_wal/src/create.rs +++ b/influxdb3_wal/src/create.rs @@ -19,6 +19,10 @@ pub fn wal_contents( } } +pub fn write_batch_op(write_batch: WriteBatch) -> WalOp { + WalOp::Write(write_batch) +} + pub fn catalog_batch_op( db_id: DbId, db_name: impl Into>, diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index eb49426e0c3..3b1a018db3d 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -475,6 +475,23 @@ pub struct LastCacheDelete { pub name: Arc, } +/// Defines a metadata cache in a given table and database +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +pub struct MetaCacheDefinition { + /// The id of the associated table + pub table_id: TableId, + /// The name of the associated table + pub table_name: Arc, + /// The name of the cache, is unique within the associated table + pub cache_name: Arc, + /// The ids of columns tracked by this metadata cache, in the defined order + pub column_ids: Vec, + /// The maximum number of distinct value combintions the cache will hold + pub max_cardinality: usize, + /// The maximum age in seconds, similar to a time-to-live (TTL), for entries in the cache + pub max_age_seconds: u64, +} + #[serde_as] #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct WriteBatch { diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs index e0632781612..845ef8cd5c0 100644 --- a/influxdb3_write/src/write_buffer/validator.rs +++ b/influxdb3_write/src/write_buffer/validator.rs @@ -723,7 +723,7 @@ fn validate_and_qualify_v1_line( /// Result of conversion from line protocol to valid chunked data /// for the buffer. #[derive(Debug)] -pub(crate) struct ValidatedLines { +pub struct ValidatedLines { /// Number of lines passed in pub(crate) line_count: usize, /// Number of fields passed in @@ -738,6 +738,12 @@ pub(crate) struct ValidatedLines { pub(crate) catalog_updates: Option, } +impl From for WriteBatch { + fn from(value: ValidatedLines) -> Self { + value.valid_data + } +} + impl WriteValidator { /// Convert this into the inner [`LinesParsed`] /// @@ -752,7 +758,7 @@ impl WriteValidator { /// This involves splitting out the writes into different batches for each chunk, which will /// map to the `Gen1Duration`. This function should be infallible, because /// the schema for incoming writes has been fully validated. - pub(crate) fn convert_lines_to_buffer(self, gen1_duration: Gen1Duration) -> ValidatedLines { + pub fn convert_lines_to_buffer(self, gen1_duration: Gen1Duration) -> ValidatedLines { let mut table_chunks = IndexMap::new(); let line_count = self.state.lines.len(); let mut field_count = 0;
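A usage sketch for the new table function follows, assuming `MetaCacheFunction`, `MetaCacheProvider`, and `META_CACHE_UDTF_NAME` are re-exported from the `meta_cache` module, and that a DataFusion `SessionContext`, a `DbId`, and a populated provider already exist; the actual registration point inside the influxdb3 query executor is not shown in this patch.

    use std::sync::Arc;

    use datafusion::{error::Result, prelude::SessionContext};
    use influxdb3_cache::meta_cache::{MetaCacheFunction, MetaCacheProvider, META_CACHE_UDTF_NAME};
    use influxdb3_id::DbId;

    /// Register the `meta_cache` UDTF and run a query whose IN clause can be pushed down to the
    /// cache as an IN predicate on the `region` column.
    async fn query_meta_cache(
        ctx: &SessionContext,
        db_id: DbId,
        provider: Arc<MetaCacheProvider>,
    ) -> Result<()> {
        ctx.register_udtf(
            META_CACHE_UDTF_NAME,
            Arc::new(MetaCacheFunction::new(db_id, provider)),
        );
        let batches = ctx
            .sql("SELECT * FROM meta_cache('cpu') WHERE region IN ('us-east', 'us-west')")
            .await?
            .collect()
            .await?;
        println!("meta cache returned {} record batch(es)", batches.len());
        Ok(())
    }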