Add general re_sorbet::SorbetSchema
emilk committed Feb 12, 2025
1 parent 18b4bac commit e6eb965
Showing 9 changed files with 432 additions and 242 deletions.
crates/store/re_chunk/src/chunk.rs (2 changes: 1 addition & 1 deletion)
@@ -56,7 +56,7 @@ pub enum ChunkError {
     MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),
 
     #[error(transparent)]
-    InvalidChunkSchema(#[from] re_sorbet::InvalidChunkSchema),
+    InvalidChunkSchema(#[from] re_sorbet::InvalidSorbetSchema),
 }
 
 pub type ChunkResult<T> = Result<T, ChunkError>;
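The `#[from]` attribute is what keeps call sites compiling across the rename: thiserror generates the `From<re_sorbet::InvalidSorbetSchema> for ChunkError` impl, so errors still convert automatically. A self-contained sketch of the pattern, using stand-in types rather than the real re_sorbet definitions:

    use thiserror::Error;

    // Stand-in for re_sorbet::InvalidSorbetSchema.
    #[derive(Error, Debug)]
    #[error("invalid sorbet schema: {0}")]
    struct InvalidSorbetSchema(String);

    #[derive(Error, Debug)]
    enum ChunkError {
        #[error(transparent)]
        InvalidChunkSchema(#[from] InvalidSorbetSchema),
    }

    type ChunkResult<T> = Result<T, ChunkError>;

    // The generated From impl powers both `.into()` and the `?` operator.
    fn validate(name: &str) -> ChunkResult<()> {
        if name.is_empty() {
            return Err(InvalidSorbetSchema("empty schema name".to_owned()).into());
        }
        Ok(())
    }

    fn main() {
        assert!(validate("points").is_ok());
        assert!(matches!(validate(""), Err(ChunkError::InvalidChunkSchema(_))));
    }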
crates/store/re_grpc_client/src/lib.rs (2 changes: 1 addition & 1 deletion)
@@ -57,7 +57,7 @@ pub enum StreamError {
     InvalidUri(String),
 
     #[error(transparent)]
-    InvalidChunkSchema(#[from] re_sorbet::InvalidChunkSchema),
+    InvalidChunkSchema(#[from] re_sorbet::InvalidSorbetSchema),
 }
 
 #[cfg(target_arch = "wasm32")]
crates/store/re_grpc_client/src/redap/mod.rs (4 changes: 2 additions & 2 deletions)
@@ -335,8 +335,8 @@ async fn stream_catalog_async(
     let mut metadata = record_batch.schema_ref().metadata.clone();
 
     for (key, value) in [
-        re_sorbet::ChunkSchema::chunk_id_metadata(&ChunkId::new()),
-        re_sorbet::ChunkSchema::entity_path_metadata(&entity_path),
+        re_sorbet::SorbetSchema::chunk_id_metadata(&ChunkId::new()),
+        re_sorbet::SorbetSchema::entity_path_metadata(&entity_path),
     ] {
         metadata.entry(key).or_insert(value);
     }
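The loop body uses the standard `HashMap` entry API: `entry(key).or_insert(value)` writes only when the key is absent, so metadata already present on the record batch takes precedence over the generated chunk ID and entity path. A minimal stand-alone illustration of that behavior (the key names here are made up for the example):

    use std::collections::HashMap;

    fn main() {
        let mut metadata: HashMap<String, String> =
            HashMap::from([("rerun.entity_path".to_owned(), "/points".to_owned())]);

        // Insert each pair only if its key is missing, mirroring the loop above.
        for (key, value) in [
            ("rerun.chunk_id".to_owned(), "abc123".to_owned()),
            ("rerun.entity_path".to_owned(), "/ignored".to_owned()),
        ] {
            metadata.entry(key).or_insert(value);
        }

        assert_eq!(metadata["rerun.chunk_id"], "abc123"); // missing key was filled in
        assert_eq!(metadata["rerun.entity_path"], "/points"); // existing key won
    }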
crates/store/re_sorbet/src/chunk_batch.rs (28 changes: 14 additions & 14 deletions)
@@ -11,8 +11,8 @@ use re_log_types::EntityPath;
 use re_types_core::ChunkId;
 
 use crate::{
-    chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema, ComponentColumnDescriptor,
-    IndexColumnDescriptor, RowIdColumnDescriptor, WrongDatatypeError,
+    ArrowBatchMetadata, ChunkSchema, ComponentColumnDescriptor, IndexColumnDescriptor,
+    InvalidSorbetSchema, RowIdColumnDescriptor, WrongDatatypeError,
 };
 
 #[derive(thiserror::Error, Debug)]
@@ -52,18 +52,18 @@ impl ChunkBatch {
         let row_count = row_ids.len();
 
         WrongDatatypeError::compare_expected_actual(
-            &schema.row_id_column.datatype(),
+            &schema.row_id_column().datatype(),
             row_ids.data_type(),
         )?;
 
-        if index_arrays.len() != schema.index_columns.len() {
+        if index_arrays.len() != schema.index_columns().len() {
             return Err(MismatchedChunkSchemaError::custom(format!(
                 "Schema had {} index columns, but got {}",
-                schema.index_columns.len(),
+                schema.index_columns().len(),
                 index_arrays.len()
             )));
         }
-        for (schema, array) in itertools::izip!(&schema.index_columns, &index_arrays) {
+        for (schema, array) in itertools::izip!(schema.index_columns(), &index_arrays) {
             WrongDatatypeError::compare_expected_actual(schema.datatype(), array.data_type())?;
             if array.len() != row_count {
                 return Err(MismatchedChunkSchemaError::custom(format!(
@@ -75,14 +75,14 @@ impl ChunkBatch {
             }
         }
 
-        if data_arrays.len() != schema.data_columns.len() {
+        if data_arrays.len() != schema.data_columns().len() {
             return Err(MismatchedChunkSchemaError::custom(format!(
                 "Schema had {} data columns, but got {}",
-                schema.data_columns.len(),
+                schema.data_columns().len(),
                 data_arrays.len()
             )));
         }
-        for (schema, array) in itertools::izip!(&schema.data_columns, &data_arrays) {
+        for (schema, array) in itertools::izip!(schema.data_columns(), &data_arrays) {
             WrongDatatypeError::compare_expected_actual(&schema.store_datatype, array.data_type())?;
             if array.len() != row_count {
                 return Err(MismatchedChunkSchemaError::custom(format!(
@@ -155,7 +155,7 @@ impl ChunkBatch {
     pub fn row_id_column(&self) -> (&RowIdColumnDescriptor, &ArrowStructArray) {
         // The first column is always the row IDs.
         (
-            &self.schema.row_id_column,
+            self.schema.row_id_column(),
             self.batch.columns()[0]
                 .as_struct_opt()
                 .expect("Row IDs should be encoded as struct"),
@@ -165,7 +165,7 @@ impl ChunkBatch {
     /// The columns of the indices (timelines).
     pub fn index_columns(&self) -> impl Iterator<Item = (&IndexColumnDescriptor, &ArrowArrayRef)> {
         itertools::izip!(
-            &self.schema.index_columns,
+            self.schema.index_columns(),
             self.batch.columns().iter().skip(1) // skip row IDs
         )
     }
@@ -175,11 +175,11 @@ impl ChunkBatch {
         &self,
     ) -> impl Iterator<Item = (&ComponentColumnDescriptor, &ArrowArrayRef)> {
         itertools::izip!(
-            &self.schema.data_columns,
+            self.schema.data_columns(),
             self.batch
                 .columns()
                 .iter()
-                .skip(1 + self.schema.index_columns.len()) // skip row IDs and indices
+                .skip(1 + self.schema.index_columns().len()) // skip row IDs and indices
         )
     }
 }
@@ -222,7 +222,7 @@ impl From<&ChunkBatch> for ArrowRecordBatch {
 }
 
 impl TryFrom<&ArrowRecordBatch> for ChunkBatch {
-    type Error = InvalidChunkSchema;
+    type Error = InvalidSorbetSchema;
 
     /// Will automatically wrap data columns in `ListArrays` if they are not already.
     fn try_from(batch: &ArrowRecordBatch) -> Result<Self, Self::Error> {
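Most of the churn in this file is `ChunkSchema`'s column descriptors moving behind accessor methods (`row_id_column()`, `index_columns()`, `data_columns()`), consistent with the columns now living in the general `SorbetSchema` that this commit introduces. A rough sketch of the shape this implies; every name below is an illustrative guess, not the actual re_sorbet definition:

    // Hypothetical layering: the general schema owns the columns,
    // and the chunk schema exposes them through accessors.
    struct RowIdColumn;
    struct IndexColumn;
    struct DataColumn;

    struct SorbetSchema {
        row_id_column: RowIdColumn,
        index_columns: Vec<IndexColumn>,
        data_columns: Vec<DataColumn>,
    }

    struct ChunkSchema {
        sorbet: SorbetSchema,
    }

    impl ChunkSchema {
        fn row_id_column(&self) -> &RowIdColumn {
            &self.sorbet.row_id_column
        }
        fn index_columns(&self) -> &[IndexColumn] {
            &self.sorbet.index_columns
        }
        fn data_columns(&self) -> &[DataColumn] {
            &self.sorbet.data_columns
        }
    }

    fn main() {
        let schema = ChunkSchema {
            sorbet: SorbetSchema {
                row_id_column: RowIdColumn,
                index_columns: vec![IndexColumn],
                data_columns: vec![DataColumn, DataColumn],
            },
        };
        // Callers go through accessors instead of reaching into fields.
        let _ = schema.row_id_column();
        assert_eq!(schema.index_columns().len(), 1);
        assert_eq!(schema.data_columns().len(), 2);
    }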
(Diffs for the remaining 5 changed files are not shown.)
