diff --git a/crates/store/re_sorbet/src/column_kind.rs b/crates/store/re_sorbet/src/column_kind.rs
new file mode 100644
index 000000000000..22baa7c9fc50
--- /dev/null
+++ b/crates/store/re_sorbet/src/column_kind.rs
@@ -0,0 +1,38 @@
+use arrow::datatypes::Field as ArrowField;
+
+use crate::{InvalidSorbetSchema, MetadataExt as _};
+
+/// The type of column in a sorbet batch.
+///
+/// Parsed from the `rerun.kind` metadata entry of an arrow field.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum ColumnKind {
+    /// Declared as `"control"` or `"row_id"`.
+    RowId,
+
+    /// Declared as `"index"` or `"time"`.
+    Index,
+
+    /// Declared as `"component"` or `"data"`.
+    Component,
+}
+
+impl TryFrom<&ArrowField> for ColumnKind {
+    type Error = InvalidSorbetSchema;
+
+    /// Reads the `rerun.kind` metadata of `field`.
+    ///
+    /// Propagates the error of `get_or_err`, and rejects any unrecognized value.
+    fn try_from(field: &ArrowField) -> Result<Self, Self::Error> {
+        let kind = field.get_or_err("rerun.kind")?;
+        match kind {
+            "control" | "row_id" => Ok(Self::RowId),
+            "index" | "time" => Ok(Self::Index),
+            "component" | "data" => Ok(Self::Component),
+
+            _ => Err(InvalidSorbetSchema::custom(format!(
+                "Unknown column kind: {kind}"
+            ))),
+        }
+    }
+}
diff --git a/crates/store/re_sorbet/src/lib.rs b/crates/store/re_sorbet/src/lib.rs
index d834821b351e..b7e1a11967c4 100644
--- a/crates/store/re_sorbet/src/lib.rs
+++ b/crates/store/re_sorbet/src/lib.rs
@@ -35,17 +35,20 @@ SorbetBatch superset:
 mod chunk_batch;
 mod chunk_schema;
 mod column_descriptor;
+mod column_kind;
 mod component_column_descriptor;
 mod index_column_descriptor;
 mod ipc;
 mod metadata;
 mod row_id_column_descriptor;
+mod sorbet_columns;
 mod sorbet_schema;
 
 pub use self::{
     chunk_batch::{ChunkBatch, MismatchedChunkSchemaError},
     chunk_schema::ChunkSchema,
     column_descriptor::{ColumnDescriptor, ColumnError},
+    column_kind::ColumnKind,
     component_column_descriptor::ComponentColumnDescriptor,
     index_column_descriptor::{IndexColumnDescriptor, UnsupportedTimeType},
     ipc::{ipc_from_schema, schema_from_ipc},
@@ -54,6 +57,7 @@ pub use self::{
         MissingMetadataKey,
     },
     row_id_column_descriptor::{RowIdColumnDescriptor, WrongDatatypeError},
-    sorbet_schema::{ColumnKind, InvalidSorbetSchema, SorbetColumnDescriptors, SorbetSchema},
+    sorbet_columns::SorbetColumnDescriptors,
+    sorbet_schema::{InvalidSorbetSchema, SorbetSchema},
 };
 
