Skip to content

Commit

Permalink
Split into files
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Feb 12, 2025
1 parent fd7e6f2 commit ef157a7
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 134 deletions.
28 changes: 28 additions & 0 deletions crates/store/re_sorbet/src/column_kind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use arrow::datatypes::Field as ArrowField;

use crate::{InvalidSorbetSchema, MetadataExt as _};

/// The type of column in a sorbet batch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ColumnKind {
RowId,
Index,
Component,
}

impl TryFrom<&ArrowField> for ColumnKind {
type Error = InvalidSorbetSchema;

fn try_from(fields: &ArrowField) -> Result<Self, Self::Error> {
let kind = fields.get_or_err("rerun.kind")?;
match kind {
"control" | "row_id" => Ok(Self::RowId),
"index" | "time" => Ok(Self::Index),
"component" | "data" => Ok(Self::Component),

_ => Err(InvalidSorbetSchema::custom(format!(
"Unknown column kind: {kind}"
))),
}
}
}
4 changes: 4 additions & 0 deletions crates/store/re_sorbet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,20 @@ SorbetBatch superset:
mod chunk_batch;
mod chunk_schema;
mod column_descriptor;
mod column_kind;
mod component_column_descriptor;
mod index_column_descriptor;
mod ipc;
mod metadata;
mod row_id_column_descriptor;
mod sorbet_columns;
mod sorbet_schema;

pub use self::{
chunk_batch::{ChunkBatch, MismatchedChunkSchemaError},
chunk_schema::ChunkSchema,
column_descriptor::{ColumnDescriptor, ColumnError},
column_kind::ColumnKind,
component_column_descriptor::ComponentColumnDescriptor,
index_column_descriptor::{IndexColumnDescriptor, UnsupportedTimeType},
ipc::{ipc_from_schema, schema_from_ipc},
Expand All @@ -54,6 +57,7 @@ pub use self::{
MissingMetadataKey,
},
row_id_column_descriptor::{RowIdColumnDescriptor, WrongDatatypeError},
sorbet_columns::SorbetColumnDescriptors,
sorbet_schema::{ColumnKind, InvalidSorbetSchema, SorbetColumnDescriptors, SorbetSchema},
};

Expand Down
110 changes: 110 additions & 0 deletions crates/store/re_sorbet/src/sorbet_columns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use arrow::datatypes::{Field as ArrowField, Fields as ArrowFields};

use re_log_types::EntityPath;

use crate::{
ColumnKind, ComponentColumnDescriptor, IndexColumnDescriptor, InvalidSorbetSchema,
RowIdColumnDescriptor,
};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SorbetColumnDescriptors {
/// The primary row id column.
/// If present, it is always the first column.
pub row_id: Option<RowIdColumnDescriptor>,

/// Index columns (timelines).
pub indices: Vec<IndexColumnDescriptor>,

/// The actual component data
pub components: Vec<ComponentColumnDescriptor>,
}

impl SorbetColumnDescriptors {
/// Total number of columns in this chunk,
/// including the row id column, the index columns,
/// and the data columns.
pub fn num_columns(&self) -> usize {
let Self {
row_id,
indices,
components,
} = self;
row_id.is_some() as usize + indices.len() + components.len()
}

pub fn arrow_fields(&self) -> Vec<ArrowField> {
let Self {
row_id,
indices,
components,
} = self;
let mut fields: Vec<ArrowField> = Vec::with_capacity(self.num_columns());
if let Some(row_id) = row_id {
fields.push(row_id.to_arrow_field());
}
fields.extend(indices.iter().map(|column| column.to_arrow_field()));
fields.extend(
components
.iter()
.map(|column| column.to_arrow_field(crate::BatchType::Chunk)),
);
fields
}
}

impl SorbetColumnDescriptors {
fn try_from_arrow_fields(
chunk_entity_path: Option<&EntityPath>,
fields: &ArrowFields,
) -> Result<Self, InvalidSorbetSchema> {
let mut row_ids = Vec::new();
let mut indices = Vec::new();
let mut components = Vec::new();

for field in fields {
let field = field.as_ref();
let column_kind = ColumnKind::try_from(field)?;
match column_kind {
ColumnKind::RowId => {
if indices.is_empty() && components.is_empty() {
row_ids.push(RowIdColumnDescriptor::try_from(field)?);
} else {
return Err(InvalidSorbetSchema::custom(
"RowId column must be the first column",
));
}
}

ColumnKind::Index => {
if components.is_empty() {
indices.push(IndexColumnDescriptor::try_from(field)?);
} else {
return Err(InvalidSorbetSchema::custom(
"Index columns must come before any data columns",
));
}
}

ColumnKind::Component => {
components.push(ComponentColumnDescriptor::from_arrow_field(
chunk_entity_path,
field,
));
}
}
}

if row_ids.len() > 1 {
return Err(InvalidSorbetSchema::custom(
"Multiple row_id columns are not supported",
));
}

Ok(Self {
row_id: row_ids.pop(),
indices,
components,
})
}
}
136 changes: 2 additions & 134 deletions crates/store/re_sorbet/src/sorbet_schema.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use arrow::datatypes::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
use arrow::datatypes::Schema as ArrowSchema;

