Skip to content

Commit

Permalink
make the splits iterator state more explicit
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Dec 12, 2024
1 parent 8445b1d commit b214736
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 74 deletions.
8 changes: 4 additions & 4 deletions vortex-file/src/read/layouts/test_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ use vortex_array::ArrayData;
use vortex_error::VortexUnwrap;

use crate::read::mask::RowMask;
use crate::read::splits::FixedSplitIterator;
use crate::read::splits::SplitsAccumulator;
use crate::{LayoutMessageCache, LayoutReader, MessageLocator, PollRead};

fn layout_splits(
layouts: &[&mut dyn LayoutReader],
length: usize,
) -> impl Iterator<Item = RowMask> {
let mut iter = FixedSplitIterator::new(length as u64, None);
let mut iter = SplitsAccumulator::new(length as u64, None);
let mut splits = BTreeSet::new();
for layout in layouts {
layout.add_splits(0, &mut splits).vortex_unwrap();
}
iter.append_splits(&mut splits).vortex_unwrap();
iter.map(|m| m.unwrap())
iter.append_splits(&mut splits);
iter.into_iter().map(|m| m.unwrap())
}

pub fn read_layout_data(
Expand Down
118 changes: 52 additions & 66 deletions vortex-file/src/read/splits.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::collections::BTreeSet;
use std::mem;

use itertools::Itertools;
use vortex_array::stats::ArrayStatistics;
use vortex_error::{vortex_bail, VortexResult, VortexUnwrap};
use vortex_error::{VortexResult, VortexUnwrap};

use crate::read::buffered::ReadMasked;
use crate::{LayoutReader, PollRead, Prune, RowMask};
Expand Down Expand Up @@ -59,70 +58,69 @@ impl ReadMasked for ReadRowMask {
}
}

