Add general re_sorbet::SorbetSchema (#9017)
### Related
* Part of #8744

### What
Expand `re_sorbet` with `SorbetSchema` and `SorbetBatch`, supersets of
the existing `ChunkSchema` and `ChunkBatch`.

### How
I'm using a pretty ~ugly~ interesting approach of "fake inheritance":

Define `A` -> `B` as:
* `struct A` contains a `B`
* `impl Deref<Target=B> for A` (upcasting)
* `impl TryFrom<B> for A` (“downcasting”)
* `impl From<A> for B` (“object slicing”)
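
To make the pattern concrete, here is a minimal compilable sketch with hypothetical `Base`/`Derived` types standing in for `B` and `A` (the names, the invariant, and the error type are invented for illustration; this is not the actual `re_sorbet` code):

```rust
/// The more general type (the "base class"), standing in for `B`.
#[derive(Clone, Debug)]
struct Base {
    name: String,
}

/// The more specific type (the "derived class"), standing in for `A`:
/// a `Base` plus an extra invariant (here: a non-empty name).
#[derive(Clone, Debug)]
struct Derived {
    base: Base,
}

/// Upcasting: a `Derived` can be used wherever a `&Base` is expected.
impl std::ops::Deref for Derived {
    type Target = Base;

    fn deref(&self) -> &Base {
        &self.base
    }
}

/// "Downcasting": fallible, since the invariant must be checked.
impl TryFrom<Base> for Derived {
    type Error = String;

    fn try_from(base: Base) -> Result<Self, Self::Error> {
        if base.name.is_empty() {
            Err("name must be non-empty".to_owned())
        } else {
            Ok(Self { base })
        }
    }
}

/// "Object slicing": infallible, the extra guarantees are simply dropped.
impl From<Derived> for Base {
    fn from(derived: Derived) -> Self {
        derived.base
    }
}

fn main() {
    let base = Base {
        name: "row_id".to_owned(),
    };
    let derived = Derived::try_from(base).expect("invariant holds"); // "downcast"
    println!("{}", derived.name); // upcast: field resolved via `Deref`
    let _sliced: Base = derived.into(); // "object slicing"
}
```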

Using this notation:
* `ChunkSchema` -> `SorbetSchema`
* `ChunkBatch` -> `SorbetBatch` -> `arrow::RecordBatch`

This means a `ChunkBatch` can be used as a `SorbetBatch`, but also as an
`arrow::RecordBatch`.

Note that there is no dynamic dispatch involved at all.
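
To illustrate: the sketch below is not code from this PR, and `takes_sorbet_batch`/`takes_record_batch` are hypothetical helpers; deref coercion resolves the whole chain statically at compile time:

```rust
// Assumed context: the `re_sorbet` and `arrow` crates from this PR are in scope.
fn takes_sorbet_batch(_batch: &re_sorbet::SorbetBatch) { /* … */ }
fn takes_record_batch(_batch: &arrow::array::RecordBatch) { /* … */ }

fn use_chunk_batch(chunk_batch: &re_sorbet::ChunkBatch) {
    takes_sorbet_batch(chunk_batch); // one `Deref` step: ChunkBatch -> SorbetBatch
    takes_record_batch(chunk_batch); // two `Deref` steps: SorbetBatch -> RecordBatch
    let _num_rows = chunk_batch.num_rows(); // method lookup walks the same chain
}
```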
emilk authored Feb 13, 2025
1 parent 163ce5b · commit 45353c2
Showing 16 changed files with 691 additions and 409 deletions.
2 changes: 1 addition & 1 deletion crates/store/re_chunk/src/chunk.rs
@@ -56,7 +56,7 @@ pub enum ChunkError {
     MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),
 
     #[error(transparent)]
-    InvalidChunkSchema(#[from] re_sorbet::InvalidChunkSchema),
+    InvalidSorbetSchema(#[from] re_sorbet::SorbetError),
 }
 
 pub type ChunkResult<T> = Result<T, ChunkError>;
2 changes: 1 addition & 1 deletion crates/store/re_chunk/src/transport.rs
@@ -193,7 +193,7 @@ impl Chunk {
         let components = {
             let mut components = ChunkComponents::default();
 
-            for (schema, column) in batch.data_columns() {
+            for (schema, column) in batch.component_columns() {
                 let column = column
                     .downcast_array_ref::<ArrowListArray>()
                     .ok_or_else(|| ChunkError::Malformed {
2 changes: 1 addition & 1 deletion crates/store/re_grpc_client/src/lib.rs
@@ -57,7 +57,7 @@ pub enum StreamError {
     InvalidUri(String),
 
     #[error(transparent)]
-    InvalidChunkSchema(#[from] re_sorbet::InvalidChunkSchema),
+    InvalidSorbetSchema(#[from] re_sorbet::SorbetError),
 }
 
 #[cfg(target_arch = "wasm32")]
4 changes: 2 additions & 2 deletions crates/store/re_grpc_client/src/redap/mod.rs
@@ -335,8 +335,8 @@ async fn stream_catalog_async(
     let mut metadata = record_batch.schema_ref().metadata.clone();
 
     for (key, value) in [
-        re_sorbet::ChunkSchema::chunk_id_metadata(&ChunkId::new()),
-        re_sorbet::ChunkSchema::entity_path_metadata(&entity_path),
+        re_sorbet::SorbetSchema::chunk_id_metadata(&ChunkId::new()),
+        re_sorbet::SorbetSchema::entity_path_metadata(&entity_path),
     ] {
         metadata.entry(key).or_insert(value);
     }
204 changes: 34 additions & 170 deletions crates/store/re_sorbet/src/chunk_batch.rs
@@ -1,19 +1,15 @@
 use arrow::{
     array::{
-        Array as ArrowArray, ArrayRef as ArrowArrayRef, AsArray, ListArray as ArrowListArray,
-        RecordBatch as ArrowRecordBatch, RecordBatchOptions, StructArray as ArrowStructArray,
+        ArrayRef as ArrowArrayRef, AsArray, RecordBatch as ArrowRecordBatch,
+        StructArray as ArrowStructArray,
     },
-    datatypes::{FieldRef as ArrowFieldRef, Fields as ArrowFields, Schema as ArrowSchema},
+    datatypes::Fields as ArrowFields,
 };
 
-use re_arrow_util::{into_arrow_ref, ArrowArrayDowncastRef};
 use re_log_types::EntityPath;
 use re_types_core::ChunkId;
 
-use crate::{
-    chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema, ComponentColumnDescriptor,
-    IndexColumnDescriptor, RowIdColumnDescriptor, WrongDatatypeError,
-};
+use crate::{ChunkSchema, RowIdColumnDescriptor, SorbetBatch, SorbetError, WrongDatatypeError};
 
 #[derive(thiserror::Error, Debug)]
 pub enum MismatchedChunkSchemaError {
@@ -39,7 +35,7 @@ impl MismatchedChunkSchemaError {
 #[derive(Debug, Clone)]
 pub struct ChunkBatch {
     schema: ChunkSchema,
-    batch: ArrowRecordBatch,
+    sorbet_batch: SorbetBatch,
 }
 
 impl ChunkBatch {
@@ -48,66 +44,13 @@ impl ChunkBatch {
         row_ids: ArrowArrayRef,
         index_arrays: Vec<ArrowArrayRef>,
         data_arrays: Vec<ArrowArrayRef>,
-    ) -> Result<Self, MismatchedChunkSchemaError> {
-        let row_count = row_ids.len();
-
-        WrongDatatypeError::compare_expected_actual(
-            &schema.row_id_column.datatype(),
-            row_ids.data_type(),
-        )?;
-
-        if index_arrays.len() != schema.index_columns.len() {
-            return Err(MismatchedChunkSchemaError::custom(format!(
-                "Schema had {} index columns, but got {}",
-                schema.index_columns.len(),
-                index_arrays.len()
-            )));
-        }
-        for (schema, array) in itertools::izip!(&schema.index_columns, &index_arrays) {
-            WrongDatatypeError::compare_expected_actual(schema.datatype(), array.data_type())?;
-            if array.len() != row_count {
-                return Err(MismatchedChunkSchemaError::custom(format!(
-                    "Index column {:?} had {} rows, but we got {} row IDs",
-                    schema.name(),
-                    array.len(),
-                    row_count
-                )));
-            }
-        }
-
-        if data_arrays.len() != schema.data_columns.len() {
-            return Err(MismatchedChunkSchemaError::custom(format!(
-                "Schema had {} data columns, but got {}",
-                schema.data_columns.len(),
-                data_arrays.len()
-            )));
-        }
-        for (schema, array) in itertools::izip!(&schema.data_columns, &data_arrays) {
-            WrongDatatypeError::compare_expected_actual(&schema.store_datatype, array.data_type())?;
-            if array.len() != row_count {
-                return Err(MismatchedChunkSchemaError::custom(format!(
-                    "Data column {:?} had {} rows, but we got {} row IDs",
-                    schema.column_name(crate::BatchType::Chunk),
-                    array.len(),
-                    row_count
-                )));
-            }
-        }
-
-        let arrow_columns = itertools::chain!(Some(row_ids), index_arrays, data_arrays).collect();
-
-        let batch = ArrowRecordBatch::try_new_with_options(
-            std::sync::Arc::new(ArrowSchema::from(&schema)),
-            arrow_columns,
-            &RecordBatchOptions::default().with_row_count(Some(row_count)),
-        )
-        .map_err(|err| {
-            MismatchedChunkSchemaError::custom(format!(
-                "Failed to create arrow record batch: {err}"
-            ))
-        })?;
-
-        Ok(Self { schema, batch })
+    ) -> Result<Self, SorbetError> {
+        Self::try_from(SorbetBatch::try_new(
+            schema.into(),
+            Some(row_ids),
+            index_arrays,
+            data_arrays,
+        )?)
     }
 }
 
@@ -130,58 +73,21 @@ impl ChunkBatch {
         self.schema.entity_path()
     }
 
-    /// The heap size of this chunk in bytes, if known.
-    #[inline]
-    pub fn heap_size_bytes(&self) -> Option<u64> {
-        self.schema.heap_size_bytes()
-    }
-
-    /// Are we sorted by the row id column?
-    #[inline]
-    pub fn is_sorted(&self) -> bool {
-        self.schema.is_sorted()
-    }
-
     #[inline]
     pub fn fields(&self) -> &ArrowFields {
         &self.schema_ref().fields
     }
 
-    #[inline]
-    pub fn arrow_bacth_metadata(&self) -> &ArrowBatchMetadata {
-        &self.batch.schema_ref().metadata
-    }
-
     /// The `RowId` column.
     pub fn row_id_column(&self) -> (&RowIdColumnDescriptor, &ArrowStructArray) {
         // The first column is always the row IDs.
         (
-            &self.schema.row_id_column,
-            self.batch.columns()[0]
+            self.schema.row_id_column(),
+            self.columns()[0]
                 .as_struct_opt()
                 .expect("Row IDs should be encoded as struct"),
         )
     }
-
-    /// The columns of the indices (timelines).
-    pub fn index_columns(&self) -> impl Iterator<Item = (&IndexColumnDescriptor, &ArrowArrayRef)> {
-        itertools::izip!(
-            &self.schema.index_columns,
-            self.batch.columns().iter().skip(1) // skip row IDs
-        )
-    }
-
-    /// The columns of the indices (timelines).
-    pub fn data_columns(
-        &self,
-    ) -> impl Iterator<Item = (&ComponentColumnDescriptor, &ArrowArrayRef)> {
-        itertools::izip!(
-            &self.schema.data_columns,
-            self.batch
-                .columns()
-                .iter()
-                .skip(1 + self.schema.index_columns.len()) // skip row IDs and indices
-        )
-    }
 }
 
 impl std::fmt::Display for ChunkBatch {
@@ -191,101 +97,59 @@ impl std::fmt::Display for ChunkBatch {
     }
 }
 
-impl AsRef<ArrowRecordBatch> for ChunkBatch {
+impl AsRef<SorbetBatch> for ChunkBatch {
     #[inline]
-    fn as_ref(&self) -> &ArrowRecordBatch {
-        &self.batch
+    fn as_ref(&self) -> &SorbetBatch {
+        &self.sorbet_batch
     }
 }
 
 impl std::ops::Deref for ChunkBatch {
-    type Target = ArrowRecordBatch;
+    type Target = SorbetBatch;
 
     #[inline]
-    fn deref(&self) -> &ArrowRecordBatch {
-        &self.batch
+    fn deref(&self) -> &SorbetBatch {
+        &self.sorbet_batch
    }
 }
 
 impl From<ChunkBatch> for ArrowRecordBatch {
     #[inline]
     fn from(chunk: ChunkBatch) -> Self {
-        chunk.batch
+        chunk.sorbet_batch.into()
     }
 }
 
 impl From<&ChunkBatch> for ArrowRecordBatch {
     #[inline]
     fn from(chunk: &ChunkBatch) -> Self {
-        chunk.batch.clone()
+        chunk.sorbet_batch.clone().into()
     }
 }
 
 impl TryFrom<&ArrowRecordBatch> for ChunkBatch {
-    type Error = InvalidChunkSchema;
+    type Error = SorbetError;
 
     /// Will automatically wrap data columns in `ListArrays` if they are not already.
     fn try_from(batch: &ArrowRecordBatch) -> Result<Self, Self::Error> {
         re_tracing::profile_function!();
 
-        let batch = make_all_data_columns_list_arrays(batch);
-
-        let chunk_schema = ChunkSchema::try_from(batch.schema_ref().as_ref())?;
-
-        for (field, column) in itertools::izip!(chunk_schema.arrow_fields(), batch.columns()) {
-            debug_assert_eq!(field.data_type(), column.data_type());
-        }
-
-        // Extend with any metadata that might have been missing:
-        let mut arrow_schema = ArrowSchema::clone(batch.schema_ref().as_ref());
-        arrow_schema
-            .metadata
-            .extend(chunk_schema.arrow_batch_metadata());
-
-        let batch = ArrowRecordBatch::try_new_with_options(
-            arrow_schema.into(),
-            batch.columns().to_vec(),
-            &RecordBatchOptions::default().with_row_count(Some(batch.num_rows())),
-        )
-        .expect("Can't fail");
+        Self::try_from(SorbetBatch::try_from(batch)?)
+    }
+}
+
+impl TryFrom<SorbetBatch> for ChunkBatch {
+    type Error = SorbetError;
+
+    /// Will automatically wrap data columns in `ListArrays` if they are not already.
+    fn try_from(sorbet_batch: SorbetBatch) -> Result<Self, Self::Error> {
+        re_tracing::profile_function!();
+
+        let chunk_schema = ChunkSchema::try_from(sorbet_batch.sorbet_schema().clone())?;
 
         Ok(Self {
             schema: chunk_schema,
-            batch,
+            sorbet_batch,
         })
     }
 }
-
-/// Make sure all data columns are `ListArrays`.
-fn make_all_data_columns_list_arrays(batch: &ArrowRecordBatch) -> ArrowRecordBatch {
-    re_tracing::profile_function!();
-
-    let num_columns = batch.num_columns();
-    let mut fields: Vec<ArrowFieldRef> = Vec::with_capacity(num_columns);
-    let mut columns: Vec<ArrowArrayRef> = Vec::with_capacity(num_columns);
-
-    for (field, array) in itertools::izip!(batch.schema().fields(), batch.columns()) {
-        let is_list_array = array.downcast_array_ref::<ArrowListArray>().is_some();
-        let is_data_column = field
-            .metadata()
-            .get("rerun.kind")
-            .is_some_and(|kind| kind == "data");
-        if is_data_column && !is_list_array {
-            let (field, array) = re_arrow_util::wrap_in_list_array(field, array.clone());
-            fields.push(field.into());
-            columns.push(into_arrow_ref(array));
-        } else {
-            fields.push(field.clone());
-            columns.push(array.clone());
-        }
-    }
-
-    let schema = ArrowSchema::new_with_metadata(fields, batch.schema().metadata.clone());
-
-    ArrowRecordBatch::try_new_with_options(
-        schema.into(),
-        columns,
-        &RecordBatchOptions::default().with_row_count(Some(batch.num_rows())),
-    )
-    .expect("Can't fail")
-}
