From 535d8c29f9152c99d0d0dfd9cfa6b706ca0661af Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Fri, 14 Feb 2025 12:11:23 +0100 Subject: [PATCH] Encode wether or not a chunk is sorted by `RowId` in the column metadata * Part of https://github.com/rerun-io/rerun/issues/8744 It used to be encoded in the `RecrodBatch` metadata. It makes more sense to encode it on the column metadata though, and brings the RowId column closer to the time columns (I hope to unify the two in the near future). --- crates/store/re_chunk/src/transport.rs | 27 +++----- crates/store/re_chunk_store/src/dataframe.rs | 2 +- .../formatting__format_chunk_store.snap | 5 +- crates/store/re_sorbet/src/chunk_schema.rs | 13 ---- .../re_sorbet/src/index_column_descriptor.rs | 2 + .../re_sorbet/src/row_id_column_descriptor.rs | 68 +++++++++++-------- crates/store/re_sorbet/src/sorbet_batch.rs | 6 -- crates/store/re_sorbet/src/sorbet_schema.rs | 15 +--- 8 files changed, 56 insertions(+), 82 deletions(-) diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index 826a4bf52bde..3e4d24762a2f 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -6,9 +6,9 @@ use nohash_hasher::IntMap; use re_arrow_util::{into_arrow_ref, ArrowArrayDowncastRef as _}; use re_byte_size::SizeBytes as _; -use re_types_core::{arrow_helpers::as_array_ref, ComponentDescriptor, Loggable as _}; +use re_types_core::{arrow_helpers::as_array_ref, ComponentDescriptor}; -use crate::{chunk::ChunkComponents, Chunk, ChunkError, ChunkResult, RowId, TimeColumn}; +use crate::{chunk::ChunkComponents, Chunk, ChunkError, ChunkResult, TimeColumn}; // --- @@ -45,7 +45,9 @@ impl Chunk { components, } = self; - let row_id_schema = re_sorbet::RowIdColumnDescriptor::try_from(RowId::arrow_datatype())?; + let row_id_schema = re_sorbet::RowIdColumnDescriptor { + is_sorted: *is_sorted, + }; let (index_schemas, index_arrays): (Vec<_>, Vec<_>) = { re_tracing::profile_scope!("timelines"); @@ -124,8 +126,7 @@ impl Chunk { index_schemas, data_schemas, ) - .with_heap_size_bytes(heap_size_bytes) - .with_sorted(*is_sorted); + .with_heap_size_bytes(heap_size_bytes); Ok(re_sorbet::ChunkBatch::try_new( schema, @@ -151,13 +152,6 @@ impl Chunk { batch.num_rows() )); - // Metadata - let (id, entity_path, is_sorted) = ( - batch.chunk_id(), - batch.entity_path().clone(), - batch.is_sorted(), - ); - let row_ids = batch.row_id_column().1.clone(); let timelines = { @@ -224,10 +218,11 @@ impl Chunk { components }; + let is_sorted_by_row_id = batch.chunk_schema().row_id_column().is_sorted; let mut res = Self::new( - id, - entity_path, - is_sorted.then_some(true), + batch.chunk_id(), + batch.entity_path().clone(), + is_sorted_by_row_id.then_some(true), row_ids, timelines, components, @@ -277,7 +272,7 @@ mod tests { example_components::{MyColor, MyPoint}, EntityPath, Timeline, }; - use re_types_core::{ChunkId, Component as _}; + use re_types_core::{ChunkId, Component as _, Loggable as _, RowId}; use super::*; diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs index 9a4737b56055..0a20fade621c 100644 --- a/crates/store/re_chunk_store/src/dataframe.rs +++ b/crates/store/re_chunk_store/src/dataframe.rs @@ -433,7 +433,7 @@ impl ChunkStore { .tap_mut(|components| components.sort()); SorbetColumnDescriptors { - row_id: Some(re_sorbet::RowIdColumnDescriptor::new()), + row_id: Some(re_sorbet::RowIdColumnDescriptor { is_sorted: false }), indices, components, } diff --git a/crates/store/re_chunk_store/tests/snapshots/formatting__format_chunk_store.snap b/crates/store/re_chunk_store/tests/snapshots/formatting__format_chunk_store.snap index dfebca01c7e9..ccd7ab92251e 100644 --- a/crates/store/re_chunk_store/tests/snapshots/formatting__format_chunk_store.snap +++ b/crates/store/re_chunk_store/tests/snapshots/formatting__format_chunk_store.snap @@ -17,7 +17,6 @@ ChunkStore { │ * entity_path: "/this/that" │ │ * heap_size_bytes: "1072" │ │ * id: "661EFDF2E3B19F7C045F15" │ - │ * is_sorted: "true" │ │ * version: "1" │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ ┌──────────────────────────────────┬────────────────────────┬───────────────────────────────┬──────────────────────────────┬──────────────────────────────┐ │ @@ -25,8 +24,8 @@ ChunkStore { │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ │ │ type: "FixedSizeBinary[16]" ┆ type: "i64" ┆ type: "Timestamp(ns)" ┆ type: "List[u32]" ┆ type: "List[u64]" │ │ │ │ ARROW:extension:name: "TUID" ┆ index_name: "frame_nr" ┆ index_name: "log_time" ┆ component: "example.MyColor" ┆ component: "example.MyIndex" │ │ - │ │ kind: "control" ┆ is_sorted: "true" ┆ is_sorted: "true" ┆ kind: "data" ┆ kind: "data" │ │ - │ │ ┆ kind: "index" ┆ kind: "index" ┆ ┆ │ │ + │ │ is_sorted: "true" ┆ is_sorted: "true" ┆ is_sorted: "true" ┆ kind: "data" ┆ kind: "data" │ │ + │ │ kind: "control" ┆ kind: "index" ┆ kind: "index" ┆ ┆ │ │ │ ╞══════════════════════════════════╪════════════════════════╪═══════════════════════════════╪══════════════════════════════╪══════════════════════════════╡ │ │ │ 0000000067816A6Bb4b8c1254d40007b ┆ 1 ┆ 2025-01-10T18:43:42.123456789 ┆ [0, 1, 2] ┆ [0, 1, 2] │ │ │ └──────────────────────────────────┴────────────────────────┴───────────────────────────────┴──────────────────────────────┴──────────────────────────────┘ │ diff --git a/crates/store/re_sorbet/src/chunk_schema.rs b/crates/store/re_sorbet/src/chunk_schema.rs index 32e40f79e5f6..eaba85a8b1cd 100644 --- a/crates/store/re_sorbet/src/chunk_schema.rs +++ b/crates/store/re_sorbet/src/chunk_schema.rs @@ -60,7 +60,6 @@ impl ChunkSchema { chunk_id: Some(chunk_id), entity_path: Some(entity_path.clone()), heap_size_bytes: None, - is_sorted: false, // assume the worst }, row_id, chunk_id, @@ -73,12 +72,6 @@ impl ChunkSchema { self.sorbet.heap_size_bytes = Some(heap_size_bytes); self } - - #[inline] - pub fn with_sorted(mut self, sorted: bool) -> Self { - self.sorbet.is_sorted = sorted; - self - } } /// ## Accessors @@ -101,12 +94,6 @@ impl ChunkSchema { self.sorbet.heap_size_bytes } - /// Are we sorted by the row id column? - #[inline] - pub fn is_sorted(&self) -> bool { - self.sorbet.is_sorted - } - /// Total number of columns in this chunk, /// including the row id column, the index columns, /// and the data columns. diff --git a/crates/store/re_sorbet/src/index_column_descriptor.rs b/crates/store/re_sorbet/src/index_column_descriptor.rs index 1a39ea0f4387..ddabc813ec63 100644 --- a/crates/store/re_sorbet/src/index_column_descriptor.rs +++ b/crates/store/re_sorbet/src/index_column_descriptor.rs @@ -20,6 +20,8 @@ pub struct IndexColumnDescriptor { pub datatype: ArrowDatatype, /// Are the indices in this column sorted? + /// + /// `false` means either "unsorted" or "unknown". pub is_sorted: bool, } diff --git a/crates/store/re_sorbet/src/row_id_column_descriptor.rs b/crates/store/re_sorbet/src/row_id_column_descriptor.rs index d772d03b87fe..4ec3fd3cd776 100644 --- a/crates/store/re_sorbet/src/row_id_column_descriptor.rs +++ b/crates/store/re_sorbet/src/row_id_column_descriptor.rs @@ -1,6 +1,8 @@ use arrow::datatypes::{DataType as ArrowDatatype, Field as ArrowField}; use re_types_core::{Component as _, Loggable as _, RowId}; +use crate::MetadataExt as _; + #[derive(thiserror::Error, Debug)] #[error("Wrong datatype. Expected {expected:?}, got {actual:?}")] pub struct WrongDatatypeError { @@ -25,30 +27,34 @@ impl WrongDatatypeError { } /// Describes the schema of the primary [`RowId`] column. -#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct RowIdColumnDescriptor {} +#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct RowIdColumnDescriptor { + /// Are the values in this column sorted? + /// + /// `false` means either "unsorted" or "unknown". + pub is_sorted: bool, +} impl RowIdColumnDescriptor { #[inline] - pub fn new() -> Self { - Self {} + pub fn from_sorted(is_sorted: bool) -> Self { + Self { is_sorted } } #[inline] pub fn to_arrow_field(&self) -> ArrowField { - let Self {} = self; + let Self { is_sorted } = self; - let metadata = [ - Some(("rerun.kind".to_owned(), "control".to_owned())), - // This ensures the RowId/Tuid is formatted correctly: - Some(( + let mut metadata = std::collections::HashMap::from([ + ("rerun.kind".to_owned(), "control".to_owned()), + ( "ARROW:extension:name".to_owned(), re_tuid::Tuid::ARROW_EXTENSION_NAME.to_owned(), - )), - ] - .into_iter() - .flatten() - .collect(); + ), + ]); + if *is_sorted { + metadata.insert("rerun.is_sorted".to_owned(), "true".to_owned()); + } let nullable = false; // All rows has an id ArrowField::new( @@ -69,24 +75,28 @@ impl TryFrom<&ArrowField> for RowIdColumnDescriptor { type Error = WrongDatatypeError; fn try_from(field: &ArrowField) -> Result { - Self::try_from(field.data_type()) + // Self::try_from(field.data_type()) + WrongDatatypeError::compare_expected_actual(&RowId::arrow_datatype(), field.data_type())?; + Ok(Self { + is_sorted: field.metadata().get_bool("rerun.is_sorted"), + }) } } -impl TryFrom<&ArrowDatatype> for RowIdColumnDescriptor { - type Error = WrongDatatypeError; +// impl TryFrom<&ArrowDatatype> for RowIdColumnDescriptor { +// type Error = WrongDatatypeError; - fn try_from(data_type: &ArrowDatatype) -> Result { - WrongDatatypeError::compare_expected_actual(&RowId::arrow_datatype(), data_type)?; - Ok(Self {}) - } -} +// fn try_from(data_type: &ArrowDatatype) -> Result { +// WrongDatatypeError::compare_expected_actual(&RowId::arrow_datatype(), data_type)?; +// Ok(Self {}) +// } +// } -impl TryFrom for RowIdColumnDescriptor { - type Error = WrongDatatypeError; +// impl TryFrom for RowIdColumnDescriptor { +// type Error = WrongDatatypeError; - fn try_from(data_type: ArrowDatatype) -> Result { - WrongDatatypeError::compare_expected_actual(&RowId::arrow_datatype(), &data_type)?; - Ok(Self {}) - } -} +// fn try_from(data_type: ArrowDatatype) -> Result { +// WrongDatatypeError::compare_expected_actual(&RowId::arrow_datatype(), &data_type)?; +// Ok(Self {}) +// } +// } diff --git a/crates/store/re_sorbet/src/sorbet_batch.rs b/crates/store/re_sorbet/src/sorbet_batch.rs index dba328b52fa8..9c8f7924a623 100644 --- a/crates/store/re_sorbet/src/sorbet_batch.rs +++ b/crates/store/re_sorbet/src/sorbet_batch.rs @@ -54,12 +54,6 @@ impl SorbetBatch { self.schema.heap_size_bytes } - /// Are we sorted by the row id column? - #[inline] - pub fn is_sorted(&self) -> bool { - self.schema.is_sorted - } - #[inline] pub fn fields(&self) -> &ArrowFields { &self.schema_ref().fields diff --git a/crates/store/re_sorbet/src/sorbet_schema.rs b/crates/store/re_sorbet/src/sorbet_schema.rs index 6d0bc47ff333..099960e2819e 100644 --- a/crates/store/re_sorbet/src/sorbet_schema.rs +++ b/crates/store/re_sorbet/src/sorbet_schema.rs @@ -3,7 +3,7 @@ use arrow::datatypes::Schema as ArrowSchema; use re_log_types::EntityPath; use re_types_core::ChunkId; -use crate::{ArrowBatchMetadata, MetadataExt as _, SorbetColumnDescriptors, SorbetError}; +use crate::{ArrowBatchMetadata, SorbetColumnDescriptors, SorbetError}; // ---------------------------------------------------------------------------- @@ -24,9 +24,6 @@ pub struct SorbetSchema { /// The heap size of this batch in bytes, if known. pub heap_size_bytes: Option, - - /// Are we sorted by the row id column? - pub is_sorted: bool, // TODO(emilk): move to `RowIdColumnDescriptor`. } /// ## Metadata keys for the record batch metadata @@ -45,12 +42,6 @@ impl SorbetSchema { self } - #[inline] - pub fn with_sorted(mut self, sorted_by_row_id: bool) -> Self { - self.is_sorted = sorted_by_row_id; - self - } - pub fn chunk_id_metadata(chunk_id: &ChunkId) -> (String, String) { ("rerun.id".to_owned(), format!("{:X}", chunk_id.as_u128())) } @@ -65,7 +56,6 @@ impl SorbetSchema { chunk_id, entity_path, heap_size_bytes, - is_sorted, } = self; [ @@ -81,7 +71,6 @@ impl SorbetSchema { heap_size_bytes.to_string(), ) }), - is_sorted.then(|| ("rerun.is_sorted".to_owned(), "true".to_owned())), ] .into_iter() .flatten() @@ -127,7 +116,6 @@ impl TryFrom<&ArrowSchema> for SorbetSchema { None }; - let sorted_by_row_id = metadata.get_bool("rerun.is_sorted"); let heap_size_bytes = if let Some(heap_size_bytes) = metadata.get("rerun.heap_size_bytes") { heap_size_bytes .parse() @@ -156,7 +144,6 @@ impl TryFrom<&ArrowSchema> for SorbetSchema { chunk_id, entity_path, heap_size_bytes, - is_sorted: sorted_by_row_id, }) } }