use re_log_types::EntityPath;
use re_types_core::ChunkId;

use crate::{
ArrowBatchMetadata, ColumnError, ComponentColumnDescriptor, IndexColumnDescriptor,
MetadataExt as _, RowIdColumnDescriptor,
};
use crate::{ArrowBatchMetadata, ColumnError, MetadataExt as _, SorbetColumnDescriptors};

#[derive(thiserror::Error, Debug)]
pub enum InvalidSorbetSchema {
Expand Down Expand Up @@ -45,135 +42,6 @@ impl InvalidSorbetSchema {

// ----------------------------------------------------------------------------

pub enum ColumnKind {
RowId,
Index,
Component,
}

impl TryFrom<&ArrowField> for ColumnKind {
type Error = InvalidSorbetSchema;

fn try_from(fields: &ArrowField) -> Result<Self, Self::Error> {
let kind = fields.get_or_err("rerun.kind")?;
match kind {
"control" | "row_id" => Ok(Self::RowId),
"index" | "time" => Ok(Self::Index),
"component" | "data" => Ok(Self::Component),

_ => Err(InvalidSorbetSchema::custom(format!(
"Unknown column kind: {kind}"
))),
}
}
}

// ----------------------------------------------------------------------------

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SorbetColumnDescriptors {
/// The primary row id column.
/// If present, it is always the first column.
pub row_id: Option<RowIdColumnDescriptor>,

/// Index columns (timelines).
pub indices: Vec<IndexColumnDescriptor>,

/// The actual component data
pub components: Vec<ComponentColumnDescriptor>,
}

impl SorbetColumnDescriptors {
/// Total number of columns in this chunk,
/// including the row id column, the index columns,
/// and the data columns.
pub fn num_columns(&self) -> usize {
let Self {
row_id,
indices,
components,
} = self;
row_id.is_some() as usize + indices.len() + components.len()
}

pub fn arrow_fields(&self) -> Vec<ArrowField> {
let Self {
row_id,
indices,
components,
} = self;
let mut fields: Vec<ArrowField> = Vec::with_capacity(self.num_columns());
if let Some(row_id) = row_id {
fields.push(row_id.to_arrow_field());
}
fields.extend(indices.iter().map(|column| column.to_arrow_field()));
fields.extend(
components
.iter()
.map(|column| column.to_arrow_field(crate::BatchType::Chunk)),
);
fields
}
}

impl SorbetColumnDescriptors {
fn try_from_arrow_fields(
chunk_entity_path: Option<&EntityPath>,
fields: &ArrowFields,
) -> Result<Self, InvalidSorbetSchema> {
let mut row_ids = Vec::new();
let mut indices = Vec::new();
let mut components = Vec::new();

for field in fields {
let field = field.as_ref();
let column_kind = ColumnKind::try_from(field)?;
match column_kind {
ColumnKind::RowId => {
if indices.is_empty() && components.is_empty() {
row_ids.push(RowIdColumnDescriptor::try_from(field)?);
} else {
return Err(InvalidSorbetSchema::custom(
"RowId column must be the first column",
));
}
}

ColumnKind::Index => {
if components.is_empty() {
indices.push(IndexColumnDescriptor::try_from(field)?);
} else {
return Err(InvalidSorbetSchema::custom(
"Index columns must come before any data columns",
));
}
}

ColumnKind::Component => {
components.push(ComponentColumnDescriptor::from_arrow_field(
chunk_entity_path,
field,
));
}
}
}

if row_ids.len() > 1 {
return Err(InvalidSorbetSchema::custom(
"Multiple row_id columns are not supported",
));
}

Ok(Self {
row_id: row_ids.pop(),
indices,
components,
})
}
}

// ----------------------------------------------------------------------------

/// The parsed schema of a `SorbetBatch`.
///
/// This does NOT contain custom arrow metadata.
Expand Down

0 comments on commit ef157a7

Please sign in to comment.