Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into aduffy/fix-bench
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Jun 11, 2024
2 parents e8e5557 + 0e83666 commit 9939bcc
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 262 deletions.
5 changes: 5 additions & 0 deletions vortex-ipc/src/stream_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ impl<R: VortexRead> StreamArrayReader<R> {
Ok(self)
}

/// Retrieve the loaded view_context
pub fn view_context(&self) -> Option<Arc<ViewContext>> {
self.view_context.clone()
}

pub fn with_dtype(self, dtype: DType) -> Self {
assert!(self.dtype.is_none(), "DType already set");
Self {
Expand Down
15 changes: 15 additions & 0 deletions vortex-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures_util::{Stream, TryStreamExt};
use vortex::array::chunked::ChunkedArray;
use vortex::stream::ArrayStream;
use vortex::{Array, IntoArrayData, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};

Expand All @@ -14,6 +15,7 @@ pub struct ArrayWriter<W: VortexWrite> {

view_ctx_range: Option<ByteRange>,
array_layouts: Vec<ArrayLayout>,
page_ranges: Vec<ByteRange>,
}

impl<W: VortexWrite> ArrayWriter<W> {
Expand All @@ -23,6 +25,7 @@ impl<W: VortexWrite> ArrayWriter<W> {
view_ctx,
view_ctx_range: None,
array_layouts: vec![],
page_ranges: vec![],
}
}

Expand All @@ -34,6 +37,10 @@ impl<W: VortexWrite> ArrayWriter<W> {
&self.array_layouts
}

pub fn page_ranges(&self) -> &[ByteRange] {
&self.page_ranges
}

pub fn into_inner(self) -> W {
self.msgs.into_inner()
}
Expand Down Expand Up @@ -102,6 +109,14 @@ impl<W: VortexWrite> ArrayWriter<W> {
self.write_array_stream(array.into_array_stream()).await
}
}

pub async fn write_page(mut self, buffer: Buffer) -> VortexResult<Self> {
let begin = self.msgs.tell();
self.msgs.write_page(buffer).await?;
let end = self.msgs.tell();
self.page_ranges.push(ByteRange { begin, end });
Ok(self)
}
}

#[derive(Copy, Clone, Debug)]
Expand Down
19 changes: 18 additions & 1 deletion vortex-scalar/proto/vortex/scalar/scalar.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,29 @@ package vortex.scalar;

import "vortex/dtype/dtype.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/wrappers.proto";

message Scalar {
vortex.dtype.DType dtype = 1;
ScalarValue value = 2;
}

message ScalarValue {
google.protobuf.Value value = 1;
oneof kind {
google.protobuf.NullValue null_value = 1;
bool bool_value = 2;
int32 int32_value = 3;
int64 int64_value = 4;
uint32 uint32_value = 5;
uint64 uint64_value = 6;
float float_value = 7;
double double_value = 8;
string string_value = 9;
bytes bytes_value = 10;
ListValue list_value = 12;
}
}

message ListValue {
repeated ScalarValue values = 1;
}
Loading

0 comments on commit 9939bcc

Please sign in to comment.