Encode RowId as FixedSizeBinaryArray (#9038)
### Related
* Closes #8992

### What
Change how we encode `RowId` in Arrow from a weird `StructArray` to a
`FixedSizeBinaryArray` ([the same way UUIDs are usually
encoded](https://arrow.apache.org/docs/format/CanonicalExtensions.html#uuid)).

The result is code that is shorter, simpler, and faster.

And less embarrassing.

This is a breaking change (a lot of those going around anyways).
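
For reference, a minimal stand-alone sketch of the new encoding (illustrative only: the real `arrow_from_slice`/`slice_from_arrow` helpers live in `re_types_core`, and a plain `u128` stands in for `RowId` here):

```rust
use arrow::array::{FixedSizeBinaryArray, FixedSizeBinaryBuilder};

/// Encode each 128-bit id as one 16-byte value.
fn row_ids_to_arrow(row_ids: &[u128]) -> FixedSizeBinaryArray {
    let mut builder = FixedSizeBinaryBuilder::with_capacity(row_ids.len(), 16);
    for id in row_ids {
        // Big-endian, so that the byte-wise ordering of the binary values
        // matches the numeric ordering of the ids.
        builder
            .append_value(id.to_be_bytes())
            .expect("always exactly 16 bytes");
    }
    builder.finish()
}

/// Decode back: `value(i)` returns the 16-byte slice for row `i`.
fn row_ids_from_arrow(array: &FixedSizeBinaryArray) -> Vec<u128> {
    (0..array.len())
        .map(|i| u128::from_be_bytes(array.value(i).try_into().expect("16 bytes")))
        .collect()
}
```

This mirrors the 16-byte `FixedSizeBinary` layout that the canonical Arrow UUID extension linked above uses.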
emilk authored Feb 14, 2025
1 parent e7a9618 commit 7995dd4
Showing 14 changed files with 138 additions and 280 deletions.
111 changes: 22 additions & 89 deletions crates/store/re_chunk/src/chunk.rs
@@ -3,8 +3,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
 use ahash::HashMap;
 use arrow::{
     array::{
-        Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray,
-        StructArray as ArrowStructArray, UInt64Array as ArrowUInt64Array,
+        Array as ArrowArray, ArrayRef as ArrowArrayRef, FixedSizeBinaryArray,
+        ListArray as ArrowListArray,
     },
     buffer::{NullBuffer as ArrowNullBuffer, ScalarBuffer as ArrowScalarBuffer},
 };
@@ -15,8 +15,7 @@ use re_arrow_util::ArrowArrayDowncastRef as _;
 use re_byte_size::SizeBytes as _;
 use re_log_types::{EntityPath, ResolvedTimeRange, Time, TimeInt, TimePoint, Timeline};
 use re_types_core::{
-    ComponentDescriptor, ComponentName, DeserializationError, Loggable, LoggableBatch,
-    SerializationError,
+    ComponentDescriptor, ComponentName, DeserializationError, Loggable, SerializationError,
 };

 use crate::{ChunkId, RowId};
@@ -199,7 +198,7 @@ pub struct Chunk {
     pub(crate) is_sorted: bool,

     /// The respective [`RowId`]s for each row of data.
-    pub(crate) row_ids: ArrowStructArray,
+    pub(crate) row_ids: FixedSizeBinaryArray,

     /// The time columns.
     ///
@@ -378,18 +377,9 @@ impl Chunk {
             .take(self.row_ids.len())
             .collect_vec();

-        #[allow(clippy::unwrap_used)]
-        let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
-            // Unwrap: native RowIds cannot fail to serialize.
-            .unwrap()
-            .downcast_array_ref::<ArrowStructArray>()
-            // Unwrap: RowId schema is known in advance to be a struct array -- always.
-            .unwrap()
-            .clone();
-
         Self {
             id,
-            row_ids,
+            row_ids: RowId::arrow_from_slice(&row_ids),
             ..self.clone()
         }
     }
@@ -407,14 +397,7 @@ impl Chunk {
             .take(self.row_ids.len())
             .collect_vec();

-        #[allow(clippy::unwrap_used)]
-        let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
-            // Unwrap: native RowIds cannot fail to serialize.
-            .unwrap()
-            .downcast_array_ref::<ArrowStructArray>()
-            // Unwrap: RowId schema is known in advance to be a struct array -- always.
-            .unwrap()
-            .clone();
+        let row_ids = RowId::arrow_from_slice(&row_ids);

         Self { row_ids, ..self }
     }
@@ -740,7 +723,7 @@ impl Chunk {
         id: ChunkId,
         entity_path: EntityPath,
         is_sorted: Option<bool>,
-        row_ids: ArrowStructArray,
+        row_ids: FixedSizeBinaryArray,
         timelines: IntMap<Timeline, TimeColumn>,
         components: ChunkComponents,
     ) -> ChunkResult<Self> {
@@ -779,19 +762,7 @@ impl Chunk {
         components: ChunkComponents,
     ) -> ChunkResult<Self> {
         re_tracing::profile_function!();
-        let row_ids = row_ids
-            .to_arrow()
-            // NOTE: impossible, but better safe than sorry.
-            .map_err(|err| ChunkError::Malformed {
-                reason: format!("RowIds failed to serialize: {err}"),
-            })?
-            .downcast_array_ref::<ArrowStructArray>()
-            // NOTE: impossible, but better safe than sorry.
-            .ok_or_else(|| ChunkError::Malformed {
-                reason: "RowIds failed to downcast".to_owned(),
-            })?
-            .clone();
-
+        let row_ids = RowId::arrow_from_slice(row_ids);
         Self::new(id, entity_path, is_sorted, row_ids, timelines, components)
     }

@@ -836,7 +807,7 @@ impl Chunk {
         id: ChunkId,
         entity_path: EntityPath,
         is_sorted: Option<bool>,
-        row_ids: ArrowStructArray,
+        row_ids: FixedSizeBinaryArray,
         components: ChunkComponents,
     ) -> ChunkResult<Self> {
         Self::new(
@@ -856,11 +827,7 @@ impl Chunk {
             entity_path,
             heap_size_bytes: Default::default(),
             is_sorted: true,
-            row_ids: arrow::array::StructBuilder::from_fields(
-                re_types_core::tuid_arrow_fields(),
-                0,
-            )
-            .finish(),
+            row_ids: RowId::arrow_from_slice(&[]),
             timelines: Default::default(),
             components: Default::default(),
         }
@@ -1128,34 +1095,21 @@ impl Chunk {
     }

     #[inline]
-    pub fn row_ids_array(&self) -> &ArrowStructArray {
+    pub fn row_ids_array(&self) -> &FixedSizeBinaryArray {
         &self.row_ids
     }

     /// Returns the [`RowId`]s in their raw-est form: a tuple of (times, counters) arrays.
     #[inline]
-    pub fn row_ids_raw(&self) -> (&ArrowUInt64Array, &ArrowUInt64Array) {
-        let [times, counters] = self.row_ids.columns() else {
-            panic!("RowIds are corrupt -- this should be impossible (sanity checked)");
-        };
-
-        #[allow(clippy::unwrap_used)]
-        let times = times.downcast_array_ref::<ArrowUInt64Array>().unwrap(); // sanity checked
-
-        #[allow(clippy::unwrap_used)]
-        let counters = counters.downcast_array_ref::<ArrowUInt64Array>().unwrap(); // sanity checked
-
-        (times, counters)
+    pub fn row_ids_slice(&self) -> &[RowId] {
+        RowId::slice_from_arrow(&self.row_ids)
     }

     /// All the [`RowId`] in this chunk.
     ///
     /// This could be in any order if this chunk is unsorted.
     #[inline]
-    pub fn row_ids(&self) -> impl Iterator<Item = RowId> + '_ {
-        let (times, counters) = self.row_ids_raw();
-        izip!(times.values(), counters.values())
-            .map(|(&time, &counter)| RowId::from_u128((time as u128) << 64 | (counter as u128)))
+    pub fn row_ids(&self) -> impl ExactSizeIterator<Item = RowId> + '_ {
+        self.row_ids_slice().iter().copied()
     }

     /// Returns an iterator over the [`RowId`]s of a [`Chunk`], for a given component.
@@ -1195,41 +1149,20 @@ impl Chunk {
             return None;
         }

-        let (times, counters) = self.row_ids_raw();
-        let (times, counters) = (times.values(), counters.values());
+        let row_ids = self.row_ids_slice();

         #[allow(clippy::unwrap_used)] // checked above
-        let (index_min, index_max) = if self.is_sorted() {
+        Some(if self.is_sorted() {
             (
-                (
-                    times.first().copied().unwrap(),
-                    counters.first().copied().unwrap(),
-                ),
-                (
-                    times.last().copied().unwrap(),
-                    counters.last().copied().unwrap(),
-                ),
+                row_ids.first().copied().unwrap(),
+                row_ids.last().copied().unwrap(),
             )
         } else {
             (
-                (
-                    times.iter().min().copied().unwrap(),
-                    counters.iter().min().copied().unwrap(),
-                ),
-                (
-                    times.iter().max().copied().unwrap(),
-                    counters.iter().max().copied().unwrap(),
-                ),
+                row_ids.iter().min().copied().unwrap(),
+                row_ids.iter().max().copied().unwrap(),
             )
-        };
-
-        let (time_min, counter_min) = index_min;
-        let (time_max, counter_max) = index_max;
-
-        Some((
-            RowId::from_u128((time_min as u128) << 64 | (counter_min as u128)),
-            RowId::from_u128((time_max as u128) << 64 | (counter_max as u128)),
-        ))
+        })
     }

     #[inline]
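
One property the new `row_id_range` relies on is worth spelling out: `RowId`s compare like their `(time, counter)` pairs, because the timestamp occupies the high 64 bits (see the old `from_u128` construction above). A toy check, with `u128` standing in for `RowId`:

```rust
fn row_id_from_parts(time_ns: u64, counter: u64) -> u128 {
    ((time_ns as u128) << 64) | (counter as u128)
}

fn main() {
    let a = row_id_from_parts(1, u64::MAX);
    let b = row_id_from_parts(2, 0);
    // Integer order == lexicographic (time, counter) order, so a single
    // min()/max() over whole ids can replace the old per-column logic.
    assert!(a < b);
}
```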
11 changes: 1 addition & 10 deletions crates/store/re_chunk/src/iter.rs
@@ -702,16 +702,7 @@ impl Iterator for ChunkIndicesIter {
         let i = self.index;
         self.index += 1;

-        let row_id = {
-            let (times, incs) = self.chunk.row_ids_raw();
-            let times = times.values();
-            let incs = incs.values();
-
-            let time = *times.get(i)?;
-            let inc = *incs.get(i)?;
-
-            RowId::from_u128(((time as u128) << 64) | (inc as u128))
-        };
+        let row_id = *self.chunk.row_ids_slice().get(i)?;

         if let Some(time_column) = &self.time_column {
             let time = *time_column.times_raw().get(i)?;
4 changes: 2 additions & 2 deletions crates/store/re_chunk/src/merge.rs
@@ -1,4 +1,4 @@
-use arrow::array::StructArray as ArrowStructArray;
+use arrow::array::FixedSizeBinaryArray;
 use arrow::array::{Array as ArrowArray, ListArray as ArrowListArray};
 use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
 use itertools::{izip, Itertools};
@@ -51,7 +51,7 @@ impl Chunk {
             #[allow(clippy::unwrap_used)]
             // concatenating 2 RowId arrays must yield another RowId array
             row_ids
-                .downcast_array_ref::<ArrowStructArray>()
+                .downcast_array_ref::<FixedSizeBinaryArray>()
                 .unwrap()
                 .clone()
         };
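
For context, a sketch of why that `unwrap` is safe, assuming the elided code above produced `row_ids` with arrow's `concat` kernel (an assumption; the hunk does not show it): concatenation preserves the input datatype, so the downcast back to `FixedSizeBinaryArray` cannot fail.

```rust
use arrow::array::{Array, ArrayRef, FixedSizeBinaryArray};
use arrow::compute::concat;

fn concat_row_ids(
    lhs: &FixedSizeBinaryArray,
    rhs: &FixedSizeBinaryArray,
) -> FixedSizeBinaryArray {
    // `concat` errors only on mismatched datatypes; two FixedSizeBinary[16]
    // inputs always match.
    let concatenated: ArrayRef =
        concat(&[lhs as &dyn Array, rhs as &dyn Array]).expect("matching datatypes");
    concatenated
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("concat preserves the input type")
        .clone()
}
```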
28 changes: 8 additions & 20 deletions crates/store/re_chunk/src/shuffle.rs
@@ -1,13 +1,9 @@
 use std::sync::Arc;

 use arrow::{
-    array::{
-        Array as ArrowArray, ListArray as ArrowListArray, StructArray as ArrowStructArray,
-        UInt64Array as ArrowUInt64Array,
-    },
+    array::{Array as ArrowArray, ListArray as ArrowListArray},
     buffer::{OffsetBuffer as ArrowOffsets, ScalarBuffer as ArrowScalarBuffer},
 };
-use itertools::Itertools as _;
-
 use re_log_types::Timeline;

 use crate::{Chunk, TimeColumn};
@@ -87,7 +83,7 @@ impl Chunk {

         let swaps = {
             re_tracing::profile_scope!("swaps");
-            let row_ids = self.row_ids().collect_vec();
+            let row_ids = self.row_ids_slice();
             let mut swaps = (0..row_ids.len()).collect::<Vec<_>>();
             swaps.sort_by_key(|&i| row_ids[i]);
             swaps
@@ -134,7 +130,7 @@ impl Chunk {

         let swaps = {
             re_tracing::profile_scope!("swaps");
-            let row_ids = chunk.row_ids().collect_vec();
+            let row_ids = chunk.row_ids_slice();
             let times = time_column.times_raw().to_vec();
             let mut swaps = (0..times.len()).collect::<Vec<_>>();
             swaps.sort_by_key(|&i| (times[i], row_ids[i]));
@@ -204,21 +200,13 @@ impl Chunk {
         {
             re_tracing::profile_scope!("row ids");

-            let (times, counters) = self.row_ids_raw();
-            let (times, counters) = (times.values(), counters.values());
+            let row_ids = self.row_ids_slice();

-            let mut sorted_times = times.to_vec();
-            let mut sorted_counters = counters.to_vec();
+            let mut sorted_row_ids = row_ids.to_vec();
             for (to, from) in swaps.iter().copied().enumerate() {
-                sorted_times[to] = times[from];
-                sorted_counters[to] = counters[from];
+                sorted_row_ids[to] = row_ids[from];
             }

-            let times = Arc::new(ArrowUInt64Array::from(sorted_times));
-            let counters = Arc::new(ArrowUInt64Array::from(sorted_counters));
-
-            self.row_ids =
-                ArrowStructArray::new(self.row_ids.fields().clone(), vec![times, counters], None);
+            self.row_ids = re_types_core::RowId::arrow_from_slice(&sorted_row_ids);
         }

         let Self {
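
The sort above is an argsort followed by a gather: `swaps` maps each destination index to the source row it pulls from, and with `RowId`s now being a plain slice, one gather replaces the old two (times and counters). The same pattern in isolation, with `u128` standing in for `RowId`:

```rust
fn sort_rows_by_id(row_ids: &[u128]) -> Vec<u128> {
    // Argsort: after sorting, swaps[to] == from means source row `from`
    // lands at destination `to`.
    let mut swaps: Vec<usize> = (0..row_ids.len()).collect();
    swaps.sort_by_key(|&i| row_ids[i]);

    // Gather: apply the permutation in one pass.
    let mut sorted = row_ids.to_vec();
    for (to, from) in swaps.iter().copied().enumerate() {
        sorted[to] = row_ids[from];
    }
    sorted
}
```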
24 changes: 5 additions & 19 deletions crates/store/re_chunk/src/slice.rs
@@ -32,23 +32,9 @@ impl Chunk {
             .and_then(|per_desc| per_desc.get(component_desc))?;

         if self.is_sorted() {
-            let row_id_128 = row_id.as_u128();
-            let row_id_time_ns = (row_id_128 >> 64) as u64;
-            let row_id_inc = (row_id_128 & (!0 >> 64)) as u64;
-
-            let (times, incs) = self.row_ids_raw();
-            let times = times.values();
-            let incs = incs.values();
-
-            let mut index = times.partition_point(|&time| time < row_id_time_ns);
-            while index < incs.len() && incs[index] < row_id_inc {
-                index += 1;
-            }
-
-            let found_it =
-                times.get(index) == Some(&row_id_time_ns) && incs.get(index) == Some(&row_id_inc);
-
-            (found_it && list_array.is_valid(index)).then(|| list_array.value(index))
+            let row_ids = self.row_ids_slice();
+            let index = row_ids.binary_search(&row_id).ok()?;
+            list_array.is_valid(index).then(|| list_array.value(index))
         } else {
             self.row_ids()
                 .find_position(|id| *id == row_id)
@@ -427,7 +413,7 @@ impl Chunk {
             entity_path,
             heap_size_bytes: _,
             is_sorted: _,
-            row_ids,
+            row_ids: _,
             timelines,
             components,
         } = self;
@@ -439,7 +425,7 @@ impl Chunk {
             entity_path: entity_path.clone(),
             heap_size_bytes: Default::default(),
             is_sorted: true,
-            row_ids: arrow::array::StructBuilder::from_fields(row_ids.fields().clone(), 0).finish(),
+            row_ids: RowId::arrow_from_slice(&[]),
             timelines: timelines
                 .iter()
                 .map(|(&timeline, time_column)| (timeline, time_column.emptied()))
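
A note on the sorted lookup above: `slice::binary_search` returns `Ok(index)` on a hit and `Err(insertion_index)` on a miss, so the `.ok()?` turns "row id not in this chunk" into an early `None`, replacing the hand-rolled `partition_point` walk. In isolation, with `u128` standing in for `RowId`:

```rust
fn find_sorted(row_ids: &[u128], needle: u128) -> Option<usize> {
    // Requires the slice to be sorted, which is exactly what the
    // `self.is_sorted()` branch guarantees.
    row_ids.binary_search(&needle).ok()
}

fn main() {
    let ids = [10u128, 20, 30];
    assert_eq!(find_sorted(&ids, 20), Some(1)); // present
    assert_eq!(find_sorted(&ids, 25), None); // absent
}
```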
@@ -23,7 +23,7 @@ ChunkStore {
 │ ┌──────────────────────────────┬────────────────────────┬────────────────────────┬──────────────────────────────┬──────────────────────────────┐ │
 │ │ RowId                        ┆ frame_nr               ┆ log_time               ┆ example.MyColor              ┆ example.MyIndex              │ │
 │ │ ---                          ┆ ---                    ┆ ---                    ┆ ---                          ┆ ---                          │ │
-│ │ type: "Struct[2]"            ┆ type: "i64"            ┆ type: "Timestamp(ns)"  ┆ type: "List[u32]"            ┆ type: "List[u64]"            │ │
+│ │ type: "FixedSizeBinary[16]"  ┆ type: "i64"            ┆ type: "Timestamp(ns)"  ┆ type: "List[u32]"            ┆ type: "List[u64]"            │ │
 │ │ ARROW:extension:name: "TUID" ┆ index_name: "frame_nr" ┆ index_name: "log_time" ┆ component: "example.MyColor" ┆ component: "example.MyIndex" │ │
 │ │ kind: "control"              ┆ is_sorted: "true"      ┆ is_sorted: "true"      ┆ kind: "data"                 ┆ kind: "data"                 │ │
 │ │                              ┆ kind: "index"          ┆ kind: "index"          ┆                              ┆                              │ │
23 changes: 5 additions & 18 deletions crates/store/re_dataframe/src/query.rs
@@ -925,30 +925,15 @@ impl<E: StorageEngineLike> QueryHandle<E> {
                     .timelines()
                     .get(&state.filtered_index)
                     .map_or(cur_index_times_empty, |time_column| time_column.times_raw());
-                let cur_index_row_ids = cur_chunk.row_ids_raw();
-
-                // NOTE: "Deserializing" everything into a native vec is way too much for rustc to
-                // follow and doesn't get optimized at all -- we have to work with raw arrow data
-                // all the way, so this gets a bit complicated.
-                let cur_index_row_id_at = |at: usize| {
-                    let (times, incs) = cur_index_row_ids;
-
-                    let times = times.values();
-                    let incs = incs.values();
-
-                    let time = *times.get(at)?;
-                    let inc = *incs.get(at)?;
-
-                    Some(RowId::from_u128(((time as u128) << 64) | (inc as u128)))
-                };
+                let cur_index_row_ids = cur_chunk.row_ids_slice();

                 let (index_value, cur_row_id) = 'walk: loop {
                     let (Some(mut index_value), Some(mut cur_row_id)) = (
                         cur_index_times
                             .get(cur_cursor_value as usize)
                             .copied()
                             .map(TimeInt::new_temporal),
-                        cur_index_row_id_at(cur_cursor_value as usize),
+                        cur_index_row_ids.get(cur_cursor_value as usize).copied(),
                     ) else {
                         continue 'overlaps;
                     };
@@ -962,7 +947,9 @@ impl<E: StorageEngineLike> QueryHandle<E> {
                             .get(cur_cursor_value as usize + 1)
                             .copied()
                             .map(TimeInt::new_temporal),
-                        cur_index_row_id_at(cur_cursor_value as usize + 1),
+                        cur_index_row_ids
+                            .get(cur_cursor_value as usize + 1)
+                            .copied(),
                     ) {
                         if next_index_value == *cur_index_value {
                             index_value = next_index_value;