diff --git a/vortex-file/src/read/buffered.rs b/vortex-file/src/read/buffered.rs index 365e8305c6..e207498dd4 100644 --- a/vortex-file/src/read/buffered.rs +++ b/vortex-file/src/read/buffered.rs @@ -12,7 +12,7 @@ use vortex_array::ArrayData; use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult}; use vortex_io::{Dispatch, IoDispatcher, VortexReadAt, VortexReadRanges}; -use crate::{LayoutMessageCache, LayoutReader, Message, MessageLocator, MessageRead, RowMask}; +use crate::{LayoutMessageCache, LayoutReader, Message, MessageLocator, PollRead, RowMask}; const NUM_TO_COALESCE: usize = 8; @@ -21,7 +21,7 @@ pub(crate) trait ReadMasked { /// Read a Layout into a `V`, applying the given bitmask. Only entries corresponding to positions /// where mask is `true` will be included in the output. - fn read_masked(&self, mask: &RowMask) -> VortexResult>>; + fn read_masked(&self, mask: &RowMask) -> VortexResult>>; } /// Read an array with a [`RowMask`]. @@ -39,8 +39,8 @@ impl ReadMasked for ReadArray { type Value = ArrayData; /// Read given mask out of the reader - fn read_masked(&self, mask: &RowMask) -> VortexResult>> { - self.layout.read_selection(mask) + fn read_masked(&self, mask: &RowMask) -> VortexResult>> { + self.layout.poll_read(mask) } } @@ -107,11 +107,11 @@ where RowMaskState::Pending(pending_mask) => { if let Some(pending_read) = self.row_mask_reader.read_masked(pending_mask)? { match pending_read { - MessageRead::ReadMore(m) => { + PollRead::ReadMore(m) => { to_read.extend(m); read_more_count += 1; } - MessageRead::Value(v) => { + PollRead::Value(v) => { *queued_res = RowMaskState::Ready(v); } } @@ -130,12 +130,12 @@ where Poll::Ready(Some(Ok(next_mask))) => { if let Some(read_result) = self.row_mask_reader.read_masked(&next_mask)? 
{ match read_result { - MessageRead::ReadMore(m) => { + PollRead::ReadMore(m) => { self.queued.push_back(RowMaskState::Pending(next_mask)); to_read.extend(m); read_more_count += 1; } - MessageRead::Value(v) => { + PollRead::Value(v) => { self.queued.push_back(RowMaskState::Ready(v)); } } diff --git a/vortex-file/src/read/layouts/chunked.rs b/vortex-file/src/read/layouts/chunked.rs index 39b1be0027..4f28a85611 100644 --- a/vortex-file/src/read/layouts/chunked.rs +++ b/vortex-file/src/read/layouts/chunked.rs @@ -8,7 +8,9 @@ use vortex_array::compute::{scalar_at, take}; use vortex_array::stats::{stats_from_bitset_bytes, ArrayStatistics as _, Stat}; use vortex_array::{ArrayDType, ArrayData, IntoArrayData}; use vortex_dtype::{DType, Nullability, StructDType}; -use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect as _, VortexResult}; +use vortex_error::{ + vortex_bail, vortex_err, vortex_panic, VortexExpect as _, VortexResult, VortexUnwrap, +}; use vortex_expr::Select; use vortex_flatbuffers::footer as fb; @@ -17,14 +19,14 @@ use crate::pruning::PruningPredicate; use crate::read::cache::RelativeLayoutCache; use crate::read::mask::RowMask; use crate::{ - BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LazyDType, - MessageLocator, MetadataRead, PruningRead, Scan, CHUNKED_LAYOUT_ID, + Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LazyDType, MessageLocator, + PollRead, Prune, Scan, CHUNKED_LAYOUT_ID, }; #[derive(Default, Debug)] pub struct ChunkedLayout; -/// In memory representation of Chunked NestedLayout. +/// In-memory representation of Chunked layout. 
/// /// First child in the list is the metadata table /// Subsequent children are consecutive chunks of this layout @@ -66,76 +68,67 @@ struct ChunkedLayoutBuilder<'a> { } impl ChunkedLayoutBuilder<'_> { - fn metadata_layout(&self) -> VortexResult>> { - self.layout - .metadata() - .map(|m| { - let set_stats = stats_from_bitset_bytes(m.bytes()); - let metadata_fb = self - .layout - .children() - .ok_or_else(|| vortex_err!("Must have children if layout has metadata"))? - .get(0); - self.layout_builder.read_layout( - metadata_fb, - Scan::new(Some(Arc::new(Select::include( - set_stats.iter().map(|s| s.to_string().into()).collect(), - )))), - self.message_cache.relative( - METADATA_LAYOUT_PART_ID, - Arc::new(LazyDType::from_dtype(stats_table_dtype( - &set_stats, - self.message_cache.dtype().value()?, - ))), - ), - ) - }) - .transpose() - } + pub fn build(&self) -> VortexResult { + // If the metadata bytes of the layout are present, interpret them as a bitset of `Stat`s, + // and read the first child layout as a table with each stat as a column and each row + // as the stat value for the N-th chunk. + let stats_layout = if let Some(metadata) = self.layout.metadata() { + let set_stats = stats_from_bitset_bytes(metadata.bytes()); + let metadata_fb = self + .layout + .children() + .ok_or_else(|| vortex_err!("Must have children if layout has metadata"))? + .get(0); + Some(self.layout_builder.read_layout( + metadata_fb, + Scan::new(Some(Arc::new(Select::include( + set_stats.iter().map(|s| s.to_string().into()).collect(), + )))), + self.message_cache.relative( + METADATA_LAYOUT_PART_ID, + Arc::new(LazyDType::from_dtype(stats_table_dtype( + &set_stats, + self.message_cache.dtype().value()?, + ))), + ), + )?) + } else { + None + }; - fn children(&self) -> impl Iterator { - self.layout + // Prepare the layouts for each of the children (chunks). + // This will start at the 0th child if there are no chunk stats, and the 1st child otherwise. 
+ let chunk_layouts: Vec = self + .layout .children() .unwrap_or_default() - .iter() + .into_iter() .enumerate() - .skip(if self.layout.metadata().is_some() { - 1 - } else { - 0 - }) - } - - fn children_ranges(&self) -> Vec<(usize, usize)> { - self.children() - .map(|(_, c)| c.row_count()) - .scan(0u64, |acc, row_count| { - let current = *acc; - *acc += row_count; - Some((current as usize, *acc as usize)) - }) - .collect::>() - } - - fn children_layouts(&self) -> VortexResult> { - self.children() - .zip_eq(self.children_ranges()) - .map(|((i, c), (begin, end))| { - let layout = self.layout_builder.read_layout( - c, - self.scan.clone(), - self.message_cache - .relative(i as u16, self.message_cache.dtype().clone()), - )?; - Ok(((begin, end), layout)) + .skip(if stats_layout.is_some() { 1 } else { 0 }) + .scan(0usize, |total_rows, (child_idx, next_chunk)| { + // Calculate the start/end range of the chunk in the global row offset range. + let chunk_start = *total_rows; + *total_rows += usize::try_from(next_chunk.row_count()).vortex_expect("row_count"); + let chunk_end = *total_rows; + + // Relative layout cache for the `child_idx`-th child. + let child_cache = self.message_cache.relative( + child_idx.try_into().vortex_unwrap(), + self.message_cache.dtype().clone(), + ); + + // Construct the ranged layout. 
+ Some( + self.layout_builder + .read_layout(next_chunk, self.scan.clone(), child_cache) + .map(|layout| RangedLayoutReader((chunk_start, chunk_end), layout)), + ) }) - .collect::>>() - } + .try_collect()?; - pub fn build(&self) -> VortexResult { Ok(ChunkedLayoutReader::new( - self.children_layouts()?, - self.metadata_layout()?, + chunk_layouts, + stats_layout, self.scan.clone(), )) } @@ -207,25 +200,25 @@ impl ChunkedLayoutReader { let (layout_idxs, in_progress_range) = in_progress_guard .entry((mask.begin(), mask.end())) .or_insert_with(|| { - let layouts_in_range = self.layouts_in_range_by_index(mask.begin(), mask.end()); + let layouts_in_range = self.children_for_row_range(mask.begin(), mask.end()); let num_layouts = layouts_in_range.len(); (layouts_in_range, vec![ChildRead::default(); num_layouts]) }); let mut messages_to_fetch = Vec::new(); - for (((begin, end), layout), array_slot) in layout_idxs + for (RangedLayoutReader((begin, end), layout), array_slot) in layout_idxs .iter() .map(|i| &self.layouts[*i]) .zip(in_progress_range) .filter(|(_, cr)| !cr.finished()) { let layout_selection = mask.slice(*begin, *end)?.shift(*begin)?; - if let Some(rr) = layout.read_selection(&layout_selection)? { + if let Some(rr) = layout.poll_read(&layout_selection)? { match rr { - BatchRead::ReadMore(m) => { + PollRead::ReadMore(m) => { messages_to_fetch.extend(m); } - BatchRead::Value(a) => { + PollRead::Value(a) => { *array_slot = ChildRead::Finished(Some(a)); } } @@ -245,11 +238,13 @@ impl ChunkedLayoutReader { self.metadata_layout.as_deref() } - fn layouts_in_range_by_index(&self, begin: usize, end: usize) -> Vec { + /// Return the index for all chunks which contain rows between + /// `begin` (inclusive) and `end` (exclusive). 
+ fn children_for_row_range(&self, begin: usize, end: usize) -> Vec { self.layouts .iter() .enumerate() - .filter_map(|(i, ((child_begin, child_end), _))| { + .filter_map(|(i, RangedLayoutReader((child_begin, child_end), _))| { (end > *child_begin && begin < *child_end).then_some(i) }) .collect::>() @@ -260,9 +255,9 @@ impl ChunkedLayoutReader { chunk_prunability: &ArrayData, begin: usize, end: usize, - ) -> VortexResult { + ) -> VortexResult { let layouts = self - .layouts_in_range_by_index(begin, end) + .children_for_row_range(begin, end) .iter() .map(|x| *x as u64) .collect::>(); @@ -273,7 +268,7 @@ impl ChunkedLayoutReader { .compute_as::(Stat::IsConstant) .vortex_expect("all boolean arrays must support is constant") { - return Ok(false); + return Ok(Prune::CannotPrune); } // if the expression is constant null, this slice of chunks is not prunable @@ -281,22 +276,26 @@ impl ChunkedLayoutReader { .as_bool() .value() .unwrap_or(false); - Ok(prunable) + Ok(if prunable { + Prune::CanPrune + } else { + Prune::CannotPrune + }) } } impl LayoutReader for ChunkedLayoutReader { fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { - for ((begin, _), child) in &self.layouts { + for RangedLayoutReader((begin, _), child) in &self.layouts { child.add_splits(row_offset + begin, splits)? 
} Ok(()) } - fn read_selection(&self, selector: &RowMask) -> VortexResult> { + fn poll_read(&self, selector: &RowMask) -> VortexResult>> { let messages_to_fetch = self.buffer_read(selector)?; if !messages_to_fetch.is_empty() { - return Ok(Some(BatchRead::ReadMore(messages_to_fetch))); + return Ok(Some(PollRead::ReadMore(messages_to_fetch))); } if let Some((_, arrays_in_range)) = self @@ -310,10 +309,10 @@ impl LayoutReader for ChunkedLayoutReader { .filter_map(ChildRead::into_value) .collect::>(); match child_arrays.len() { - 0 | 1 => Ok(child_arrays.pop().map(BatchRead::Value)), + 0 | 1 => Ok(child_arrays.pop().map(PollRead::Value)), _ => { let dtype = child_arrays[0].dtype().clone(); - Ok(Some(BatchRead::Value( + Ok(Some(PollRead::Value( ChunkedArray::try_new(child_arrays, dtype)?.into_array(), ))) } @@ -323,34 +322,31 @@ impl LayoutReader for ChunkedLayoutReader { } } - fn read_metadata(&self) -> VortexResult> { - match self.metadata_layout() { - None => Ok(None), - Some(metadata_layout) => { - if let Some(md) = self.cached_metadata.get() { - return Ok(Some(MetadataRead::Value(vec![Some(md.clone())]))); - } + fn poll_metadata(&self) -> VortexResult>>>> { + // Every chunked layout contains an optional "metadata" layout, which contains the + // per-chunk statistics table. + let Some(metadata_layout) = self.metadata_layout() else { + return Ok(None); + }; - match metadata_layout - .read_selection(&RowMask::new_valid_between(0, self.n_chunks()))? - { - Some(BatchRead::Value(array)) => { - // We don't care if the write failed - _ = self.cached_metadata.set(array.clone()); - Ok(Some(MetadataRead::Value(vec![Some(array)]))) - } - Some(BatchRead::ReadMore(messages)) => { - Ok(Some(MetadataRead::ReadMore(messages))) - } - None => Ok(None), - } + if let Some(md) = self.cached_metadata.get() { + return Ok(Some(PollRead::Value(vec![Some(md.clone())]))); + } + + match metadata_layout.poll_read(&RowMask::new_valid_between(0, self.n_chunks()))? 
{ + Some(PollRead::Value(array)) => { + // We don't care if the write failed + _ = self.cached_metadata.set(array.clone()); + Ok(Some(PollRead::Value(vec![Some(array)]))) } + Some(PollRead::ReadMore(messages)) => Ok(Some(PollRead::ReadMore(messages))), + None => Ok(None), } } - fn can_prune(&self, begin: usize, end: usize) -> VortexResult { + fn poll_prune(&self, begin: usize, end: usize) -> VortexResult> { if let Some(chunk_prunability) = self.cached_prunability.get() { - return Ok(PruningRead::Value(self.can_prune_overlapping_chunks( + return Ok(PollRead::Value(self.can_prune_overlapping_chunks( chunk_prunability, begin, end, @@ -358,13 +354,13 @@ impl LayoutReader for ChunkedLayoutReader { } let Some(predicate_expression) = self.scan.expr.as_ref() else { - return Ok(PruningRead::Value(false)); + return Ok(PollRead::Value(Prune::CannotPrune)); }; - if let Some(mr) = self.read_metadata()? { + if let Some(mr) = self.poll_metadata()? { Ok(match mr { - MetadataRead::ReadMore(messages) => PruningRead::ReadMore(messages), - MetadataRead::Value(mut batches) => { + PollRead::ReadMore(messages) => PollRead::ReadMore(messages), + PollRead::Value(mut batches) => { if batches.len() != 1 { vortex_bail!("chunked layout should have exactly one metadata array"); } @@ -381,14 +377,14 @@ impl LayoutReader for ChunkedLayoutReader { let is_selection_pruned = self.can_prune_overlapping_chunks(&chunk_prunability, begin, end)?; let _ = self.cached_prunability.set(chunk_prunability); // Losing the race is fine - PruningRead::Value(is_selection_pruned) + PollRead::Value(is_selection_pruned) } - None => PruningRead::Value(false), + None => PollRead::Value(Prune::CannotPrune), } } }) } else { - Ok(PruningRead::Value(false)) + Ok(PollRead::Value(Prune::CannotPrune)) } } } diff --git a/vortex-file/src/read/layouts/columnar.rs b/vortex-file/src/read/layouts/columnar.rs index fe97c7b866..2cb6e04e45 100644 --- a/vortex-file/src/read/layouts/columnar.rs +++ 
b/vortex-file/src/read/layouts/columnar.rs @@ -17,8 +17,8 @@ use crate::read::cache::{LazyDType, RelativeLayoutCache}; use crate::read::expr_project::expr_project; use crate::read::mask::RowMask; use crate::{ - BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MetadataRead, PruningRead, - RowFilter, Scan, COLUMNAR_LAYOUT_ID, + Layout, LayoutDeserializer, LayoutId, LayoutReader, PollRead, Prune, RowFilter, Scan, + COLUMNAR_LAYOUT_ID, }; #[derive(Debug)] @@ -179,7 +179,7 @@ impl ColumnarLayoutBuilder<'_> { } type InProgressRanges = RwLock>>>; -type InProgressPrunes = RwLock>>>; +type InProgressPrunes = RwLock>>>; /// In memory representation of Columnar NestedLayout. /// @@ -228,7 +228,7 @@ impl LayoutReader for ColumnarLayoutReader { Ok(()) } - fn read_selection(&self, selection: &RowMask) -> VortexResult> { + fn poll_read(&self, selection: &RowMask) -> VortexResult>> { let mut in_progress_guard = self .in_progress_ranges .write() @@ -243,12 +243,12 @@ impl LayoutReader for ColumnarLayoutReader { .enumerate() .filter(|(_, a)| a.is_none()) { - match self.children[i].read_selection(selection)? { + match self.children[i].poll_read(selection)? 
{ Some(rr) => match rr { - BatchRead::ReadMore(message) => { + PollRead::ReadMore(message) => { messages.extend(message); } - BatchRead::Value(arr) => { + PollRead::Value(arr) => { if self.shortcircuit_siblings && arr .statistics() @@ -292,14 +292,14 @@ impl LayoutReader for ColumnarLayoutReader { .as_ref() .map(|e| e.evaluate(&array)) .unwrap_or_else(|| Ok(array)) - .map(BatchRead::Value) + .map(PollRead::Value) .map(Some) } else { - Ok(Some(BatchRead::ReadMore(messages))) + Ok(Some(PollRead::ReadMore(messages))) } } - fn read_metadata(&self) -> VortexResult> { + fn poll_metadata(&self) -> VortexResult>>>> { let mut in_progress_metadata = self .in_progress_metadata .write() @@ -307,15 +307,15 @@ impl LayoutReader for ColumnarLayoutReader { let mut messages = Vec::default(); for (name, child_reader) in self.names.iter().zip(self.children.iter()) { - if let Some(child_metadata) = child_reader.read_metadata()? { + if let Some(child_metadata) = child_reader.poll_metadata()? { match child_metadata { - MetadataRead::Value(data) => { + PollRead::Value(data) => { if data.len() != 1 { vortex_bail!("expected exactly one metadata array per-child"); } in_progress_metadata.insert(name.clone(), data[0].clone()); } - MetadataRead::ReadMore(rm) => { + PollRead::ReadMore(rm) => { messages.extend(rm); } } @@ -332,49 +332,63 @@ impl LayoutReader for ColumnarLayoutReader { .map(|name| in_progress_metadata[name].clone()) // TODO(Adam): Some columns might not have statistics .collect::>(); - Ok(Some(MetadataRead::Value(child_arrays))) + Ok(Some(PollRead::Value(child_arrays))) } else { - Ok(Some(MetadataRead::ReadMore(messages))) + Ok(Some(PollRead::ReadMore(messages))) } } - fn can_prune(&self, begin: usize, end: usize) -> VortexResult { + fn poll_prune(&self, begin: usize, end: usize) -> VortexResult> { let mut in_progress_guard = self .in_progress_prunes .write() .unwrap_or_else(|poison| vortex_panic!("Failed to write to message cache: {poison}")); let selection_range = (begin, end); 
- let in_progress_selection = in_progress_guard + let column_prunes = in_progress_guard .entry(selection_range) .or_insert_with(|| vec![None; self.children.len()]); let mut messages = Vec::new(); - for (i, can_prune_child) in in_progress_selection + + // Check all children to see if they can be pruned. + for (i, child_prune_state) in column_prunes .iter_mut() .enumerate() .filter(|(_, a)| a.is_none()) { - match self.children[i].can_prune(begin, end)? { - PruningRead::ReadMore(message) => messages.extend(message), - PruningRead::Value(is_pruned) => *can_prune_child = Some(is_pruned), + match self.children[i].poll_prune(begin, end)? { + PollRead::ReadMore(message) => messages.extend(message), + PollRead::Value(is_pruned) => { + // If we are short-circuiting and one of the children can be + // pruned, we can immediately prune the range. + if is_pruned == Prune::CanPrune && self.shortcircuit_siblings { + // If any of the children prune, we short-circuit the entire pruning operation. + // We clear the in-progress pruning state and immediately prune this entire + // row range. + in_progress_guard.remove(&selection_range).ok_or_else(|| { + vortex_err!("There were no pruning results and no messages") + })?; + return Ok(PollRead::Value(Prune::CanPrune)); + } else { + // Update the state. + *child_prune_state = Some(is_pruned) + } + } } } - if messages.is_empty() { - // Each child's scan expression evaluates to false if and only if our scan expression - // would evaluate to false. Examples: - // - // 1. `x = 3 AND y = 3`: The child scan expressions will be `x = 3` and `y = 3`. If - // either expression is false, the row must be excluded. - // - // 2. `x = 3 OR y = 3`: The child scan expressions will both be None. - let any_child_is_pruned = in_progress_guard - .remove(&selection_range) - .ok_or_else(|| vortex_err!("There were no can_prune results and no messages"))? 
- .into_iter() - .any(|x| x.vortex_expect("all pruned-ness should be available")); - Ok(PruningRead::Value(any_child_is_pruned)) + if !messages.is_empty() { + // Read more + Ok(PollRead::ReadMore(messages)) } else { - Ok(PruningRead::ReadMore(messages)) + // If any children were pruned, we can short-circuit pruning of the child arrays. + let any_can_prune = column_prunes.iter().any(|col_prune| { + col_prune.vortex_expect("prune state must be initialized") == Prune::CanPrune + }); + Ok(PollRead::Value(if any_can_prune { + Prune::CanPrune + } else { + Prune::CannotPrune + })) } } } diff --git a/vortex-file/src/read/layouts/flat.rs b/vortex-file/src/read/layouts/flat.rs index 75917b6241..59e47b4b1d 100644 --- a/vortex-file/src/read/layouts/flat.rs +++ b/vortex-file/src/read/layouts/flat.rs @@ -11,8 +11,8 @@ use vortex_ipc::stream_writer::ByteRange; use crate::read::cache::RelativeLayoutCache; use crate::read::mask::RowMask; use crate::{ - BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MessageLocator, MetadataRead, - PruningRead, Scan, FLAT_LAYOUT_ID, + Layout, LayoutDeserializer, LayoutId, LayoutReader, MessageLocator, PollRead, Scan, + FLAT_LAYOUT_ID, }; #[derive(Debug)] @@ -91,13 +91,13 @@ impl LayoutReader for FlatLayoutReader { Ok(()) } - fn read_selection(&self, selection: &RowMask) -> VortexResult> { + fn poll_read(&self, selection: &RowMask) -> VortexResult>> { if let Some(buf) = self.message_cache.get(&[]) { let array = self.array_from_bytes(buf)?; selection .filter_array(array)? 
.map(|s| { - Ok(BatchRead::Value( + Ok(PollRead::Value( self.scan .expr .as_ref() @@ -108,17 +108,9 @@ impl LayoutReader for FlatLayoutReader { }) .transpose() } else { - Ok(Some(BatchRead::ReadMore(vec![self.own_message()]))) + Ok(Some(PollRead::ReadMore(vec![self.own_message()]))) } } - - fn read_metadata(&self) -> VortexResult> { - Ok(None) - } - - fn can_prune(&self, _begin: usize, _end: usize) -> VortexResult { - Ok(PruningRead::Value(false)) - } } #[cfg(test)] diff --git a/vortex-file/src/read/layouts/mod.rs b/vortex-file/src/read/layouts/mod.rs index 338e7f914c..50d78a7a5f 100644 --- a/vortex-file/src/read/layouts/mod.rs +++ b/vortex-file/src/read/layouts/mod.rs @@ -10,4 +10,6 @@ pub use flat::FlatLayout; use crate::LayoutReader; -type RangedLayoutReader = ((usize, usize), Box); +// TODO(aduffy): make this container more useful +#[derive(Debug)] +pub struct RangedLayoutReader((usize, usize), Box); diff --git a/vortex-file/src/read/layouts/test_read.rs b/vortex-file/src/read/layouts/test_read.rs index e6dcd785a6..8de47b7d5a 100644 --- a/vortex-file/src/read/layouts/test_read.rs +++ b/vortex-file/src/read/layouts/test_read.rs @@ -7,7 +7,7 @@ use vortex_error::VortexUnwrap; use crate::read::mask::RowMask; use crate::read::splits::FixedSplitIterator; -use crate::{BatchRead, LayoutMessageCache, LayoutReader, MessageLocator}; +use crate::{LayoutMessageCache, LayoutReader, MessageLocator, PollRead}; fn layout_splits( layouts: &[&mut dyn LayoutReader], @@ -28,15 +28,15 @@ pub fn read_layout_data( buf: &Bytes, selector: &RowMask, ) -> Option { - while let Some(rr) = layout.read_selection(selector).unwrap() { + while let Some(rr) = layout.poll_read(selector).unwrap() { match rr { - BatchRead::ReadMore(m) => { + PollRead::ReadMore(m) => { let mut write_cache_guard = cache.write().unwrap(); for MessageLocator(id, range) in m { write_cache_guard.set(id, buf.slice(range.to_range())); } } - BatchRead::Value(a) => return Some(a), + PollRead::Value(a) => return Some(a), } } 
None @@ -48,15 +48,15 @@ pub fn read_filters( buf: &Bytes, selector: &RowMask, ) -> Option { - while let Some(rr) = layout.read_selection(selector).unwrap() { + while let Some(rr) = layout.poll_read(selector).unwrap() { match rr { - BatchRead::ReadMore(m) => { + PollRead::ReadMore(m) => { let mut write_cache_guard = cache.write().unwrap(); for MessageLocator(id, range) in m { write_cache_guard.set(id, buf.slice(range.to_range())); } } - BatchRead::Value(a) => { + PollRead::Value(a) => { return Some( RowMask::from_mask_array(&a, selector.begin(), selector.end()).unwrap(), ); diff --git a/vortex-file/src/read/metadata.rs b/vortex-file/src/read/metadata.rs index 96ecbb8954..c18dddb48d 100644 --- a/vortex-file/src/read/metadata.rs +++ b/vortex-file/src/read/metadata.rs @@ -8,7 +8,7 @@ use vortex_io::{IoDispatcher, VortexReadAt}; use super::{LayoutMessageCache, LayoutReader}; use crate::read::buffered::{BufferedLayoutReader, ReadMasked}; -use crate::{MessageRead, RowMask}; +use crate::{PollRead, RowMask}; struct MetadataMaskReader { layout: Box, @@ -26,8 +26,8 @@ impl ReadMasked for MetadataMaskReader { fn read_masked( &self, _mask: &RowMask, - ) -> VortexResult>>>> { - self.layout.read_metadata() + ) -> VortexResult>>>> { + self.layout.poll_metadata() } } diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs index 4e166ee0aa..65526b2309 100644 --- a/vortex-file/src/read/mod.rs +++ b/vortex-file/src/read/mod.rs @@ -65,25 +65,40 @@ pub struct MessageLocator(pub MessageId, pub ByteRange); #[derive(Debug, Clone)] pub struct Message(pub MessageId, pub Bytes); -pub type BatchRead = MessageRead; -pub type SplitRead = MessageRead; -pub type MetadataRead = MessageRead>>; -pub type PruningRead = MessageRead; - +/// A polling interface for reading a value from a [`LayoutReader`]. #[derive(Debug)] -pub enum MessageRead { +pub enum PollRead { ReadMore(Vec), Value(T), } +/// Result type for an attempt to prune rows from a [`LayoutReader`]. 
+/// +/// The default value is `CannotPrune` so that layouts which do not implement pruning can default +/// to performing full scans of their data. +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub enum Prune { + /// It is unsafe for the layout to prune the requested row range. + #[default] + CannotPrune, + /// It is safe for the layout to prune the requested row range. + CanPrune, +} + /// A reader for a layout, a serialized sequence of Vortex arrays. /// -/// Some layouts are _horizontally divisble_: they can read a sub-sequence of rows independently of -/// other sub-sequences. A layout advertises its sub-divisions in its [add_splits][Self::add_splits] -/// method. Any layout which is or contains a chunked layout is horizontally divisble. +/// Some layouts are _horizontally divisible_: they can read a sub-sequence of rows independently of +/// other sub-sequences. A layout advertises its subdivisions in its [add_splits][Self::add_splits] +/// method. Any layout which is or contains a chunked layout is horizontally divisible. +/// +/// The [poll_read][Self::poll_read] method accepts and applies a [RowMask], reading only +/// the subdivisions which contain the selected rows. /// -/// The [read_selection][Self::read_selection] method accepts and applies a [RowMask], reading only -/// the sub-divisions which contain the selected (i.e. masked) rows. +/// # State management +/// +/// Layout readers are **synchronous** and **stateful**. A request to read a given row range may +/// trigger a request for more messages, which will be handled by the caller, placing the messages +/// back into the message cache for this layout as a result. pub trait LayoutReader: Debug + Send { /// Register all horizontal row boundaries of this layout. /// @@ -99,11 +114,19 @@ pub trait LayoutReader: Debug + Send { /// creating the invoked instance of this trait and then call back into this function. 
/// /// The layout is finished producing data for selection when it returns None - fn read_selection(&self, selector: &RowMask) -> VortexResult>; + fn poll_read(&self, selector: &RowMask) -> VortexResult>>; /// Reads the metadata of the layout, if it exists. - fn read_metadata(&self) -> VortexResult>; + /// + /// `LayoutReader`s can override the default behavior, which is to return no metadata. + fn poll_metadata(&self) -> VortexResult>>>> { + Ok(None) + } - /// Returns true if this range contains no rows passing the filter condition. - fn can_prune(&self, begin: usize, end: usize) -> VortexResult; + /// Introspect to determine if we can prune the given [begin, end) row range. + /// + /// `LayoutReader`s can opt out of the default implementation, which is to not prune. + fn poll_prune(&self, _begin: usize, _end: usize) -> VortexResult> { + Ok(PollRead::Value(Prune::CannotPrune)) + } } diff --git a/vortex-file/src/read/splits.rs b/vortex-file/src/read/splits.rs index d37aa783a8..f340bcbc42 100644 --- a/vortex-file/src/read/splits.rs +++ b/vortex-file/src/read/splits.rs @@ -9,7 +9,7 @@ use vortex_array::stats::ArrayStatistics; use vortex_error::{vortex_bail, VortexResult, VortexUnwrap}; use crate::read::buffered::ReadMasked; -use crate::{BatchRead, LayoutReader, MessageRead, PruningRead, RowMask, SplitRead}; +use crate::{LayoutReader, PollRead, Prune, RowMask}; /// Reads an array out of a [`LayoutReader`] as a [`RowMask`]. 
/// @@ -29,21 +29,21 @@ impl ReadMasked for ReadRowMask { type Value = RowMask; /// Read given mask out of the reader - fn read_masked(&self, mask: &RowMask) -> VortexResult>> { - let can_prune = self.layout.can_prune(mask.begin(), mask.end())?; + fn read_masked(&self, mask: &RowMask) -> VortexResult>> { + let can_prune = self.layout.poll_prune(mask.begin(), mask.end())?; match can_prune { - PruningRead::ReadMore(messages) => { - return Ok(Some(SplitRead::ReadMore(messages))); + PollRead::ReadMore(messages) => { + return Ok(Some(PollRead::ReadMore(messages))); } - PruningRead::Value(true) => return Ok(None), - PruningRead::Value(false) => {} + PollRead::Value(Prune::CanPrune) => return Ok(None), + PollRead::Value(Prune::CannotPrune) => {} }; - if let Some(rs) = self.layout.read_selection(mask)? { + if let Some(rs) = self.layout.poll_read(mask)? { return match rs { - BatchRead::ReadMore(messages) => Ok(Some(SplitRead::ReadMore(messages))), - BatchRead::Value(batch) => { + PollRead::ReadMore(messages) => Ok(Some(PollRead::ReadMore(messages))), + PollRead::Value(batch) => { // If the mask is all FALSE we can safely discard it if batch .statistics() @@ -54,7 +54,7 @@ impl ReadMasked for ReadRowMask { return Ok(None); } // Combine requested mask with the result of filter read - Ok(Some(SplitRead::Value(mask.and_bitmask(batch)?))) + Ok(Some(PollRead::Value(mask.and_bitmask(batch)?))) } }; }