From b8d3ff227c3ce9e7326fc8b79dcc34f0ee562f91 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 12 Dec 2024 18:27:41 +0000 Subject: [PATCH] chore: random refactoring/renaming (#1669) --- vortex-file/src/read/builder/mod.rs | 10 +- vortex-file/src/read/layouts/chunked.rs | 18 ++-- vortex-file/src/read/layouts/columnar.rs | 16 +-- vortex-file/src/read/layouts/flat.rs | 10 +- vortex-file/src/read/layouts/test_read.rs | 8 +- vortex-file/src/read/mod.rs | 8 +- vortex-file/src/read/splits.rs | 122 ++++++++-------------- vortex-file/src/read/stream.rs | 21 ++-- vortex-file/src/tests.rs | 2 +- 9 files changed, 95 insertions(+), 120 deletions(-) diff --git a/vortex-file/src/read/builder/mod.rs b/vortex-file/src/read/builder/mod.rs index 73bf33dc63..0559949863 100644 --- a/vortex-file/src/read/builder/mod.rs +++ b/vortex-file/src/read/builder/mod.rs @@ -142,10 +142,10 @@ impl VortexReadBuilder { let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let layout_reader = self.layout_serde.read_layout( initial_read.fb_layout(), - Scan::new(match self.projection { - Projection::All => None, - Projection::Flat(p) => Some(Arc::new(Select::include(p))), - }), + match self.projection { + Projection::All => Scan::empty(), + Projection::Flat(p) => Scan::new(Arc::new(Select::include(p))), + }, RelativeLayoutCache::new(message_cache.clone(), lazy_dtype.clone()), )?; @@ -154,7 +154,7 @@ impl VortexReadBuilder { .map(|row_filter| { self.layout_serde.read_layout( initial_read.fb_layout(), - Scan::new(Some(Arc::new(row_filter))), + Scan::new(Arc::new(row_filter)), RelativeLayoutCache::new(message_cache.clone(), lazy_dtype), ) }) diff --git a/vortex-file/src/read/layouts/chunked.rs b/vortex-file/src/read/layouts/chunked.rs index d312ad3f12..990807828e 100644 --- a/vortex-file/src/read/layouts/chunked.rs +++ b/vortex-file/src/read/layouts/chunked.rs @@ -86,9 +86,9 @@ impl ChunkedLayoutBuilder<'_> { }; Some(self.layout_builder.read_layout( metadata_fb, - Scan::new(Some(Arc::new(Select::include( + Scan::new(Arc::new(Select::include( s.names().iter().map(|s| Field::Name(s.clone())).collect(), - )))), + ))), self.message_cache.relative( METADATA_LAYOUT_PART_ID, Arc::new(LazyDType::from_dtype(stats_dtype)), @@ -289,7 +289,7 @@ impl ChunkedLayoutReader { impl LayoutReader for ChunkedLayoutReader { fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { for RangedLayoutReader((begin, _), child) in &self.layouts { - child.add_splits(row_offset + begin, splits)? + child.add_splits(row_offset + begin, splits)?; } Ok(()) } @@ -474,7 +474,7 @@ mod tests { .unwrap(), ChunkedLayoutBuilder { layout, - scan: Scan::new(None), + scan: Scan::empty(), layout_builder, message_cache: RelativeLayoutCache::new(cache, dtype), } @@ -491,11 +491,11 @@ mod tests { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (mut filter_layout, mut projection_layout, buf, length) = layout_and_bytes( cache.clone(), - Scan::new(Some(RowFilter::new_expr(BinaryExpr::new_expr( + Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( Arc::new(Identity), Operator::Gt, Literal::new_expr(10.into()), - )))), + ))), ) .await; @@ -527,7 +527,7 @@ mod tests { async fn read_range_no_filter() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (_, mut projection_layout, buf, length) = - layout_and_bytes(cache.clone(), Scan::new(None)).await; + layout_and_bytes(cache.clone(), Scan::empty()).await; let arr = read_layout(&mut projection_layout, cache, &buf, length).pop_front(); assert!(arr.is_some()); @@ -543,7 +543,7 @@ mod tests { async fn read_no_range() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (_, mut projection_layout, buf, _) = - layout_and_bytes(cache.clone(), Scan::new(None)).await; + layout_and_bytes(cache.clone(), Scan::empty()).await; let arr = read_layout_data( &mut projection_layout, cache, @@ -564,7 +564,7 @@ mod tests { async fn read_multiple_selectors() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (_, mut projection_layout, buf, _) = - layout_and_bytes(cache.clone(), Scan::new(None)).await; + layout_and_bytes(cache.clone(), Scan::empty()).await; let mut first_range = BooleanBufferBuilder::new(200); first_range.append_n(150, true); diff --git a/vortex-file/src/read/layouts/columnar.rs b/vortex-file/src/read/layouts/columnar.rs index 2cb6e04e45..319217002d 100644 --- a/vortex-file/src/read/layouts/columnar.rs +++ b/vortex-file/src/read/layouts/columnar.rs @@ -80,7 +80,7 @@ impl ColumnarLayoutBuilder<'_> { let child = self.layout_serde.read_layout( child_layout, - Scan::new(projected_expr), + Scan::from(projected_expr), self.message_cache .relative(resolved_child as u16, child_field), )?; @@ -223,7 +223,7 @@ impl ColumnarLayoutReader { impl LayoutReader for ColumnarLayoutReader { fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { for child in &self.children { - child.add_splits(row_offset, splits)? + child.add_splits(row_offset, splits)?; } Ok(()) } @@ -470,7 +470,7 @@ mod tests { layout_serde .read_layout( initial_read.fb_layout(), - Scan::new(None), + Scan::empty(), RelativeLayoutCache::new(cache.clone(), dtype), ) .unwrap(), @@ -485,11 +485,11 @@ mod tests { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (mut filter_layout, mut project_layout, buf, length) = layout_and_bytes( cache.clone(), - Scan::new(Some(RowFilter::new_expr(BinaryExpr::new_expr( + Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( Column::new_expr(Field::from("ints")), Operator::Gt, Literal::new_expr(10.into()), - )))), + ))), ) .await; let arr = filter_read_layout( @@ -540,7 +540,7 @@ mod tests { async fn read_range_no_filter() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (_, mut project_layout, buf, length) = - layout_and_bytes(cache.clone(), Scan::new(None)).await; + layout_and_bytes(cache.clone(), Scan::empty()).await; let arr = read_layout(project_layout.as_mut(), cache, &buf, length).pop_front(); assert!(arr.is_some()); @@ -583,7 +583,7 @@ mod tests { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (mut filter_layout, mut project_layout, buf, length) = layout_and_bytes( cache.clone(), - Scan::new(Some(RowFilter::new_expr(BinaryExpr::new_expr( + Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( BinaryExpr::new_expr( Column::new_expr(Field::from("strs")), Operator::Eq, @@ -595,7 +595,7 @@ mod tests { Operator::Lt, Literal::new_expr(150.into()), ), - )))), + ))), ) .await; let arr = filter_read_layout( diff --git a/vortex-file/src/read/layouts/flat.rs b/vortex-file/src/read/layouts/flat.rs index 59e47b4b1d..0c1a176899 100644 --- a/vortex-file/src/read/layouts/flat.rs +++ b/vortex-file/src/read/layouts/flat.rs @@ -139,7 +139,7 @@ mod tests { writer.write_batch(array).await.unwrap(); let written = writer.into_inner(); - let projection_scan = Scan::new(None); + let projection_scan = Scan::empty(); let dtype = Arc::new(LazyDType::from_dtype(PType::I32.into())); ( @@ -180,11 +180,11 @@ mod tests { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (mut filter_layout, mut projection_layout, buf, length) = layout_and_bytes( cache.clone(), - Scan::new(Some(RowFilter::new_expr(BinaryExpr::new_expr( + Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( Arc::new(Identity), Operator::Gt, Literal::new_expr(10.into()), - )))), + ))), ) .await; let arr = filter_read_layout( @@ -225,11 +225,11 @@ mod tests { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); let (mut filter_layout, mut projection_layout, buf, length) = layout_and_bytes( cache.clone(), - Scan::new(Some(RowFilter::new_expr(BinaryExpr::new_expr( + Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( Arc::new(Identity), Operator::Gt, Literal::new_expr(101.into()), - )))), + ))), ) .await; let arr = filter_read_layout( diff --git a/vortex-file/src/read/layouts/test_read.rs b/vortex-file/src/read/layouts/test_read.rs index 8de47b7d5a..841f34cc39 100644 --- a/vortex-file/src/read/layouts/test_read.rs +++ b/vortex-file/src/read/layouts/test_read.rs @@ -6,20 +6,20 @@ use vortex_array::ArrayData; use vortex_error::VortexUnwrap; use crate::read::mask::RowMask; -use crate::read::splits::FixedSplitIterator; +use crate::read::splits::SplitsAccumulator; use crate::{LayoutMessageCache, LayoutReader, MessageLocator, PollRead}; fn layout_splits( layouts: &[&mut dyn LayoutReader], length: usize, ) -> impl Iterator { - let mut iter = FixedSplitIterator::new(length as u64, None); + let mut iter = SplitsAccumulator::new(length as u64, None); let mut splits = BTreeSet::new(); for layout in layouts { layout.add_splits(0, &mut splits).vortex_unwrap(); } - iter.additional_splits(&mut splits).vortex_unwrap(); - iter.map(|m| m.unwrap()) + iter.append_splits(&mut splits); + iter.into_iter().map(|m| m.unwrap()) } pub fn read_layout_data( diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs index 65526b2309..2bc5a4b72c 100644 --- a/vortex-file/src/read/mod.rs +++ b/vortex-file/src/read/mod.rs @@ -48,7 +48,13 @@ impl Scan { Self { expr: None } } - pub fn new(expr: Option) -> Self { + pub fn new(expr: ExprRef) -> Self { + Self { expr: Some(expr) } + } +} + +impl From> for Scan { + fn from(expr: Option) -> Self { Self { expr } } } diff --git a/vortex-file/src/read/splits.rs b/vortex-file/src/read/splits.rs index f340bcbc42..f0ab18fbb4 100644 --- a/vortex-file/src/read/splits.rs +++ b/vortex-file/src/read/splits.rs @@ -1,12 +1,8 @@ use std::collections::BTreeSet; -use std::mem; -use std::pin::Pin; -use std::task::{Context, Poll}; -use futures::Stream; use itertools::Itertools; use vortex_array::stats::ArrayStatistics; -use vortex_error::{vortex_bail, VortexResult, VortexUnwrap}; +use vortex_error::{VortexResult, VortexUnwrap}; use crate::read::buffered::ReadMasked; use crate::{LayoutReader, PollRead, Prune, RowMask}; @@ -62,78 +58,64 @@ impl ReadMasked for ReadRowMask { } } -enum FixedSplitState { - Ranges(Box + Send>), - Splits(BTreeSet), +pub struct SplitsAccumulator { + splits: BTreeSet, + row_mask: Option, } -pub struct FixedSplitIterator { - splits: FixedSplitState, +pub struct SplitsIntoIter { + ranges: Box + Send>, row_mask: Option, } -impl FixedSplitIterator { +impl SplitsAccumulator { pub fn new(row_count: u64, row_mask: Option) -> Self { let mut splits = BTreeSet::new(); splits.insert(row_count.try_into().vortex_unwrap()); - Self { - splits: FixedSplitState::Splits(splits), - row_mask, - } + Self { splits, row_mask } } - pub fn additional_splits(&mut self, splits: &mut BTreeSet) -> VortexResult<()> { - match &mut self.splits { - FixedSplitState::Ranges(_) => { - vortex_bail!("Can't insert additional splits if we started producing row ranges") - } - FixedSplitState::Splits(s) => { - s.append(splits); - Ok(()) - } - } + pub fn append_splits(&mut self, other: &mut BTreeSet) { + self.splits.append(other); } } -impl Iterator for FixedSplitIterator { +impl IntoIterator for SplitsAccumulator { type Item = VortexResult; - fn next(&mut self) -> Option { - match &mut self.splits { - FixedSplitState::Ranges(ranges) => { - // Find next range that's not filtered out by supplied row_mask - for (begin, end) in ranges { - return if let Some(ref row_mask) = self.row_mask { - let sliced = match row_mask.slice(begin, end) { - Ok(s) => s, - Err(e) => return Some(Err(e)), - }; - - if sliced.is_all_false() { - continue; - } - Some(Ok(sliced)) - } else { - Some(Ok(RowMask::new_valid_between(begin, end))) - }; - } - None - } - FixedSplitState::Splits(s) => { - self.splits = FixedSplitState::Ranges(Box::new( - mem::take(s).into_iter().tuple_windows::<(usize, usize)>(), - )); - self.next() - } + type IntoIter = SplitsIntoIter; + + fn into_iter(self) -> Self::IntoIter { + let ranges = Box::new(self.splits.into_iter().tuple_windows::<(usize, usize)>()); + SplitsIntoIter { + ranges, + row_mask: self.row_mask, } } } -impl Stream for FixedSplitIterator { +impl Iterator for SplitsIntoIter { type Item = VortexResult; - fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(self.next()) + fn next(&mut self) -> Option { + // Find next range that's not filtered out by supplied row_mask + for (begin, end) in self.ranges.as_mut() { + return if let Some(ref row_mask) = self.row_mask { + let sliced = match row_mask.slice(begin, end) { + Ok(s) => s, + Err(e) => return Some(Err(e)), + }; + + if sliced.is_all_false() { + continue; + } + Some(Ok(sliced)) + } else { + Some(Ok(RowMask::new_valid_between(begin, end))) + }; + } + + None } } @@ -144,28 +126,13 @@ mod tests { use vortex_array::compute::FilterMask; use vortex_error::VortexResult; - use crate::read::splits::FixedSplitIterator; + use crate::read::splits::SplitsAccumulator; use crate::RowMask; - #[test] - #[should_panic] - #[cfg_attr(miri, ignore)] - fn register_after_start() { - let mut mask_iter = FixedSplitIterator::new(10, None); - mask_iter - .additional_splits(&mut BTreeSet::from([0, 1, 2])) - .unwrap(); - assert!(mask_iter.next().is_some()); - mask_iter - .additional_splits(&mut BTreeSet::from([5])) - .unwrap(); - mask_iter.next(); - } - #[test] #[cfg_attr(miri, ignore)] fn filters_empty() { - let mut mask_iter = FixedSplitIterator::new( + let mut mask_iter = SplitsAccumulator::new( 10, Some( RowMask::try_new( @@ -178,11 +145,12 @@ mod tests { .unwrap(), ), ); - mask_iter - .additional_splits(&mut BTreeSet::from([0, 2, 4, 6, 8, 10])) - .unwrap(); + mask_iter.append_splits(&mut BTreeSet::from([0, 2, 4, 6, 8, 10])); - let actual = mask_iter.collect::>>().unwrap(); + let actual = mask_iter + .into_iter() + .collect::>>() + .unwrap(); let expected = vec![RowMask::new_valid_between(4, 6)]; assert_eq!(actual, expected); diff --git a/vortex-file/src/read/stream.rs b/vortex-file/src/read/stream.rs index 244f21d3e5..c1ac843b8f 100644 --- a/vortex-file/src/read/stream.rs +++ b/vortex-file/src/read/stream.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; -use futures::Stream; +use futures::{stream, Stream}; use futures_util::{StreamExt, TryStreamExt}; use vortex_array::array::ChunkedArray; use vortex_array::{ArrayData, IntoArrayData}; @@ -14,7 +14,7 @@ use vortex_io::{IoDispatcher, VortexReadAt}; use crate::read::buffered::{BufferedLayoutReader, ReadArray}; use crate::read::cache::LayoutMessageCache; use crate::read::mask::RowMask; -use crate::read::splits::{FixedSplitIterator, ReadRowMask}; +use crate::read::splits::{ReadRowMask, SplitsAccumulator}; use crate::read::LayoutReader; use crate::LazyDType; @@ -54,20 +54,21 @@ impl VortexFileArrayStream { fr.add_splits(0, &mut reader_splits)?; } - let mut split_iterator = FixedSplitIterator::new(row_count, row_mask); - split_iterator.additional_splits(&mut reader_splits)?; + let mut split_accumulator = SplitsAccumulator::new(row_count, row_mask); + split_accumulator.append_splits(&mut reader_splits); + let splits_stream = stream::iter(split_accumulator); // Set up a stream of RowMask that result from applying a filter expression over the file. let mask_iterator = if let Some(fr) = filter_reader { Box::new(BufferedLayoutReader::new( input.clone(), dispatcher.clone(), - split_iterator, + splits_stream, ReadRowMask::new(fr), messages_cache.clone(), )) as _ } else { - Box::new(split_iterator) as _ + Box::new(splits_stream) as _ }; // Set up a stream of result ArrayData that result from applying the filter and projection @@ -108,15 +109,15 @@ impl Stream for VortexFileArrayStream { impl VortexFileArrayStream { pub async fn read_all(self) -> VortexResult { let dtype = self.dtype().clone(); - let vecs: Vec = self.try_collect().await?; - if vecs.len() == 1 { - vecs.into_iter().next().ok_or_else(|| { + let arrays = self.try_collect::>().await?; + if arrays.len() == 1 { + arrays.into_iter().next().ok_or_else(|| { vortex_panic!( "Should be impossible: vecs.len() == 1 but couldn't get first element" ) }) } else { - ChunkedArray::try_new(vecs, dtype).map(|e| e.into_array()) + ChunkedArray::try_new(arrays, dtype).map(|e| e.into_array()) } } } diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 04dcd51298..615dac6fd6 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -135,7 +135,7 @@ async fn test_splits() { let layout_reader = layout_serde .read_layout( initial_read.fb_layout(), - Scan::new(None), + Scan::empty(), RelativeLayoutCache::new(cache, dtype), ) .unwrap();