diff 
--git a/crates/store/re_sorbet/src/sorbet_columns.rs b/crates/store/re_sorbet/src/sorbet_columns.rs
new file mode 100644
index 000000000000..5a9465acfa10
--- /dev/null
+++ b/crates/store/re_sorbet/src/sorbet_columns.rs
@@ -0,0 +1,122 @@
+use arrow::datatypes::{Field as ArrowField, Fields as ArrowFields};
+
+use re_log_types::EntityPath;
+
+use crate::{
+    ColumnKind, ComponentColumnDescriptor, IndexColumnDescriptor, InvalidSorbetSchema,
+    RowIdColumnDescriptor,
+};
+
+/// The column descriptors of a sorbet batch, grouped by [`ColumnKind`].
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SorbetColumnDescriptors {
+    /// The primary row id column.
+    /// If present, it is always the first column.
+    pub row_id: Option<RowIdColumnDescriptor>,
+
+    /// Index columns (timelines).
+    pub indices: Vec<IndexColumnDescriptor>,
+
+    /// The actual component data
+    pub components: Vec<ComponentColumnDescriptor>,
+}
+
+impl SorbetColumnDescriptors {
+    /// Total number of columns in this chunk,
+    /// including the row id column, the index columns,
+    /// and the data columns.
+    pub fn num_columns(&self) -> usize {
+        let Self {
+            row_id,
+            indices,
+            components,
+        } = self;
+        row_id.is_some() as usize + indices.len() + components.len()
+    }
+
+    /// The arrow fields of all columns, in order:
+    /// row id (if any), then indices, then components.
+    pub fn arrow_fields(&self) -> Vec<ArrowField> {
+        let Self {
+            row_id,
+            indices,
+            components,
+        } = self;
+        let mut fields: Vec<ArrowField> = Vec::with_capacity(self.num_columns());
+        if let Some(row_id) = row_id {
+            fields.push(row_id.to_arrow_field());
+        }
+        fields.extend(indices.iter().map(|column| column.to_arrow_field()));
+        fields.extend(
+            components
+                .iter()
+                .map(|column| column.to_arrow_field(crate::BatchType::Chunk)),
+        );
+        fields
+    }
+}
+
+impl SorbetColumnDescriptors {
+    /// Parses arrow fields into column descriptors.
+    ///
+    /// Fails if the column kind of a field cannot be determined,
+    /// if a row id or index descriptor fails to parse,
+    /// if a row id column comes after an index or component column,
+    /// if an index column comes after a component column,
+    /// or if there is more than one row id column.
+    ///
+    /// Crate-visible so that the schema parsing in `sorbet_schema.rs`,
+    /// where this method used to live, can still reach it.
+    pub(crate) fn try_from_arrow_fields(
+        chunk_entity_path: Option<&EntityPath>,
+        fields: &ArrowFields,
+    ) -> Result<Self, InvalidSorbetSchema> {
+        let mut row_ids = Vec::new();
+        let mut indices = Vec::new();
+        let mut components = Vec::new();
+
+        for field in fields {
+            let field = field.as_ref();
+            let column_kind = ColumnKind::try_from(field)?;
+            match column_kind {
+                ColumnKind::RowId => {
+                    if indices.is_empty() && components.is_empty() {
+                        row_ids.push(RowIdColumnDescriptor::try_from(field)?);
+                    } else {
+                        return Err(InvalidSorbetSchema::custom(
+                            "RowId column must be the first column",
+                        ));
+                    }
+                }
+
+                ColumnKind::Index => {
+                    if components.is_empty() {
+                        indices.push(IndexColumnDescriptor::try_from(field)?);
+                    } else {
+                        return Err(InvalidSorbetSchema::custom(
+                            "Index columns must come before any data columns",
+                        ));
+                    }
+                }
+
+                ColumnKind::Component => {
+                    components.push(ComponentColumnDescriptor::from_arrow_field(
+                        chunk_entity_path,
+                        field,
+                    ));
+                }
+            }
+        }
+
+        if row_ids.len() > 1 {
+            return Err(InvalidSorbetSchema::custom(
+                "Multiple row_id columns are not supported",
+            ));
+        }
+
+        Ok(Self {
+            row_id: row_ids.pop(),
+            indices,
+            components,
+        })
+    }
+}
diff --git a/crates/store/re_sorbet/src/sorbet_schema.rs b/crates/store/re_sorbet/src/sorbet_schema.rs
index 9665c2a7dfe9..5cfd852b3ddf 100644
--- a/crates/store/re_sorbet/src/sorbet_schema.rs
+++ b/crates/store/re_sorbet/src/sorbet_schema.rs
@@ -1,12 +1,9 @@
-use arrow::datatypes::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
+use arrow::datatypes::Schema as ArrowSchema;
 
 use re_log_types::EntityPath;
 use re_types_core::ChunkId;
 
