diff --git a/crates/store/re_chunk_store/src/query.rs b/crates/store/re_chunk_store/src/query.rs index f6bc4cc2a13e..444041be912d 100644 --- a/crates/store/re_chunk_store/src/query.rs +++ b/crates/store/re_chunk_store/src/query.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeSet, sync::Arc}; -use itertools::Itertools; +use itertools::{Either, Itertools}; use nohash_hasher::IntSet; use re_chunk::{Chunk, LatestAtQuery, RangeQuery}; @@ -587,40 +587,78 @@ impl ChunkStore { chunks } - /// Returns the most-relevant _temporal_ chunk(s) for the given [`LatestAtQuery`]. + /// Returns the most-relevant chunk(s) for the given [`LatestAtQuery`]. /// - /// The returned vector is guaranteed free of duplicates, by definition. + /// Optionally include static data. /// /// The [`ChunkStore`] always work at the [`Chunk`] level (as opposed to the row level): it is /// oblivious to the data therein. /// For that reason, and because [`Chunk`]s are allowed to temporally overlap, it is possible /// that a query has more than one relevant chunk. /// + /// The returned vector is free of duplicates. + /// /// The caller should filter the returned chunks further (see [`Chunk::latest_at`]) in order to /// determine what exact row contains the final result. - /// - /// **This ignores static data.** pub fn latest_at_relevant_chunks_for_all_components( &self, query: &LatestAtQuery, entity_path: &EntityPath, + include_static: bool, ) -> Vec> { re_tracing::profile_function!(format!("{query:?}")); - let chunks = self - .temporal_chunk_ids_per_entity - .get(entity_path) - .and_then(|temporal_chunk_ids_per_timeline| { - temporal_chunk_ids_per_timeline.get(&query.timeline()) - }) - .and_then(|temporal_chunk_ids_per_time| { - self.latest_at(query, temporal_chunk_ids_per_time) - }) - .unwrap_or_default(); - - debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique()); - - chunks + if include_static { + let empty = Default::default(); + let static_chunks_per_component = self + .static_chunk_ids_per_entity + .get(entity_path) + .unwrap_or(&empty); + + // All static chunks for the given entity + let static_chunks = static_chunks_per_component + .values() + .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id)) + .cloned(); + + // All temporal chunks for the given entity, filtered by components + // for which we already have static chunks. + let temporal_chunks = self + .temporal_chunk_ids_per_entity_per_component + .get(entity_path) + .and_then(|temporal_chunk_ids_per_timeline_per_component| { + temporal_chunk_ids_per_timeline_per_component.get(&query.timeline()) + }) + .map(|temporal_chunk_ids_per_component| { + temporal_chunk_ids_per_component + .iter() + .filter(|(component_name, _)| { + !static_chunks_per_component.contains_key(component_name) + }) + .map(|(_, chunk_id_set)| chunk_id_set) + }) + .into_iter() + .flatten() + .filter_map(|temporal_chunk_ids_per_time| { + self.latest_at(query, temporal_chunk_ids_per_time) + }) + .flatten() + // The latest_at queries may yield duplicate chunks, and it's unlikely + // anyone will use this without also deduplicating first. + .unique_by(|chunk| chunk.id()); + + static_chunks.chain(temporal_chunks).collect_vec() + } else { + self.temporal_chunk_ids_per_entity + .get(entity_path) + .and_then(|temporal_chunk_ids_per_timeline| { + temporal_chunk_ids_per_timeline.get(&query.timeline()) + }) + .and_then(|temporal_chunk_ids_per_time| { + self.latest_at(query, temporal_chunk_ids_per_time) + }) + .unwrap_or_default() + } } fn latest_at( @@ -743,49 +781,89 @@ impl ChunkStore { chunks } - /// Returns the most-relevant _temporal_ chunk(s) for the given [`RangeQuery`]. - /// - /// The returned vector is guaranteed free of duplicates, by definition. + /// Returns the most-relevant chunk(s) for the given [`RangeQuery`]. /// /// The criteria for returning a chunk is only that it may contain data that overlaps with - /// the queried range. + /// the queried range, or that it is static. + /// + /// The returned vector is free of duplicates. /// /// The caller should filter the returned chunks further (see [`Chunk::range`]) in order to /// determine how exactly each row of data fit with the rest. - /// - /// **This ignores static data.** pub fn range_relevant_chunks_for_all_components( &self, query: &RangeQuery, entity_path: &EntityPath, + include_static: bool, ) -> Vec> { re_tracing::profile_function!(format!("{query:?}")); - let chunks = self - .range( - query, - self.temporal_chunk_ids_per_entity - .get(entity_path) - .and_then(|temporal_chunk_ids_per_timeline| { - temporal_chunk_ids_per_timeline.get(&query.timeline()) - }) - .into_iter(), + let empty = Default::default(); + let chunks = if include_static { + let static_chunks_per_component = self + .static_chunk_ids_per_entity + .get(entity_path) + .unwrap_or(&empty); + + // All static chunks for the given entity + let static_chunks = static_chunks_per_component + .values() + .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id)) + .cloned(); + + // All temporal chunks for the given entity, filtered by components + // for which we already have static chunks. + let temporal_chunks = self + .range( + query, + self.temporal_chunk_ids_per_entity_per_component + .get(entity_path) + .and_then(|temporal_chunk_ids_per_timeline_per_component| { + temporal_chunk_ids_per_timeline_per_component.get(&query.timeline()) + }) + .map(|temporal_chunk_ids_per_component| { + temporal_chunk_ids_per_component + .iter() + .filter(|(component_name, _)| { + !static_chunks_per_component.contains_key(component_name) + }) + .map(|(_, chunk_id_set)| chunk_id_set) + }) + .into_iter() + .flatten(), + ) + .into_iter() + // The range query may yield duplicate chunks, and it's unlikely + // anyone will use this without deduplicating first. + .unique_by(|chunk| chunk.id()); + + Either::Left(static_chunks.chain(temporal_chunks)) + } else { + Either::Right( + self.range( + query, + self.temporal_chunk_ids_per_entity + .get(entity_path) + .and_then(|temporal_chunk_ids_per_timeline| { + temporal_chunk_ids_per_timeline.get(&query.timeline()) + }) + .into_iter(), + ), ) + }; + + // Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we + // need to make sure that the resulting chunks' global time ranges intersect with the + // time range of the query itself. + chunks .into_iter() - // Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we - // need to make sure that the resulting chunks' global time ranges intersect with the - // time range of the query itself. .filter(|chunk| { chunk .timelines() .get(&query.timeline()) .is_some_and(|time_column| time_column.time_range().intersects(query.range())) }) - .collect_vec(); - - debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique()); - - chunks + .collect_vec() } fn range<'a>( diff --git a/crates/store/re_chunk_store/tests/reads.rs b/crates/store/re_chunk_store/tests/reads.rs index 7e5760ee9fc6..ab3882113980 100644 --- a/crates/store/re_chunk_store/tests/reads.rs +++ b/crates/store/re_chunk_store/tests/reads.rs @@ -322,6 +322,7 @@ fn latest_at() -> anyhow::Result<()> { .latest_at_relevant_chunks_for_all_components( &LatestAtQuery::new(timeline_frame_nr, frame_nr), &entity_path, + false, /* don't include static data */ ) .into_iter() .map(|chunk| chunk.id()) @@ -442,6 +443,7 @@ fn latest_at_sparse_component_edge_case() -> anyhow::Result<()> { .latest_at_relevant_chunks_for_all_components( &LatestAtQuery::new(timeline_frame_nr, frame_nr), &entity_path, + false, /* don't include static data */ ) .into_iter() .map(|chunk| { @@ -602,6 +604,7 @@ fn latest_at_overlapped_chunks() -> anyhow::Result<()> { .latest_at_relevant_chunks_for_all_components( &LatestAtQuery::new(timeline_frame_nr, frame_nr), &entity_path, + false, /* don't include static data */ ) .into_iter() .map(|chunk| { @@ -899,6 +902,7 @@ fn range() -> anyhow::Result<()> { .range_relevant_chunks_for_all_components( &RangeQuery::new(timeline_frame_nr, time_range), &entity_path, + false, /* don't include static data */ ) .into_iter() .map(|chunk| { @@ -1074,6 +1078,7 @@ fn range_overlapped_chunks() -> anyhow::Result<()> { .range_relevant_chunks_for_all_components( &RangeQuery::new(timeline_frame_nr, time_range), &entity_path, + false, /* don't include static data */ ) .into_iter() .map(|chunk| {