enum FixedSplitState {
Ranges(Box<dyn Iterator<Item = (usize, usize)> + Send>),
Splits(BTreeSet<usize>),
// enum FixedSplitState {
// Ranges(Box<dyn Iterator<Item = (usize, usize)> + Send>),
// Splits(BTreeSet<usize>),
// }

/// Collects split points ahead of iteration.
///
/// Split points are gathered with `append_splits`; once all of them are
/// registered the accumulator is consumed through its `IntoIterator` impl,
/// which yields one `VortexResult<RowMask>` per range between adjacent splits.
pub struct SplitsAccumulator {
/// Ordered, de-duplicated split points. `new` seeds this set with the total
/// row count.
splits: BTreeSet<usize>,
/// Optional mask that is handed over to the iterator built by `into_iter`.
row_mask: Option<RowMask>,
}

pub struct FixedSplitIterator {
splits: FixedSplitState,
/// Iterator produced by consuming a `SplitsAccumulator`.
///
/// Each item is a `VortexResult<RowMask>` for one `(begin, end)` range.
pub struct SplitsIntoIter {
/// Remaining `(begin, end)` pairs built from adjacent split points.
ranges: Box<dyn Iterator<Item = (usize, usize)> + Send>,
/// When present, every range is sliced out of this mask, and ranges whose
/// slice is entirely false are skipped.
row_mask: Option<RowMask>,
}

impl FixedSplitIterator {
impl SplitsAccumulator {
pub fn new(row_count: u64, row_mask: Option<RowMask>) -> Self {
let mut splits = BTreeSet::new();
splits.insert(row_count.try_into().vortex_unwrap());
Self {
splits: FixedSplitState::Splits(splits),
row_mask,
}
Self { splits, row_mask }
}

pub fn append_splits(&mut self, splits: &mut BTreeSet<usize>) -> VortexResult<()> {
match &mut self.splits {
FixedSplitState::Ranges(_) => {
vortex_bail!("Can't insert additional splits if we started producing row ranges")
}
FixedSplitState::Splits(s) => {
s.append(splits);
Ok(())
}
pub fn append_splits(&mut self, other: &mut BTreeSet<usize>) {
self.splits.append(other);
}
}

/// Consuming the accumulator freezes the set of splits: no further splits can
/// be appended once iteration has begun, because the accumulator is moved.
impl IntoIterator for SplitsAccumulator {
    type Item = VortexResult<RowMask>;
    type IntoIter = SplitsIntoIter;

    /// Pairs up adjacent split points into `(begin, end)` ranges and hands the
    /// optional row mask over to the resulting iterator.
    fn into_iter(self) -> Self::IntoIter {
        let Self { splits, row_mask } = self;
        SplitsIntoIter {
            ranges: Box::new(splits.into_iter().tuple_windows::<(usize, usize)>()),
            row_mask,
        }
    }
}

impl Iterator for FixedSplitIterator {
impl Iterator for SplitsIntoIter {
type Item = VortexResult<RowMask>;

fn next(&mut self) -> Option<Self::Item> {
match &mut self.splits {
FixedSplitState::Ranges(ranges) => {
// Find next range that's not filtered out by supplied row_mask
for (begin, end) in ranges {
return if let Some(ref row_mask) = self.row_mask {
let sliced = match row_mask.slice(begin, end) {
Ok(s) => s,
Err(e) => return Some(Err(e)),
};

if sliced.is_all_false() {
continue;
}
Some(Ok(sliced))
} else {
Some(Ok(RowMask::new_valid_between(begin, end)))
};
// Find next range that's not filtered out by supplied row_mask
for (begin, end) in self.ranges.as_mut() {
return if let Some(ref row_mask) = self.row_mask {
let sliced = match row_mask.slice(begin, end) {
Ok(s) => s,
Err(e) => return Some(Err(e)),
};

if sliced.is_all_false() {
continue;
}
None
}
FixedSplitState::Splits(s) => {
self.splits = FixedSplitState::Ranges(Box::new(
mem::take(s).into_iter().tuple_windows::<(usize, usize)>(),
));
self.next()
}
Some(Ok(sliced))
} else {
Some(Ok(RowMask::new_valid_between(begin, end)))
};
}

None
}
}

Expand All @@ -133,26 +131,13 @@ mod tests {
use vortex_array::compute::FilterMask;
use vortex_error::VortexResult;

use crate::read::splits::FixedSplitIterator;
use crate::read::splits::SplitsAccumulator;
use crate::RowMask;

#[test]
#[should_panic]
#[cfg_attr(miri, ignore)]
fn register_after_start() {
let mut mask_iter = FixedSplitIterator::new(10, None);
mask_iter
.append_splits(&mut BTreeSet::from([0, 1, 2]))
.unwrap();
assert!(mask_iter.next().is_some());
mask_iter.append_splits(&mut BTreeSet::from([5])).unwrap();
mask_iter.next();
}

#[test]
#[cfg_attr(miri, ignore)]
fn filters_empty() {
let mut mask_iter = FixedSplitIterator::new(
let mut mask_iter = SplitsAccumulator::new(
10,
Some(
RowMask::try_new(
Expand All @@ -165,11 +150,12 @@ mod tests {
.unwrap(),
),
);
mask_iter
.append_splits(&mut BTreeSet::from([0, 2, 4, 6, 8, 10]))
.unwrap();
mask_iter.append_splits(&mut BTreeSet::from([0, 2, 4, 6, 8, 10]));

let actual = mask_iter.collect::<VortexResult<Vec<_>>>().unwrap();
let actual = mask_iter
.into_iter()
.collect::<VortexResult<Vec<_>>>()
.unwrap();
let expected = vec![RowMask::new_valid_between(4, 6)];

assert_eq!(actual, expected);
Expand Down
8 changes: 4 additions & 4 deletions vortex-file/src/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vortex_io::{IoDispatcher, VortexReadAt};
use crate::read::buffered::{BufferedLayoutReader, ReadArray};
use crate::read::cache::LayoutMessageCache;
use crate::read::mask::RowMask;
use crate::read::splits::{FixedSplitIterator, ReadRowMask};
use crate::read::splits::{ReadRowMask, SplitsAccumulator};
use crate::read::LayoutReader;
use crate::LazyDType;

Expand Down Expand Up @@ -54,9 +54,9 @@ impl<R: VortexReadAt + Unpin> VortexFileArrayStream<R> {
fr.add_splits(0, &mut reader_splits)?;
}

let mut split_iterator = FixedSplitIterator::new(row_count, row_mask);
split_iterator.append_splits(&mut reader_splits)?;
let splits_stream = stream::iter(split_iterator);
let mut split_accumulator = SplitsAccumulator::new(row_count, row_mask);
split_accumulator.append_splits(&mut reader_splits);
let splits_stream = stream::iter(split_accumulator);

// Set up a stream of RowMask that result from applying a filter expression over the file.
let mask_iterator = if let Some(fr) = filter_reader {
Expand Down

0 comments on commit b214736

Please sign in to comment.