-use crate::{
-    ArrowBatchMetadata, ColumnError, ComponentColumnDescriptor, IndexColumnDescriptor,
-    MetadataExt as _, RowIdColumnDescriptor,
-};
+use crate::{ArrowBatchMetadata, ColumnError, MetadataExt as _, SorbetColumnDescriptors};
 
 #[derive(thiserror::Error, Debug)]
 pub enum InvalidSorbetSchema {
@@ -45,135 +42,6 @@ impl InvalidSorbetSchema {
 
 // ----------------------------------------------------------------------------
 
-pub enum ColumnKind {
-    RowId,
-    Index,
-    Component,
-}
-
-impl TryFrom<&ArrowField> for ColumnKind {
-    type Error = InvalidSorbetSchema;
-
-    fn try_from(fields: &ArrowField) -> Result<Self, Self::Error> {
-        let kind = fields.get_or_err("rerun.kind")?;
-        match kind {
-            "control" | "row_id" => Ok(Self::RowId),
-            "index" | "time" => Ok(Self::Index),
-            "component" | "data" => Ok(Self::Component),
-
-            _ => Err(InvalidSorbetSchema::custom(format!(
-                "Unknown column kind: {kind}"
-            ))),
-        }
-    }
-}
-
-// ----------------------------------------------------------------------------
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct SorbetColumnDescriptors {
-    /// The primary row id column.
-    /// If present, it is always the first column.
-    pub row_id: Option<RowIdColumnDescriptor>,
-
-    /// Index columns (timelines).
-    pub indices: Vec<IndexColumnDescriptor>,
-
-    /// The actual component data
-    pub components: Vec<ComponentColumnDescriptor>,
-}
-
-impl SorbetColumnDescriptors {
-    /// Total number of columns in this chunk,
-    /// including the row id column, the index columns,
-    /// and the data columns.
-    pub fn num_columns(&self) -> usize {
-        let Self {
-            row_id,
-            indices,
-            components,
-        } = self;
-        row_id.is_some() as usize + indices.len() + components.len()
-    }
-
-    pub fn arrow_fields(&self) -> Vec<ArrowField> {
-        let Self {
-            row_id,
-            indices,
-            components,
-        } = self;
-        let mut fields: Vec<ArrowField> = Vec::with_capacity(self.num_columns());
-        if let Some(row_id) = row_id {
-            fields.push(row_id.to_arrow_field());
-        }
-        fields.extend(indices.iter().map(|column| column.to_arrow_field()));
-        fields.extend(
-            components
-                .iter()
-                .map(|column| column.to_arrow_field(crate::BatchType::Chunk)),
-        );
-        fields
-    }
-}
-
-impl SorbetColumnDescriptors {
-    fn try_from_arrow_fields(
-        chunk_entity_path: Option<&EntityPath>,
-        fields: &ArrowFields,
-    ) -> Result<Self, InvalidSorbetSchema> {
-        let mut row_ids = Vec::new();
-        let mut indices = Vec::new();
-        let mut components = Vec::new();
-
-        for field in fields {
-            let field = field.as_ref();
-            let column_kind = ColumnKind::try_from(field)?;
-            match column_kind {
-                ColumnKind::RowId => {
-                    if indices.is_empty() && components.is_empty() {
-                        row_ids.push(RowIdColumnDescriptor::try_from(field)?);
-                    } else {
-                        return Err(InvalidSorbetSchema::custom(
-                            "RowId column must be the first column",
-                        ));
-                    }
-                }
-
-                ColumnKind::Index => {
-                    if components.is_empty() {
-                        indices.push(IndexColumnDescriptor::try_from(field)?);
-                    } else {
-                        return Err(InvalidSorbetSchema::custom(
-                            "Index columns must come before any data columns",
-                        ));
-                    }
-                }
-
-                ColumnKind::Component => {
-                    components.push(ComponentColumnDescriptor::from_arrow_field(
-                        chunk_entity_path,
-                        field,
-                    ));
-                }
-            }
-        }
-
-        if row_ids.len() > 1 {
-            return Err(InvalidSorbetSchema::custom(
-                "Multiple row_id columns are not supported",
-            ));
-        }
-
-        Ok(Self {
-            row_id: row_ids.pop(),
-            indices,
-            components,
-        })
-    }
-}
-
-// ----------------------------------------------------------------------------
-
 /// The parsed schema of a `SorbetBatch`.
 ///
 /// This does NOT contain custom arrow metadata.