diff --git a/crates/store/re_sorbet/src/chunk_batch.rs b/crates/store/re_sorbet/src/chunk_batch.rs index bcd648a2b952..7d447a178106 100644 --- a/crates/store/re_sorbet/src/chunk_batch.rs +++ b/crates/store/re_sorbet/src/chunk_batch.rs @@ -45,6 +45,7 @@ impl ChunkBatch { data_arrays: Vec, ) -> Result { Self::try_from(SorbetBatch::try_new( + crate::BatchType::Chunk, schema.into(), Some(row_ids), index_arrays, @@ -131,7 +132,10 @@ impl TryFrom<&ArrowRecordBatch> for ChunkBatch { fn try_from(batch: &ArrowRecordBatch) -> Result { re_tracing::profile_function!(); - Self::try_from(SorbetBatch::try_from(batch)?) + Self::try_from(SorbetBatch::try_from_record_batch( + batch, + crate::BatchType::Chunk, + )?) } } diff --git a/crates/store/re_sorbet/src/chunk_schema.rs b/crates/store/re_sorbet/src/chunk_schema.rs index eaba85a8b1cd..9779a7912212 100644 --- a/crates/store/re_sorbet/src/chunk_schema.rs +++ b/crates/store/re_sorbet/src/chunk_schema.rs @@ -121,7 +121,7 @@ impl ChunkSchema { } pub fn arrow_fields(&self) -> Vec { - self.sorbet.columns.arrow_fields() + self.sorbet.columns.arrow_fields(crate::BatchType::Chunk) } } diff --git a/crates/store/re_sorbet/src/sorbet_batch.rs b/crates/store/re_sorbet/src/sorbet_batch.rs index 976a09ea6c6c..24a7ab5c0a73 100644 --- a/crates/store/re_sorbet/src/sorbet_batch.rs +++ b/crates/store/re_sorbet/src/sorbet_batch.rs @@ -25,6 +25,7 @@ pub struct SorbetBatch { impl SorbetBatch { pub fn try_new( + batch_type: crate::BatchType, schema: SorbetSchema, row_ids: Option, index_arrays: Vec, @@ -33,7 +34,7 @@ impl SorbetBatch { let arrow_columns = itertools::chain!(row_ids, index_arrays, data_arrays).collect(); let batch = ArrowRecordBatch::try_new( - std::sync::Arc::new(ArrowSchema::from(&schema)), + std::sync::Arc::new(schema.to_arrow(batch_type)), arrow_columns, )?; @@ -143,20 +144,22 @@ impl From<&SorbetBatch> for ArrowRecordBatch { } } -impl TryFrom<&ArrowRecordBatch> for SorbetBatch { - type Error = SorbetError; - +impl SorbetBatch { /// Will automatically wrap data columns in `ListArrays` if they are not already. - fn try_from(batch: &ArrowRecordBatch) -> Result { + pub fn try_from_record_batch( + batch: &ArrowRecordBatch, + batch_type: crate::BatchType, + ) -> Result { re_tracing::profile_function!(); let batch = make_all_data_columns_list_arrays(batch); let sorbet_schema = SorbetSchema::try_from(batch.schema_ref().as_ref())?; - for (field, column) in - itertools::izip!(sorbet_schema.columns.arrow_fields(), batch.columns()) - { + for (field, column) in itertools::izip!( + sorbet_schema.columns.arrow_fields(batch_type), + batch.columns() + ) { debug_assert_eq!(field.data_type(), column.data_type()); } diff --git a/crates/store/re_sorbet/src/sorbet_columns.rs b/crates/store/re_sorbet/src/sorbet_columns.rs index 062656d585bd..3f50d8c5d2a9 100644 --- a/crates/store/re_sorbet/src/sorbet_columns.rs +++ b/crates/store/re_sorbet/src/sorbet_columns.rs @@ -96,7 +96,7 @@ impl SorbetColumnDescriptors { } } - pub fn arrow_fields(&self) -> Vec { + pub fn arrow_fields(&self, batch_type: crate::BatchType) -> Vec { let Self { row_id, indices, @@ -110,7 +110,7 @@ impl SorbetColumnDescriptors { fields.extend( components .iter() - .map(|column| column.to_arrow_field(crate::BatchType::Chunk)), + .map(|column| column.to_arrow_field(batch_type)), ); fields } diff --git a/crates/store/re_sorbet/src/sorbet_schema.rs b/crates/store/re_sorbet/src/sorbet_schema.rs index 099960e2819e..4bd0514f36e9 100644 --- a/crates/store/re_sorbet/src/sorbet_schema.rs +++ b/crates/store/re_sorbet/src/sorbet_schema.rs @@ -85,11 +85,11 @@ impl From for SorbetColumnDescriptors { } } -impl From<&SorbetSchema> for ArrowSchema { - fn from(sorbet_schema: &SorbetSchema) -> Self { - Self { - metadata: sorbet_schema.arrow_batch_metadata(), - fields: sorbet_schema.columns.arrow_fields().into(), +impl SorbetSchema { + pub fn to_arrow(&self, batch_type: crate::BatchType) -> ArrowSchema { + ArrowSchema { + metadata: self.arrow_batch_metadata(), + fields: self.columns.arrow_fields(batch_type).into(), } } }