From 04f10ad2905007536944e139d81a76516d40bbd4 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Tue, 11 Feb 2025 09:22:54 -0500 Subject: [PATCH] fix: distinct cache panic on projection pushdown (#25988) Fixed a bug in the distinct cache where a projection that skipped a column in the cache hierarchy caused a panic. This simplifies the display of the projection in the DistinctCacheExec in EXPLAIN output to show only the column name, without the column index. --- influxdb3_cache/src/distinct_cache/cache.rs | 8 ++ influxdb3_cache/src/distinct_cache/mod.rs | 117 +++++++++++++++--- .../src/distinct_cache/table_function.rs | 21 ++-- 3 files changed, 116 insertions(+), 30 deletions(-) diff --git a/influxdb3_cache/src/distinct_cache/cache.rs b/influxdb3_cache/src/distinct_cache/cache.rs index e946d51a641..dedf78669d0 100644 --- a/influxdb3_cache/src/distinct_cache/cache.rs +++ b/influxdb3_cache/src/distinct_cache/cache.rs @@ -16,6 +16,7 @@ use influxdb3_catalog::catalog::TableDefinition; use influxdb3_id::{ColumnId, TableId}; use influxdb3_wal::{DistinctCacheDefinition, FieldData, Row}; use iox_time::TimeProvider; +use observability_deps::tracing::debug; use schema::{InfluxColumnType, InfluxFieldType}; use serde::{Deserialize, Serialize}; @@ -229,6 +230,13 @@ impl DistinctCache { projection: Option<&[usize]>, limit: Option, ) -> Result { + debug!( + ?schema, + ?predicates, + ?projection, + ?limit, + ">>> distinct cache record batches" + ); let n_columns = projection .as_ref() .and_then(|p| p.iter().max().copied()) diff --git a/influxdb3_cache/src/distinct_cache/mod.rs b/influxdb3_cache/src/distinct_cache/mod.rs index 8dad4ea46fa..cfaf9d11f6d 100644 --- a/influxdb3_cache/src/distinct_cache/mod.rs +++ b/influxdb3_cache/src/distinct_cache/mod.rs @@ -392,7 +392,7 @@ mod tests { /// EXPLAIN on the same query. 
The EXPLAIN output contains a line for the DistinctCacheExec, which /// is the custom execution plan impl for the distinct value cache that captures the predicates that /// are pushed down to the underlying [`DistinctCacahe::to_record_batch`] method, if any. - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_datafusion_distinct_cache_udtf() { // create a test writer and do a write in to populate the catalog with a db/table: let writer = TestWriter::new(); @@ -508,7 +508,7 @@ mod tests { "| us-west | d |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -522,7 +522,7 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -536,7 +536,7 @@ mod tests { "| us-east | a |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -551,7 +551,7 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + 
explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -565,7 +565,7 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -584,7 +584,7 @@ mod tests { "| us-west | d |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -603,7 +603,7 @@ mod tests { "| eu-west | l |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -619,7 +619,7 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN 
(ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -634,7 +634,7 @@ mod tests { "| us-west | d |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -649,7 +649,7 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -664,7 +664,7 @@ mod tests { "| us-east | b |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -683,7 +683,7 @@ mod tests { "| us-west |", "+---------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -702,7 +702,7 @@ mod tests { "| us-west |", "+---------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1", + explain_contains: "DistinctCacheExec: projection=[region] inner=MemoryExec: partitions=1, partition_sizes=[1", // 
it seems that DISTINCT changes around the order of results use_sorted_assert: true, }, @@ -727,7 +727,7 @@ mod tests { "| l |", // commenting for no new line "+------+", // commenting for no new line ], - explain_contains: "DistinctCacheExec: projection=[host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[host] inner=MemoryExec: partitions=1, partition_sizes=[1]", // this column will not be sorted since the order of elements depends on the next level // up in the cache, so the `region` column is iterated over in order, but the nested // `host` values, although sorted within `region`s, will not be globally sorted. @@ -743,7 +743,7 @@ mod tests { "| f |", // commenting for no new line "+------+", // commenting for no new line ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] predicates=[[region@0 IN (ca-cent)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -763,7 +763,7 @@ mod tests { "| eu-west | l |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -779,7 +779,7 @@ mod tests { "| us-west | d |", "+---------+------+", ], - explain_contains: "DistinctCacheExec: projection=[region@0, host@1] limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, TestCase { @@ -796,7 +796,7 @@ mod tests { "| us-west | d |", "+---------+------+", ], - 
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]", + explain_contains: "DistinctCacheExec: projection=[region, host] inner=MemoryExec: partitions=1, partition_sizes=[1]", use_sorted_assert: false, }, ]; @@ -838,4 +838,83 @@ mod tests { ); } } + + #[test_log::test(tokio::test)] + async fn test_projection_pushdown_indexing() { + let writer = TestWriter::new(); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let _ = writer.write_lp_to_rows( + "\ + wind_data,city=Berlin,country=Germany,county=Berlin wind_speed=14.63,wind_direction=270i\n\ + ", + 0, + ); + let table_def = writer.db_schema().table_definition("wind_data").unwrap(); + let column_ids: Vec = ["country", "county", "city"] + .into_iter() + .map(|name| table_def.column_name_to_id_unchecked(name)) + .collect(); + + let distinct_provider = + DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap(); + distinct_provider + .create_cache( + writer.db_schema().id, + None, + CreateDistinctCacheArgs { + table_def, + max_cardinality: Default::default(), + max_age: Default::default(), + column_ids, + }, + ) + .unwrap(); + + let write_batch = writer.write_lp_to_write_batch( + "\ + wind_data,city=Berlin,country=Germany,county=Berlin wind_speed=14.63,wind_direction=270i\n\ + wind_data,city=Hamburg,country=Germany,county=Hamburg wind_speed=19.8,wind_direction=26i\n\ + wind_data,city=Munich,country=Germany,county=Bavaria wind_speed=11.77,wind_direction=227i\n\ + wind_data,city=Cologne,country=Germany,county=North\\ Rhine-Westphalia wind_speed=12.44,wind_direction=339i\n\ + wind_data,city=Frankfurt,country=Germany,county=Hesse wind_speed=18.97,wind_direction=96i\n\ + wind_data,city=Stuttgart,country=Germany,county=Baden-Württemberg wind_speed=12.75,wind_direction=332i\n\ + wind_data,city=Dortmund,country=Germany,county=North\\ Rhine-Westphalia wind_speed=12.03,wind_direction=146i\n\ + 
wind_data,city=Paris,country=France,county=Île-de-France wind_speed=10.3,wind_direction=302i\n\ + wind_data,city=Marseille,country=France,county=Provence-Alpes-Côte\\ d'Azur wind_speed=24.65,wind_direction=288i\n\ + wind_data,city=Lyon,country=France,county=Auvergne-Rhône-Alpes wind_speed=17.83,wind_direction=288i\n\ + wind_data,city=Toulouse,country=France,county=Occitanie wind_speed=20.34,wind_direction=157i\n\ + wind_data,city=Madrid,country=Spain,county=Community\\ of\\ Madrid wind_speed=9.36,wind_direction=348i\n\ + wind_data,city=Barcelona,country=Spain,county=Catalonia wind_speed=16.52,wind_direction=14i\n\ + ", 100); + let wal_contents = influxdb3_wal::create::wal_contents( + (0, 100, 1), + [influxdb3_wal::create::write_batch_op(write_batch)], + ); + distinct_provider.write_wal_contents_to_cache(&wal_contents); + + let ctx = SessionContext::new(); + let distinct_func = + DistinctCacheFunction::new(writer.db_schema().id, Arc::clone(&distinct_provider)); + ctx.register_udtf(DISTINCT_CACHE_UDTF_NAME, Arc::new(distinct_func)); + + let results = ctx + .sql("select country, city from distinct_cache('wind_data') where country = 'Spain'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_batches_eq!( + [ + "+---------+-----------+", + "| country | city |", + "+---------+-----------+", + "| Spain | Barcelona |", + "| Spain | Madrid |", + "+---------+-----------+", + ], + &results + ); + } } diff --git a/influxdb3_cache/src/distinct_cache/table_function.rs b/influxdb3_cache/src/distinct_cache/table_function.rs index dd934be4b8c..485bc2d0919 100644 --- a/influxdb3_cache/src/distinct_cache/table_function.rs +++ b/influxdb3_cache/src/distinct_cache/table_function.rs @@ -102,8 +102,8 @@ impl TableProvider for DistinctCacheFunctionProvider { predicates, Arc::clone(&self.table_def), &[batches], - self.schema(), - projection, + schema, + projection.is_some(), limit, )?; @@ -292,7 +292,7 @@ struct DistinctCacheExec { inner: MemoryExec, table_def: Arc, 
predicates: Option>, - projection: Option>, + is_projected: bool, limit: Option, } @@ -302,7 +302,7 @@ impl DistinctCacheExec { table_def: Arc, partitions: &[Vec], schema: SchemaRef, - projection: Option<&Vec>, + is_projected: bool, limit: Option, ) -> Result { Ok(Self { @@ -310,7 +310,7 @@ impl DistinctCacheExec { inner: MemoryExec::try_new(partitions, schema, None)?, predicates, table_def, - projection: projection.cloned(), + is_projected, limit, }) } @@ -328,14 +328,13 @@ impl DisplayAs for DistinctCacheExec { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "DistinctCacheExec:")?; - if let Some(projection) = &self.projection { + if self.is_projected { write!(f, " projection=[")?; let schema = self.schema(); - let mut p_iter = projection.iter(); - while let Some(i) = p_iter.next() { - let name = schema.fields().get(*i).ok_or(std::fmt::Error)?.name(); - write!(f, "{name}@{i}")?; - if p_iter.size_hint().0 > 0 { + let mut field_iter = schema.fields().iter().peekable(); + while let (Some(field), next) = (field_iter.next(), field_iter.peek()) { + write!(f, "{name}", name = field.name())?; + if next.is_some() { write!(f, ", ")?; } }