Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include static chunks in all-component queries #9044

Merged
merged 7 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 120 additions & 42 deletions crates/store/re_chunk_store/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::BTreeSet, sync::Arc};

use itertools::Itertools;
use itertools::{Either, Itertools};
use nohash_hasher::IntSet;

use re_chunk::{Chunk, LatestAtQuery, RangeQuery};
Expand Down Expand Up @@ -587,40 +587,78 @@ impl ChunkStore {
chunks
}

/// Returns the most-relevant chunk(s) for the given [`LatestAtQuery`].
///
/// If `include_static` is `true`, every static chunk registered for the entity is returned
/// too, and temporal chunks are only looked up for components that do _not_ have a static
/// chunk. If it is `false`, static data is ignored entirely.
///
/// The [`ChunkStore`] always work at the [`Chunk`] level (as opposed to the row level): it is
/// oblivious to the data therein.
/// For that reason, and because [`Chunk`]s are allowed to temporally overlap, it is possible
/// that a query has more than one relevant chunk.
///
/// The returned vector is free of duplicates.
///
/// The caller should filter the returned chunks further (see [`Chunk::latest_at`]) in order to
/// determine what exact row contains the final result.
pub fn latest_at_relevant_chunks_for_all_components(
    &self,
    query: &LatestAtQuery,
    entity_path: &EntityPath,
    include_static: bool,
) -> Vec<Arc<Chunk>> {
    re_tracing::profile_function!(format!("{query:?}"));

    if include_static {
        let empty = Default::default();
        let static_chunks_per_component = self
            .static_chunk_ids_per_entity
            .get(entity_path)
            .unwrap_or(&empty);

        // All static chunks for the given entity.
        let static_chunks = static_chunks_per_component
            .values()
            .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
            .cloned();

        // All temporal chunks for the given entity, skipping the components
        // for which we already have static chunks.
        let temporal_chunks = self
            .temporal_chunk_ids_per_entity_per_component
            .get(entity_path)
            .and_then(|temporal_chunk_ids_per_timeline_per_component| {
                temporal_chunk_ids_per_timeline_per_component.get(&query.timeline())
            })
            .map(|temporal_chunk_ids_per_component| {
                temporal_chunk_ids_per_component
                    .iter()
                    .filter(|(component_name, _)| {
                        !static_chunks_per_component.contains_key(component_name)
                    })
                    .map(|(_, chunk_id_set)| chunk_id_set)
            })
            .into_iter()
            .flatten()
            .filter_map(|temporal_chunk_ids_per_time| {
                self.latest_at(query, temporal_chunk_ids_per_time)
            })
            .flatten();

        static_chunks
            .chain(temporal_chunks)
            // Both halves may contain duplicates: the static index is keyed by component, so
            // the same chunk id may show up under several components, and the per-component
            // latest-at queries may each yield the same chunk. Deduplicate them together so
            // the "free of duplicates" guarantee holds for the whole result.
            .unique_by(|chunk| chunk.id())
            .collect_vec()
    } else {
        // Temporal-only path: a single latest-at lookup on the entity's per-timeline index.
        self.temporal_chunk_ids_per_entity
            .get(entity_path)
            .and_then(|temporal_chunk_ids_per_timeline| {
                temporal_chunk_ids_per_timeline.get(&query.timeline())
            })
            .and_then(|temporal_chunk_ids_per_time| {
                self.latest_at(query, temporal_chunk_ids_per_time)
            })
            .unwrap_or_default()
    }
}

