diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs
index 34b99c8d20e1..983a09378b18 100644
--- a/crates/store/re_chunk/src/chunk.rs
+++ b/crates/store/re_chunk/src/chunk.rs
@@ -56,7 +56,7 @@ pub enum ChunkError {
     MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),
 
     #[error(transparent)]
-    InvalidChunkSchema(#[from] re_sorbet::InvalidChunkSchema),
+    InvalidChunkSchema(#[from] re_sorbet::InvalidSorbetSchema),
 }
 
 pub type ChunkResult<T> = Result<T, ChunkError>;
diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs
index 38511945d746..b2a2991ef762 100644
--- a/crates/store/re_grpc_client/src/lib.rs
+++ b/crates/store/re_grpc_client/src/lib.rs
@@ -57,7 +57,7 @@ pub enum StreamError {
     InvalidUri(String),
 
     #[error(transparent)]
-    InvalidChunkSchema(#[from] re_sorbet::InvalidChunkSchema),
+    InvalidChunkSchema(#[from] re_sorbet::InvalidSorbetSchema),
 }
 
 #[cfg(target_arch = "wasm32")]
diff --git a/crates/store/re_grpc_client/src/redap/mod.rs b/crates/store/re_grpc_client/src/redap/mod.rs
index 3d8a9b8e23e9..66dfc8fb9628 100644
--- a/crates/store/re_grpc_client/src/redap/mod.rs
+++ b/crates/store/re_grpc_client/src/redap/mod.rs
@@ -335,8 +335,8 @@ async fn stream_catalog_async(
         let mut metadata = record_batch.schema_ref().metadata.clone();
 
         for (key, value) in [
-            re_sorbet::ChunkSchema::chunk_id_metadata(&ChunkId::new()),
-            re_sorbet::ChunkSchema::entity_path_metadata(&entity_path),
+            re_sorbet::SorbetSchema::chunk_id_metadata(&ChunkId::new()),
+            re_sorbet::SorbetSchema::entity_path_metadata(&entity_path),
         ] {
             metadata.entry(key).or_insert(value);
         }
diff --git a/crates/store/re_sorbet/src/chunk_batch.rs b/crates/store/re_sorbet/src/chunk_batch.rs
index fc7398d97f38..37bd6040d03b 100644
--- a/crates/store/re_sorbet/src/chunk_batch.rs
+++ b/crates/store/re_sorbet/src/chunk_batch.rs
@@ -11,8 +11,8 @@ use re_log_types::EntityPath;
 use re_types_core::ChunkId;
 
 use crate::{
-    chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema, ComponentColumnDescriptor,
-    IndexColumnDescriptor, RowIdColumnDescriptor, WrongDatatypeError,
+    ArrowBatchMetadata, ChunkSchema, ComponentColumnDescriptor, IndexColumnDescriptor,
+    InvalidSorbetSchema, RowIdColumnDescriptor, WrongDatatypeError,
 };
 
 #[derive(thiserror::Error, Debug)]
@@ -52,18 +52,18 @@ impl ChunkBatch {
         let row_count = row_ids.len();
 
         WrongDatatypeError::compare_expected_actual(
-            &schema.row_id_column.datatype(),
+            &schema.row_id_column().datatype(),
             row_ids.data_type(),
         )?;
 
-        if index_arrays.len() != schema.index_columns.len() {
+        if index_arrays.len() != schema.index_columns().len() {
             return Err(MismatchedChunkSchemaError::custom(format!(
                 "Schema had {} index columns, but got {}",
-                schema.index_columns.len(),
+                schema.index_columns().len(),
                 index_arrays.len()
             )));
         }
-        for (schema, array) in itertools::izip!(&schema.index_columns, &index_arrays) {
+        for (schema, array) in itertools::izip!(schema.index_columns(), &index_arrays) {
             WrongDatatypeError::compare_expected_actual(schema.datatype(), array.data_type())?;
             if array.len() != row_count {
                 return Err(MismatchedChunkSchemaError::custom(format!(
@@ -75,14 +75,14 @@ impl ChunkBatch {
             }
         }
 
-        if data_arrays.len() != schema.data_columns.len() {
+        if data_arrays.len() != schema.data_columns().len() {
             return Err(MismatchedChunkSchemaError::custom(format!(
                 "Schema had {} data columns, but got {}",
-                schema.data_columns.len(),
+                schema.data_columns().len(),
                 data_arrays.len()
             )));
         }
-        for (schema, array) in itertools::izip!(&schema.data_columns, &data_arrays) {
+        for (schema, array) in itertools::izip!(schema.data_columns(), &data_arrays) {
             WrongDatatypeError::compare_expected_actual(&schema.store_datatype, array.data_type())?;
             if array.len() != row_count {
                 return Err(MismatchedChunkSchemaError::custom(format!(
@@ -155,7 +155,7 @@ impl ChunkBatch {
     pub fn row_id_column(&self) -> (&RowIdColumnDescriptor, &ArrowStructArray) {
         // The first column is always the row IDs.
         (
-            &self.schema.row_id_column,
+            self.schema.row_id_column(),
             self.batch.columns()[0]
                 .as_struct_opt()
                 .expect("Row IDs should be encoded as struct"),
@@ -165,7 +165,7 @@ impl ChunkBatch {
     /// The columns of the indices (timelines).
     pub fn index_columns(&self) -> impl Iterator<Item = (&IndexColumnDescriptor, &ArrowArrayRef)> {
         itertools::izip!(
-            &self.schema.index_columns,
+            self.schema.index_columns(),
             self.batch.columns().iter().skip(1) // skip row IDs
         )
     }
@@ -175,11 +175,11 @@ impl ChunkBatch {
         &self,
     ) -> impl Iterator<Item = (&ComponentColumnDescriptor, &ArrowArrayRef)> {
         itertools::izip!(
-            &self.schema.data_columns,
+            self.schema.data_columns(),
             self.batch
                 .columns()
                 .iter()
-                .skip(1 + self.schema.index_columns.len()) // skip row IDs and indices
+                .skip(1 + self.schema.index_columns().len()) // skip row IDs and indices
         )
     }
 }
@@ -222,7 +222,7 @@ impl From<&ChunkBatch> for ArrowRecordBatch {
 }
 
 impl TryFrom<&ArrowRecordBatch> for ChunkBatch {
-    type Error = InvalidChunkSchema;
+    type Error = InvalidSorbetSchema;
 
     /// Will automatically wrap data columns in `ListArrays` if they are not already.
     fn try_from(batch: &ArrowRecordBatch) -> Result<Self, Self::Error> {
diff --git a/crates/store/re_sorbet/src/chunk_schema.rs b/crates/store/re_sorbet/src/chunk_schema.rs
index d52d4bcec2ff..fb4517e269a1 100644
--- a/crates/store/re_sorbet/src/chunk_schema.rs
+++ b/crates/store/re_sorbet/src/chunk_schema.rs
@@ -1,79 +1,26 @@
 use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema};
-use itertools::Itertools as _;
 
 use re_log_types::EntityPath;
 use re_types_core::ChunkId;
 
 use crate::{
-    ArrowBatchMetadata, ColumnDescriptor, ColumnError, ComponentColumnDescriptor,
-    IndexColumnDescriptor, MetadataExt as _, MissingMetadataKey, RowIdColumnDescriptor,
-    WrongDatatypeError,
+    ArrowBatchMetadata, ComponentColumnDescriptor, IndexColumnDescriptor, InvalidSorbetSchema,
+    RowIdColumnDescriptor, SorbetColumnDescriptors, SorbetSchema,
 };
 
-#[derive(thiserror::Error, Debug)]
-pub enum InvalidChunkSchema {
-    #[error(transparent)]
-    MissingMetadataKey(#[from] MissingMetadataKey),
-
-    #[error("Bad RowId columns: {0}")]
-    BadRowIdColumn(WrongDatatypeError),
-
-    #[error("Bad column '{field_name}': {error}")]
-    BadColumn {
-        field_name: String,
-        error: ColumnError,
-    },
-
-    #[error("Bad chunk schema: {reason}")]
-    Custom { reason: String },
-
-    #[error("The data columns were not the last columns. Index columns must come before any data columns.")]
-    UnorderedIndexAndDataColumns,
-}
-
-impl InvalidChunkSchema {
-    fn custom(reason: impl Into<String>) -> Self {
-        Self::Custom {
-            reason: reason.into(),
-        }
-    }
-}
-
 /// The parsed schema of a Rerun chunk, i.e. multiple columns of data for a single entity.
 ///
 /// This does NOT preserve custom arrow metadata.
 /// It only contains the metadata used by Rerun.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct ChunkSchema {
-    /// The globally unique ID of this chunk.
-    pub chunk_id: ChunkId,
+    sorbet: SorbetSchema,
 
-    /// Which entity is this chunk for?
+    // Some things here are also in [`SorbetSchema`], but are duplicated
+    // here because they are non-optional:
+    pub row_id: RowIdColumnDescriptor,
+    pub chunk_id: ChunkId,
     pub entity_path: EntityPath,
-
-    /// The heap size of this chunk in bytes, if known.
-    pub heap_size_bytes: Option<u64>,
-
-    /// Are we sorted by the row id column?
-    pub is_sorted: bool,
-
-    /// The primary row id column.
-    pub row_id_column: RowIdColumnDescriptor,
-
-    /// Index columns (timelines).
-    pub index_columns: Vec<IndexColumnDescriptor>,
-
-    /// The actual component data
-    pub data_columns: Vec<ComponentColumnDescriptor>,
-}
-
-/// ## Metadata keys for the record batch metadata
-impl ChunkSchema {
-    /// The key used to identify the version of the Rerun schema.
-    const CHUNK_METADATA_KEY_VERSION: &'static str = "rerun.version";
-
-    /// The version of the Rerun schema.
-    const CHUNK_METADATA_VERSION: &'static str = "1";
 }
 
 /// ## Builders
@@ -81,30 +28,37 @@ impl ChunkSchema {
     pub fn new(
         chunk_id: ChunkId,
         entity_path: EntityPath,
-        row_id_column: RowIdColumnDescriptor,
-        index_columns: Vec<IndexColumnDescriptor>,
-        data_columns: Vec<ComponentColumnDescriptor>,
+        row_id: RowIdColumnDescriptor,
+        indices: Vec<IndexColumnDescriptor>,
+        components: Vec<ComponentColumnDescriptor>,
     ) -> Self {
         Self {
+            sorbet: SorbetSchema {
+                columns: SorbetColumnDescriptors {
+                    row_id: Some(row_id.clone()),
+                    indices,
+                    components,
+                },
+                chunk_id: Some(chunk_id),
+                entity_path: Some(entity_path.clone()),
+                heap_size_bytes: None,
+                is_sorted: false, // assume the worst
+            },
+            row_id,
             chunk_id,
             entity_path,
-            heap_size_bytes: None,
-            is_sorted: false, // assume the worst
-            row_id_column,
-            index_columns,
-            data_columns,
         }
     }
 
     #[inline]
     pub fn with_heap_size_bytes(mut self, heap_size_bytes: u64) -> Self {
-        self.heap_size_bytes = Some(heap_size_bytes);
+        self.sorbet.heap_size_bytes = Some(heap_size_bytes);
         self
     }
 
     #[inline]
     pub fn with_sorted(mut self, sorted: bool) -> Self {
-        self.is_sorted = sorted;
+        self.sorbet.is_sorted = sorted;
         self
     }
 }
@@ -126,78 +80,43 @@ impl ChunkSchema {
     /// The heap size of this chunk in bytes, if known.
     #[inline]
     pub fn heap_size_bytes(&self) -> Option<u64> {
-        self.heap_size_bytes
+        self.sorbet.heap_size_bytes
     }
 
     /// Are we sorted by the row id column?
     #[inline]
     pub fn is_sorted(&self) -> bool {
-        self.is_sorted
+        self.sorbet.is_sorted
     }
 
     /// Total number of columns in this chunk,
     /// including the row id column, the index columns,
     /// and the data columns.
     pub fn num_columns(&self) -> usize {
-        1 + self.index_columns.len() + self.data_columns.len()
+        self.sorbet.columns.num_columns()
     }
 
-    pub fn chunk_id_metadata(chunk_id: &ChunkId) -> (String, String) {
-        ("rerun.id".to_owned(), format!("{:X}", chunk_id.as_u128()))
+    #[inline]
+    pub fn row_id_column(&self) -> &RowIdColumnDescriptor {
+        &self.row_id
     }
 
-    pub fn entity_path_metadata(entity_path: &EntityPath) -> (String, String) {
-        ("rerun.entity_path".to_owned(), entity_path.to_string())
+    #[inline]
+    pub fn index_columns(&self) -> &[IndexColumnDescriptor] {
+        &self.sorbet.columns.indices
     }
 
-    pub fn arrow_batch_metadata(&self) -> ArrowBatchMetadata {
-        let Self {
-            chunk_id,
-            entity_path,
-            heap_size_bytes,
-            is_sorted,
-            row_id_column: _,
-            index_columns: _,
-            data_columns: _,
-        } = self;
-
-        let mut arrow_metadata = ArrowBatchMetadata::from([
-            (
-                Self::CHUNK_METADATA_KEY_VERSION.to_owned(),
-                Self::CHUNK_METADATA_VERSION.to_owned(),
-            ),
-            Self::chunk_id_metadata(chunk_id),
-            Self::entity_path_metadata(entity_path),
-        ]);
-        if let Some(heap_size_bytes) = heap_size_bytes {
-            arrow_metadata.insert(
-                "rerun.heap_size_bytes".to_owned(),
-                heap_size_bytes.to_string(),
-            );
-        }
-        if *is_sorted {
-            arrow_metadata.insert("rerun.is_sorted".to_owned(), "true".to_owned());
-        }
+    #[inline]
+    pub fn data_columns(&self) -> &[ComponentColumnDescriptor] {
+        &self.sorbet.columns.components
+    }
 
-        arrow_metadata
+    pub fn arrow_batch_metadata(&self) -> ArrowBatchMetadata {
+        self.sorbet.arrow_batch_metadata()
     }
 
     pub fn arrow_fields(&self) -> Vec<ArrowField> {
-        let Self {
-            row_id_column,
-            index_columns,
-            data_columns,
-            ..
-        } = self;
-        let mut fields: Vec<ArrowField> = Vec::with_capacity(self.num_columns());
-        fields.push(row_id_column.to_arrow_field());
-        fields.extend(index_columns.iter().map(|column| column.to_arrow_field()));
-        fields.extend(
-            data_columns
-                .iter()
-                .map(|column| column.to_arrow_field(crate::BatchType::Chunk)),
-        );
-        fields
+        self.sorbet.columns.arrow_fields()
     }
 }
@@ -211,98 +130,26 @@ impl From<&ChunkSchema> for ArrowSchema {
 }
 
 impl TryFrom<&ArrowSchema> for ChunkSchema {
-    type Error = InvalidChunkSchema;
+    type Error = InvalidSorbetSchema;
 
     fn try_from(arrow_schema: &ArrowSchema) -> Result<Self, Self::Error> {
-        let ArrowSchema { metadata, fields } = arrow_schema;
-
-        let chunk_id = {
-            let chunk_id_str = metadata.get_or_err("rerun.id")?;
-            chunk_id_str.parse().map_err(|err| {
-                InvalidChunkSchema::custom(format!(
-                    "Failed to deserialize chunk id {chunk_id_str:?}: {err}"
-                ))
-            })?
-        };
-
-        let entity_path = EntityPath::parse_forgiving(metadata.get_or_err("rerun.entity_path")?);
-        let is_sorted = metadata.get_bool("rerun.is_sorted");
-        let heap_size_bytes = if let Some(heap_size_bytes) = metadata.get("rerun.heap_size_bytes") {
-            heap_size_bytes
-                .parse()
-                .map_err(|err| {
-                    re_log::warn_once!(
-                        "Failed to parse heap_size_bytes {heap_size_bytes:?} in chunk: {err}"
-                    );
-                })
-                .ok()
-        } else {
-            None
-        };
-
-        // Verify version
-        if let Some(batch_version) = metadata.get(Self::CHUNK_METADATA_KEY_VERSION) {
-            if batch_version != Self::CHUNK_METADATA_VERSION {
-                re_log::warn_once!(
Expected {:?}, got {batch_version:?}", - Self::CHUNK_METADATA_VERSION - ); - } - } - - // The first field must be the row id column: - let Some(first_field) = fields.first() else { - return Err(InvalidChunkSchema::custom("No fields in schema")); - }; - - let row_id_column = RowIdColumnDescriptor::try_from(first_field.as_ref()) - .map_err(InvalidChunkSchema::BadRowIdColumn)?; - - let index_and_data_columns: Result, _> = fields - .iter() - .skip(1) - .map(|field| { - ColumnDescriptor::try_from_arrow_field(Some(&entity_path), field.as_ref()).map_err( - |err| InvalidChunkSchema::BadColumn { - field_name: field.name().to_owned(), - error: err, - }, - ) - }) - .collect(); - let index_and_data_columns = index_and_data_columns?; - - // Index columns should always come first: - let num_index_columns = - index_and_data_columns.partition_point(|p| matches!(p, ColumnDescriptor::Time(_))); - - let index_columns = index_and_data_columns[0..num_index_columns] - .iter() - .filter_map(|c| match c { - ColumnDescriptor::Time(column) => Some(column.clone()), - ColumnDescriptor::Component(_) => None, - }) - .collect_vec(); - let data_columns = index_and_data_columns[num_index_columns..] - .iter() - .filter_map(|c| match c { - ColumnDescriptor::Time(_) => None, - ColumnDescriptor::Component(column) => Some(column.clone()), - }) - .collect_vec(); - - if index_columns.len() + data_columns.len() != index_and_data_columns.len() { - return Err(InvalidChunkSchema::UnorderedIndexAndDataColumns); - } + let sorbet_schema = SorbetSchema::try_from(arrow_schema)?; Ok(Self { - chunk_id, - entity_path, - heap_size_bytes, - is_sorted, - row_id_column, - index_columns, - data_columns, + row_id: sorbet_schema + .columns + .row_id + .clone() + .ok_or_else(|| InvalidSorbetSchema::custom("Missing row_id column"))?, + chunk_id: sorbet_schema + .chunk_id + .ok_or_else(|| InvalidSorbetSchema::custom("Missing chunk_id"))?, + entity_path: sorbet_schema + .entity_path + .clone() + .ok_or_else(|| InvalidSorbetSchema::custom("Missing entity_path"))?, + + sorbet: sorbet_schema, }) } } diff --git a/crates/store/re_sorbet/src/component_column_descriptor.rs b/crates/store/re_sorbet/src/component_column_descriptor.rs index 5c7120d50a8d..ee7b9364bc76 100644 --- a/crates/store/re_sorbet/src/component_column_descriptor.rs +++ b/crates/store/re_sorbet/src/component_column_descriptor.rs @@ -20,12 +20,20 @@ pub struct ComponentColumnDescriptor { /// we introduce mono-type optimization, this might be a native type instead. pub store_datatype: ArrowDatatype, + /// Semantic name associated with this data. + /// + /// This is fully implied by `archetype_name` and `archetype_field`, but + /// included for semantic convenience. + /// + /// Example: `rerun.components.Position3D`. + pub component_name: ComponentName, + /// The path of the entity. /// /// If this column is part of a chunk batch, /// this is the same for all columns in the batch, /// and will also be set in the schema for the whole chunk. - pub entity_path: EntityPath, + pub entity_path: EntityPath, // TODO(#8744): make optional for sorbet batches /// Optional name of the `Archetype` associated with this data. /// @@ -41,14 +49,6 @@ pub struct ComponentColumnDescriptor { /// Example: `positions`. pub archetype_field_name: Option, - /// Semantic name associated with this data. - /// - /// This is fully implied by `archetype_name` and `archetype_field`, but - /// included for semantic convenience. - /// - /// Example: `rerun.components.Position3D`. 
-    pub component_name: ComponentName,
-
     /// Whether this column represents static data.
     pub is_static: bool,
 
diff --git a/crates/store/re_sorbet/src/lib.rs b/crates/store/re_sorbet/src/lib.rs
index 12c2b12cb92a..d834821b351e 100644
--- a/crates/store/re_sorbet/src/lib.rs
+++ b/crates/store/re_sorbet/src/lib.rs
@@ -2,10 +2,36 @@
 //!
 //! Handles the structure of arrow record batches and their meta data for different use cases for Rerun.
 //!
-//! An arrow record batch needs to follow a specific schema to be compatible with Rerun,
+//! An arrow record batch that follows a specific schema is called a `SorbetBatch`.
+//!
+//! Some `SorbetBatch`es have even more constrained requirements, such as `ChunkBatch` and `DataframeBatch`.
+//! * Every `ChunkBatch` is a `SorbetBatch`.
+//! * Every `DataframeBatch` is a `SorbetBatch`.
+//!
+//!
 //! and that schema is defined in [`ChunkSchema`].
 //! If a record batch matches the schema, it can be converted to a [`ChunkBatch`].
 
+/*
+Sorbet record batch phylogenetic tree.
+* Every ChunkBatch is a SorbetBatch.
+* Every DataframeBatch is a SorbetBatch.
+
+SorbetBatch superset:
+    Optional RowId
+    0-N IndexColumns
+    0-N ComponentColumns
+
+    ChunkBatch specialization:
+        Always has row ids
+        Always is of a single entity
+        Each `ComponentColumns` is a `ListArray`
+
+    DataframeBatch specialization:
+        Always has row ids
+        Each component column may have a different entity
+*/
+
 mod chunk_batch;
 mod chunk_schema;
 mod column_descriptor;
@@ -14,10 +40,11 @@ mod index_column_descriptor;
 mod ipc;
 mod metadata;
 mod row_id_column_descriptor;
+mod sorbet_schema;
 
 pub use self::{
     chunk_batch::{ChunkBatch, MismatchedChunkSchemaError},
-    chunk_schema::{ChunkSchema, InvalidChunkSchema},
+    chunk_schema::ChunkSchema,
     column_descriptor::{ColumnDescriptor, ColumnError},
     component_column_descriptor::ComponentColumnDescriptor,
     index_column_descriptor::{IndexColumnDescriptor, UnsupportedTimeType},
@@ -27,6 +54,7 @@ pub use self::{
         MissingMetadataKey,
     },
     row_id_column_descriptor::{RowIdColumnDescriptor, WrongDatatypeError},
+    sorbet_schema::{ColumnKind, InvalidSorbetSchema, SorbetColumnDescriptors, SorbetSchema},
 };
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
diff --git a/crates/store/re_sorbet/src/sorbet_schema.rs b/crates/store/re_sorbet/src/sorbet_schema.rs
new file mode 100644
index 000000000000..3deaabc7d8f3
--- /dev/null
+++ b/crates/store/re_sorbet/src/sorbet_schema.rs
@@ -0,0 +1,313 @@
+use arrow::datatypes::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
+
+use re_log_types::EntityPath;
+use re_types_core::ChunkId;
+
+use crate::{
+    ArrowBatchMetadata, ColumnError, ComponentColumnDescriptor, IndexColumnDescriptor,
+    MetadataExt as _, RowIdColumnDescriptor,
+};
+
+#[derive(thiserror::Error, Debug)]
+pub enum InvalidSorbetSchema {
+    #[error(transparent)]
+    MissingMetadataKey(#[from] crate::MissingMetadataKey),
+
+    #[error(transparent)]
+    MissingFieldMetadata(#[from] crate::MissingFieldMetadata),
+
+    #[error(transparent)]
+    UnsupportedTimeType(#[from] crate::UnsupportedTimeType),
+
+    #[error(transparent)]
+    WrongDatatypeError(#[from] crate::WrongDatatypeError),
+
+    #[error("Bad column '{field_name}': {error}")]
+    BadColumn {
+        field_name: String,
+        error: ColumnError,
+    },
+
+    #[error("Bad chunk schema: {reason}")]
+    Custom { reason: String },
+
+    #[error("The data columns were not the last columns. Index columns must come before any data columns.")]
+    UnorderedIndexAndDataColumns,
+}
+
+impl InvalidSorbetSchema {
+    pub fn custom(reason: impl Into<String>) -> Self {
+        Self::Custom {
+            reason: reason.into(),
+        }
+    }
+}
+
+// ----------------------------------------------------------------------------
+
+pub enum ColumnKind {
+    RowId,
+    Index,
+    Component,
+}
+
+impl TryFrom<&ArrowField> for ColumnKind {
+    type Error = InvalidSorbetSchema;
+
+    fn try_from(fields: &ArrowField) -> Result<Self, Self::Error> {
+        let kind = fields.get_or_err("rerun.kind")?;
+        match kind {
+            "control" | "row_id" => Ok(Self::RowId),
+            "index" | "time" => Ok(Self::Index),
+            "component" | "data" => Ok(Self::Component),
+
+            _ => Err(InvalidSorbetSchema::custom(format!(
+                "Unknown column kind: {kind}"
+            ))),
+        }
+    }
+}
+
+// ----------------------------------------------------------------------------
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SorbetColumnDescriptors {
+    /// The primary row id column.
+    /// If present, it is always the first column.
+    pub row_id: Option<RowIdColumnDescriptor>,
+
+    /// Index columns (timelines).
+    pub indices: Vec<IndexColumnDescriptor>,
+
+    /// The actual component data
+    pub components: Vec<ComponentColumnDescriptor>,
+}
+
+impl SorbetColumnDescriptors {
+    /// Total number of columns in this chunk,
+    /// including the row id column, the index columns,
+    /// and the data columns.
+    pub fn num_columns(&self) -> usize {
+        let Self {
+            row_id,
+            indices,
+            components,
+        } = self;
+        row_id.is_some() as usize + indices.len() + components.len()
+    }
+
+    pub fn arrow_fields(&self) -> Vec<ArrowField> {
+        let Self {
+            row_id,
+            indices,
+            components,
+        } = self;
+        let mut fields: Vec<ArrowField> = Vec::with_capacity(self.num_columns());
+        if let Some(row_id) = row_id {
+            fields.push(row_id.to_arrow_field());
+        }
+        fields.extend(indices.iter().map(|column| column.to_arrow_field()));
+        fields.extend(
+            components
+                .iter()
+                .map(|column| column.to_arrow_field(crate::BatchType::Chunk)),
+        );
+        fields
+    }
+}
+
+impl SorbetColumnDescriptors {
+    fn try_from_arrow_fields(
+        chunk_entity_path: Option<&EntityPath>,
+        fields: &ArrowFields,
+    ) -> Result<Self, InvalidSorbetSchema> {
+        let mut row_ids = Vec::new();
+        let mut indices = Vec::new();
+        let mut components = Vec::new();
+
+        for field in fields {
+            let field = field.as_ref();
+            let column_kind = ColumnKind::try_from(field)?;
+            match column_kind {
+                ColumnKind::RowId => {
+                    if indices.is_empty() && components.is_empty() {
+                        row_ids.push(RowIdColumnDescriptor::try_from(field)?);
+                    } else {
+                        return Err(InvalidSorbetSchema::custom(
+                            "RowId column must be the first column",
+                        ));
+                    }
+                }
+
+                ColumnKind::Index => {
+                    if components.is_empty() {
+                        indices.push(IndexColumnDescriptor::try_from(field)?);
+                    } else {
+                        return Err(InvalidSorbetSchema::custom(
+                            "Index columns must come before any data columns",
+                        ));
+                    }
+                }
+
+                ColumnKind::Component => {
+                    components.push(ComponentColumnDescriptor::try_from_arrow_field(
+                        chunk_entity_path,
+                        field,
+                    )?);
+                }
+            }
+        }
+
+        if row_ids.len() > 1 {
+            return Err(InvalidSorbetSchema::custom(
+                "Multiple row_id columns are not supported",
+            ));
+        }
+
+        Ok(Self {
+            row_id: row_ids.pop(),
+            indices,
+            components,
+        })
+    }
+}
+
+// ----------------------------------------------------------------------------
+
+/// The parsed schema of a `SorbetBatch`.
+///
+/// This does NOT contain custom arrow metadata.
+/// It only contains the metadata used by Rerun.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SorbetSchema {
+    pub columns: SorbetColumnDescriptors,
+
+    /// The globally unique ID of this chunk,
+    /// if this is a chunk.
+    pub chunk_id: Option<ChunkId>,
+
+    /// Which entity is this chunk for?
+    pub entity_path: Option<EntityPath>,
+
+    /// The heap size of this batch in bytes, if known.
+    pub heap_size_bytes: Option<u64>,
+
+    /// Are we sorted by the row id column?
+    pub is_sorted: bool, // TODO(emilk): move to `RowIdColumnDescriptor`.
+}
+
+/// ## Metadata keys for the record batch metadata
+impl SorbetSchema {
+    /// The key used to identify the version of the Rerun schema.
+    const METADATA_KEY_VERSION: &'static str = "rerun.version";
+
+    /// The version of the Rerun schema.
+    const METADATA_VERSION: &'static str = "1";
+}
+
+impl SorbetSchema {
+    #[inline]
+    pub fn with_heap_size_bytes(mut self, heap_size_bytes: u64) -> Self {
+        self.heap_size_bytes = Some(heap_size_bytes);
+        self
+    }
+
+    #[inline]
+    pub fn with_sorted(mut self, sorted_by_row_id: bool) -> Self {
+        self.is_sorted = sorted_by_row_id;
+        self
+    }
+
+    pub fn chunk_id_metadata(chunk_id: &ChunkId) -> (String, String) {
+        ("rerun.id".to_owned(), format!("{:X}", chunk_id.as_u128()))
+    }
+
+    pub fn entity_path_metadata(entity_path: &EntityPath) -> (String, String) {
+        ("rerun.entity_path".to_owned(), entity_path.to_string())
+    }
+
+    pub fn arrow_batch_metadata(&self) -> ArrowBatchMetadata {
+        let Self {
+            columns: _,
+            chunk_id,
+            entity_path,
+            heap_size_bytes,
+            is_sorted,
+        } = self;
+
+        [
+            Some((
+                Self::METADATA_KEY_VERSION.to_owned(),
+                Self::METADATA_VERSION.to_owned(),
+            )),
+            chunk_id.as_ref().map(Self::chunk_id_metadata),
+            entity_path.as_ref().map(Self::entity_path_metadata),
+            heap_size_bytes.as_ref().map(|heap_size_bytes| {
+                (
+                    "rerun.heap_size_bytes".to_owned(),
+                    heap_size_bytes.to_string(),
+                )
+            }),
+            is_sorted.then(|| ("rerun.is_sorted".to_owned(), "true".to_owned())),
+        ]
+        .into_iter()
+        .flatten()
+        .collect()
+    }
+}
+
+impl TryFrom<&ArrowSchema> for SorbetSchema {
+    type Error = InvalidSorbetSchema;
+
+    fn try_from(arrow_schema: &ArrowSchema) -> Result<Self, Self::Error> {
+        let ArrowSchema { metadata, fields } = arrow_schema;
+
+        let entity_path = metadata
+            .get("rerun.entity_path")
+            .map(|s| EntityPath::parse_forgiving(s));
+
+        let columns = SorbetColumnDescriptors::try_from_arrow_fields(entity_path.as_ref(), fields)?;
+
+        let chunk_id = if let Some(chunk_id_str) = metadata.get("rerun.id") {
+            Some(chunk_id_str.parse().map_err(|err| {
+                InvalidSorbetSchema::custom(format!(
+                    "Failed to deserialize chunk id {chunk_id_str:?}: {err}"
+                ))
+            })?)
+        } else {
+            None
+        };
+
+        let sorted_by_row_id = metadata.get_bool("rerun.is_sorted");
+        let heap_size_bytes = if let Some(heap_size_bytes) = metadata.get("rerun.heap_size_bytes") {
+            heap_size_bytes
+                .parse()
+                .map_err(|err| {
+                    re_log::warn_once!(
+                        "Failed to parse heap_size_bytes {heap_size_bytes:?} in chunk: {err}"
+                    );
+                })
+                .ok()
+        } else {
+            None
+        };
+
+        // Verify version
+        if let Some(batch_version) = metadata.get(Self::METADATA_KEY_VERSION) {
+            if batch_version != Self::METADATA_VERSION {
+                re_log::warn_once!(
Expected {:?}, got {batch_version:?}", + Self::METADATA_VERSION + ); + } + } + + Ok(Self { + columns, + chunk_id, + entity_path, + heap_size_bytes, + is_sorted: sorted_by_row_id, + }) + } +} diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 8e24a7785c26..b43738c244a8 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -294,12 +294,14 @@ impl PyStorageNodeClient { let column_descriptors = itertools::chain!( chunk_schema - .index_columns - .into_iter() + .index_columns() + .iter() + .cloned() .map(re_sorbet::ColumnDescriptor::Time), chunk_schema - .data_columns - .into_iter() + .data_columns() + .iter() + .cloned() .map(re_sorbet::ColumnDescriptor::Component), ) .collect();
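Reviewer note, not part of the diff: below is a minimal sketch of how downstream code consumes the API after this change. The `summarize` helper is hypothetical, and the sketch assumes the descriptor types derive `Debug`; everything else — `ChunkBatch::try_from(&ArrowRecordBatch)` with `Error = InvalidSorbetSchema`, and the `row_id_column()` / `index_columns()` / `data_columns()` accessors that replace the old public fields — is taken from the hunks above.

```rust
use arrow::array::{Array as _, RecordBatch as ArrowRecordBatch};

// Hypothetical downstream helper: field accesses like `schema.index_columns`
// no longer compile; column descriptors are reached through accessor methods.
fn summarize(batch: &ArrowRecordBatch) -> Result<(), re_sorbet::InvalidSorbetSchema> {
    // Per the `TryFrom` impl above, this wraps data columns in `ListArray`s
    // if they are not already, and now fails with `InvalidSorbetSchema`
    // instead of the removed `InvalidChunkSchema`.
    let chunk = re_sorbet::ChunkBatch::try_from(batch)?;

    let (row_id_descr, row_ids) = chunk.row_id_column();
    println!("{} rows ({row_id_descr:?})", row_ids.len());

    for (descr, column) in chunk.index_columns() {
        println!("index column {descr:?}: {} rows", column.len());
    }
    for (descr, column) in chunk.data_columns() {
        println!("data column {descr:?}: {} rows", column.len());
    }
    Ok(())
}
```

The same split shows up on the schema side: parsing is now layered, with `SorbetSchema::try_from(&ArrowSchema)` handling the optional metadata (`rerun.id`, `rerun.entity_path`, …) and `ChunkSchema::try_from` merely enforcing that the chunk-mandatory parts are present.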