Skip to content

Commit

Permalink
Naive interleaved filtering and data reading (#918)
Browse files Browse the repository at this point in the history
Read filter-related data using a different `LayoutReader`, but with a
shared message cache. Once we wire-in metadata, we can use this
information to skip chunks/parts of chunks.

There are some assumptions on overall alignments/batch size, that are
held by the underlying `BatchReader`.
  • Loading branch information
AdamGS authored Sep 24, 2024
1 parent 0bdfd08 commit e9f0d4d
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 126 deletions.
7 changes: 5 additions & 2 deletions vortex-dtype/src/serde/flatbuffers/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ pub fn resolve_field_references<'a, 'b: 'a>(
}

/// Deserialize flatbuffer schema selecting only columns defined by projection
pub fn deserialize_and_project(fb: fb::DType<'_>, projection: &[Field]) -> VortexResult<DType> {
let fb_struct = fb
pub fn deserialize_and_project(
fb_dtype: fb::DType<'_>,
projection: &[Field],
) -> VortexResult<DType> {
let fb_struct = fb_dtype
.type__as_struct_()
.ok_or_else(|| vortex_err!("The top-level type should be a struct"))?;
let nullability = fb_struct.nullable().into();
Expand Down
6 changes: 3 additions & 3 deletions vortex-serde/src/layouts/read/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use vortex::array::StructArray;
use vortex::{Array, IntoArray};
use vortex_error::{vortex_err, VortexResult};

use crate::layouts::read::{Layout, ReadResult};
use crate::layouts::read::{LayoutReader, ReadResult};

#[derive(Debug)]
pub struct BatchReader {
names: Arc<[Arc<str>]>,
children: Vec<Box<dyn Layout>>,
children: Vec<Box<dyn LayoutReader>>,
arrays: Vec<Option<Array>>,
}

impl BatchReader {
pub fn new(names: Arc<[Arc<str>]>, children: Vec<Box<dyn Layout>>) -> Self {
pub fn new(names: Arc<[Arc<str>]>, children: Vec<Box<dyn LayoutReader>>) -> Self {
let arrays = vec![None; children.len()];
Self {
names,
Expand Down
6 changes: 3 additions & 3 deletions vortex-serde/src/layouts/read/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use vortex::compute::slice;
use vortex::{Array, ArrayDType, IntoArray};
use vortex_error::VortexResult;

use crate::layouts::read::{Layout, ReadResult};
use crate::layouts::read::{LayoutReader, ReadResult};

#[derive(Debug)]
pub struct BufferedReader {
layouts: VecDeque<Box<dyn Layout>>,
layouts: VecDeque<Box<dyn LayoutReader>>,
arrays: VecDeque<Array>,
batch_size: usize,
}

impl BufferedReader {
pub fn new(layouts: VecDeque<Box<dyn Layout>>, batch_size: usize) -> Self {
pub fn new(layouts: VecDeque<Box<dyn LayoutReader>>, batch_size: usize) -> Self {
Self {
layouts,
arrays: Default::default(),
Expand Down
75 changes: 37 additions & 38 deletions vortex-serde/src/layouts/read/builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::sync::{Arc, RwLock};

use ahash::HashSet;
use bytes::BytesMut;
use vortex::{Array, ArrayDType};
use vortex_dtype::field::Field;
use vortex_error::{vortex_bail, VortexResult};
use vortex_schema::projection::Projection;
use vortex_schema::Schema;

use crate::io::VortexReadAt;
use crate::layouts::read::cache::{LayoutMessageCache, RelativeLayoutCache};
Expand Down Expand Up @@ -74,65 +71,67 @@ impl<R: VortexReadAt> LayoutReaderBuilder<R> {
let footer = self.read_footer().await?;
let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

// TODO(robert): Don't leak filter references into read projection
let (read_projection, result_projection) = if let Some(filter_columns) = self
let filter_projection = self
.row_filter
.as_ref()
.map(|f| f.references())
.filter(|refs| !refs.is_empty())
// This is necessary to have globally addressed columns in the relative cache,
// there is probably a better of doing that, but this works for now and the API isn't very externally-useful.
.map(|refs| footer.resolve_references(&refs.into_iter().collect::<Vec<_>>()))
.transpose()?
{
match self.projection.unwrap_or_default() {
Projection::All => (Projection::All, Projection::All),
Projection::Flat(mut v) => {
let original_len = v.len();
let existing_fields: HashSet<Field> = v.iter().cloned().collect();
v.extend(
filter_columns
.into_iter()
.filter(|f| !existing_fields.contains(f)),
);
(
Projection::Flat(v),
Projection::Flat((0..original_len).map(Field::from).collect()),
)
}
}
} else {
(self.projection.unwrap_or_default(), Projection::All)
};
.map(Projection::from);

let read_projection = self.projection.unwrap_or_default();

let projected_dtype = match &read_projection {
let projected_dtype = match read_projection {
Projection::All => footer.dtype()?,
Projection::Flat(projection) => footer.projected_dtype(projection)?,
Projection::Flat(ref projection) => footer.projected_dtype(projection)?,
};

let filter = self.row_filter.map(|f| {
let schema = Schema::new(projected_dtype.clone());
f.reorder(&schema)
});
let filter_dtype = filter_projection
.as_ref()
.map(|p| match p {
Projection::All => footer.dtype(),
Projection::Flat(fields) => footer.projected_dtype(fields),
})
.transpose()?;

let scan = Scan {
filter,
filter: self.row_filter.clone(),
batch_size,
projection: read_projection,
indices: self.indices,
};

let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
let layouts_cache =
RelativeLayoutCache::new(message_cache.clone(), projected_dtype.clone());

let layout = footer.layout(scan.clone(), layouts_cache)?;
let data_reader = footer.layout(
scan.clone(),
RelativeLayoutCache::new(message_cache.clone(), projected_dtype.clone()),
)?;

let filter_reader = filter_dtype
.zip(filter_projection)
.map(|(dtype, projection)| {
footer.layout(
Scan {
filter: self.row_filter,
batch_size,
projection,
indices: None,
},
RelativeLayoutCache::new(message_cache.clone(), dtype),
)
})
.transpose()?;

Ok(LayoutBatchStream::new(
self.reader,
layout,
data_reader,
filter_reader,
message_cache,
projected_dtype,
scan,
result_projection,
))
}

Expand Down
6 changes: 3 additions & 3 deletions vortex-serde/src/layouts/read/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use vortex_flatbuffers::footer::LayoutVariant;

use crate::layouts::read::cache::RelativeLayoutCache;
use crate::layouts::read::layouts::{ChunkedLayoutSpec, ColumnLayoutSpec, FlatLayout};
use crate::layouts::read::{Layout, Scan};
use crate::layouts::read::{LayoutReader, Scan};

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct LayoutId(pub u16);
Expand All @@ -25,7 +25,7 @@ pub trait LayoutSpec: Debug + Send + Sync {
scan: Scan,
layout_reader: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> Box<dyn Layout>;
) -> Box<dyn LayoutReader>;
}

pub type LayoutSpecRef = &'static dyn LayoutSpec;
Expand Down Expand Up @@ -73,7 +73,7 @@ impl LayoutDeserializer {
fb_loc: usize,
scan: Scan,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn Layout>> {
) -> VortexResult<Box<dyn LayoutReader>> {
let fb_layout = unsafe {
let tab = flatbuffers::Table::new(&fb_bytes, fb_loc);
fb::Layout::init_from_table(tab)
Expand Down
15 changes: 11 additions & 4 deletions vortex-serde/src/layouts/read/filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::sync::Arc;

use vortex::array::BoolArray;
use vortex::compute::and;
use vortex::stats::ArrayStatistics;
use vortex::validity::Validity;
use vortex::{Array, IntoArray};
use vortex_dtype::field::{Field, FieldPath};
use vortex_error::VortexResult;
Expand All @@ -18,8 +20,8 @@ pub struct RowFilter {
}

impl RowFilter {
pub fn new(filter: Arc<dyn VortexExpr>) -> Self {
let conjunction = split_conjunction(&filter)
pub fn new(expr: Arc<dyn VortexExpr>) -> Self {
let conjunction = split_conjunction(&expr)
.into_iter()
.filter(expr_is_filter)
.collect();
Expand All @@ -33,6 +35,12 @@ impl RowFilter {
for expr in self.conjunction.iter() {
let new_mask = expr.evaluate(target)?;
mask = and(new_mask, mask)?;

if mask.statistics().compute_true_count().unwrap_or_default() == 0 {
return Ok(
BoolArray::from_vec(vec![false; target.len()], Validity::AllValid).into_array(),
);
}
}

Ok(mask)
Expand All @@ -42,8 +50,7 @@ impl RowFilter {
pub fn references(&self) -> HashSet<Field> {
let mut set = HashSet::new();
for expr in self.conjunction.iter() {
let references = expr.references();
set.extend(references.iter().cloned());
set.extend(expr.references().iter().cloned());
}

set
Expand Down
4 changes: 2 additions & 2 deletions vortex-serde/src/layouts/read/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vortex_schema::Schema;

use crate::layouts::read::cache::RelativeLayoutCache;
use crate::layouts::read::context::LayoutDeserializer;
use crate::layouts::read::{Layout, Scan, FILE_POSTSCRIPT_SIZE};
use crate::layouts::read::{LayoutReader, Scan, FILE_POSTSCRIPT_SIZE};
use crate::messages::IPCDType;
use crate::FLATBUFFER_SIZE_LENGTH;

Expand Down Expand Up @@ -57,7 +57,7 @@ impl Footer {
&self,
scan: Scan,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn Layout>> {
) -> VortexResult<Box<dyn LayoutReader>> {
let start_offset = self.leftovers_layout_offset();
let end_offset = self.leftovers.len() - FILE_POSTSCRIPT_SIZE;
let footer_bytes = self
Expand Down
14 changes: 7 additions & 7 deletions vortex-serde/src/layouts/read/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::layouts::read::batch::BatchReader;
use crate::layouts::read::buffered::BufferedReader;
use crate::layouts::read::cache::RelativeLayoutCache;
use crate::layouts::read::context::{LayoutDeserializer, LayoutId, LayoutSpec};
use crate::layouts::read::{Layout, ReadResult, Scan};
use crate::layouts::read::{LayoutReader, ReadResult, Scan};
use crate::stream_writer::ByteRange;
use crate::ArrayBufferReader;

Expand Down Expand Up @@ -44,7 +44,7 @@ impl FlatLayout {
}
}

impl Layout for FlatLayout {
impl LayoutReader for FlatLayout {
fn read_next(&mut self) -> VortexResult<Option<ReadResult>> {
match self.state {
FlatLayoutState::Init => {
Expand Down Expand Up @@ -97,7 +97,7 @@ impl LayoutSpec for ColumnLayoutSpec {
scan: Scan,
layout_serde: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> Box<dyn Layout> {
) -> Box<dyn LayoutReader> {
Box::new(ColumnLayout::new(
fb_bytes,
fb_loc,
Expand Down Expand Up @@ -160,7 +160,7 @@ impl ColumnLayout {
idx: usize,
children: Vector<ForwardsUOffset<fb::Layout>>,
dtype: DType,
) -> VortexResult<Box<dyn Layout>> {
) -> VortexResult<Box<dyn LayoutReader>> {
let layout = children.get(idx);

// TODO: Figure out complex nested schema projections
Expand All @@ -176,7 +176,7 @@ impl ColumnLayout {
}
}

impl Layout for ColumnLayout {
impl LayoutReader for ColumnLayout {
fn read_next(&mut self) -> VortexResult<Option<ReadResult>> {
match &mut self.state {
ColumnLayoutState::Init => {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl LayoutSpec for ChunkedLayoutSpec {
scan: Scan,
layout_serde: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> Box<dyn Layout> {
) -> Box<dyn LayoutReader> {
Box::new(ChunkedLayout::new(
fb_bytes,
fb_loc,
Expand Down Expand Up @@ -296,7 +296,7 @@ impl ChunkedLayout {
}
}

impl Layout for ChunkedLayout {
impl LayoutReader for ChunkedLayout {
fn read_next(&mut self) -> VortexResult<Option<ReadResult>> {
match &mut self.state {
ChunkedLayoutState::Init => {
Expand Down
2 changes: 1 addition & 1 deletion vortex-serde/src/layouts/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum ReadResult {
Batch(Array),
}

pub trait Layout: Debug + Send {
pub trait LayoutReader: Debug + Send {
/// Reads the data from the underlying layout
///
/// The layout can either return a batch data, i.e. an Array or ask for more layout messages to
Expand Down
Loading

0 comments on commit e9f0d4d

Please sign in to comment.