fn latest_at(
Expand Down Expand Up @@ -743,49 +781,89 @@ impl ChunkStore {
chunks
}

/// Returns the most-relevant _temporal_ chunk(s) for the given [`RangeQuery`].
///
/// The returned vector is guaranteed free of duplicates, by definition.
/// Returns the most-relevant chunk(s) for the given [`RangeQuery`].
///
/// The criteria for returning a chunk is only that it may contain data that overlaps with
/// the queried range.
/// the queried range, or that it is static.
///
/// The returned vector is free of duplicates.
///
/// The caller should filter the returned chunks further (see [`Chunk::range`]) in order to
/// determine how exactly each row of data fit with the rest.
///
/// **This ignores static data.**
pub fn range_relevant_chunks_for_all_components(
&self,
query: &RangeQuery,
entity_path: &EntityPath,
include_static: bool,
) -> Vec<Arc<Chunk>> {
re_tracing::profile_function!(format!("{query:?}"));

let chunks = self
.range(
query,
self.temporal_chunk_ids_per_entity
.get(entity_path)
.and_then(|temporal_chunk_ids_per_timeline| {
temporal_chunk_ids_per_timeline.get(&query.timeline())
})
.into_iter(),
let empty = Default::default();
let chunks = if include_static {
let static_chunks_per_component = self
.static_chunk_ids_per_entity
.get(entity_path)
.unwrap_or(&empty);

// All static chunks for the given entity
let static_chunks = static_chunks_per_component
.values()
.filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
.cloned();

// All temporal chunks for the given entity, filtered by components
// for which we already have static chunks.
let temporal_chunks = self
.range(
query,
self.temporal_chunk_ids_per_entity_per_component
.get(entity_path)
.and_then(|temporal_chunk_ids_per_timeline_per_component| {
temporal_chunk_ids_per_timeline_per_component.get(&query.timeline())
})
.map(|temporal_chunk_ids_per_component| {
temporal_chunk_ids_per_component
.iter()
.filter(|(component_name, _)| {
!static_chunks_per_component.contains_key(component_name)
jprochazk marked this conversation as resolved.
Show resolved Hide resolved
})
.map(|(_, chunk_id_set)| chunk_id_set)
})
.into_iter()
.flatten(),
)
.into_iter()
// The range query may yield duplicate chunks, and it's unlikely
// anyone will use this without deduplicating first.
.unique_by(|chunk| chunk.id());

Either::Left(static_chunks.chain(temporal_chunks))
} else {
Either::Right(
self.range(
query,
self.temporal_chunk_ids_per_entity
.get(entity_path)
.and_then(|temporal_chunk_ids_per_timeline| {
temporal_chunk_ids_per_timeline.get(&query.timeline())
})
.into_iter(),
),
)
};

// Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we
// need to make sure that the resulting chunks' global time ranges intersect with the
// time range of the query itself.
chunks
.into_iter()
// Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we
// need to make sure that the resulting chunks' global time ranges intersect with the
// time range of the query itself.
.filter(|chunk| {
chunk
.timelines()
.get(&query.timeline())
.is_some_and(|time_column| time_column.time_range().intersects(query.range()))
})
.collect_vec();

debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());

chunks
.collect_vec()
}

fn range<'a>(
Expand Down
5 changes: 5 additions & 0 deletions crates/store/re_chunk_store/tests/reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ fn latest_at() -> anyhow::Result<()> {
.latest_at_relevant_chunks_for_all_components(
&LatestAtQuery::new(timeline_frame_nr, frame_nr),
&entity_path,
false, /* don't include static data */
)
.into_iter()
.map(|chunk| chunk.id())
Expand Down Expand Up @@ -442,6 +443,7 @@ fn latest_at_sparse_component_edge_case() -> anyhow::Result<()> {
.latest_at_relevant_chunks_for_all_components(
&LatestAtQuery::new(timeline_frame_nr, frame_nr),
&entity_path,
false, /* don't include static data */
)
.into_iter()
.map(|chunk| {
Expand Down Expand Up @@ -602,6 +604,7 @@ fn latest_at_overlapped_chunks() -> anyhow::Result<()> {
.latest_at_relevant_chunks_for_all_components(
&LatestAtQuery::new(timeline_frame_nr, frame_nr),
&entity_path,
false, /* don't include static data */
)
.into_iter()
.map(|chunk| {
Expand Down Expand Up @@ -899,6 +902,7 @@ fn range() -> anyhow::Result<()> {
.range_relevant_chunks_for_all_components(
&RangeQuery::new(timeline_frame_nr, time_range),
&entity_path,
false, /* don't include static data */
)
.into_iter()
.map(|chunk| {
Expand Down Expand Up @@ -1074,6 +1078,7 @@ fn range_overlapped_chunks() -> anyhow::Result<()> {
.range_relevant_chunks_for_all_components(
&RangeQuery::new(timeline_frame_nr, time_range),
&entity_path,
false, /* don't include static data */
)
.into_iter()
.map(|chunk| {
Expand Down