Skip to content

Commit

Permalink
chore: some LayoutReader cleanups (#1623)
Browse files Browse the repository at this point in the history
- Introduced `PollRead<T>` patterned after `std::task::Poll`
- Added an enum for pruning to make it more clear than true/false (is
true must prune or must read? etc.)
- Implemented short-circuiting for pruning in ColumnarLayoutReader
  • Loading branch information
a10y authored Dec 10, 2024
1 parent e556ee2 commit 90a23de
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 206 deletions.
16 changes: 8 additions & 8 deletions vortex-file/src/read/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vortex_array::ArrayData;
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
use vortex_io::{Dispatch, IoDispatcher, VortexReadAt, VortexReadRanges};

use crate::{LayoutMessageCache, LayoutReader, Message, MessageLocator, MessageRead, RowMask};
use crate::{LayoutMessageCache, LayoutReader, Message, MessageLocator, PollRead, RowMask};

const NUM_TO_COALESCE: usize = 8;

Expand All @@ -21,7 +21,7 @@ pub(crate) trait ReadMasked {

/// Read a Layout into a `V`, applying the given bitmask. Only entries corresponding to positions
/// where mask is `true` will be included in the output.
fn read_masked(&self, mask: &RowMask) -> VortexResult<Option<MessageRead<Self::Value>>>;
fn read_masked(&self, mask: &RowMask) -> VortexResult<Option<PollRead<Self::Value>>>;
}

/// Read an array with a [`RowMask`].
Expand All @@ -39,8 +39,8 @@ impl ReadMasked for ReadArray {
type Value = ArrayData;

/// Read given mask out of the reader
fn read_masked(&self, mask: &RowMask) -> VortexResult<Option<MessageRead<ArrayData>>> {
self.layout.read_selection(mask)
fn read_masked(&self, mask: &RowMask) -> VortexResult<Option<PollRead<ArrayData>>> {
self.layout.poll_read(mask)
}
}

Expand Down Expand Up @@ -107,11 +107,11 @@ where
RowMaskState::Pending(pending_mask) => {
if let Some(pending_read) = self.row_mask_reader.read_masked(pending_mask)? {
match pending_read {
MessageRead::ReadMore(m) => {
PollRead::ReadMore(m) => {
to_read.extend(m);
read_more_count += 1;
}
MessageRead::Value(v) => {
PollRead::Value(v) => {
*queued_res = RowMaskState::Ready(v);
}
}
Expand All @@ -130,12 +130,12 @@ where
Poll::Ready(Some(Ok(next_mask))) => {
if let Some(read_result) = self.row_mask_reader.read_masked(&next_mask)? {
match read_result {
MessageRead::ReadMore(m) => {
PollRead::ReadMore(m) => {
self.queued.push_back(RowMaskState::Pending(next_mask));
to_read.extend(m);
read_more_count += 1;
}
MessageRead::Value(v) => {
PollRead::Value(v) => {
self.queued.push_back(RowMaskState::Ready(v));
}
}
Expand Down
Loading

0 comments on commit 90a23de

Please sign in to comment.