Skip to content

Commit

Permalink
Use filter in RowMask::evaluate (#1515)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Dec 1, 2024
1 parent 6c7206a commit 91f3efe
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 86 deletions.
1 change: 1 addition & 0 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
LayoutContext::default().into(),
),
)
.with_io_dispatcher(DISPATCHER.clone())
.with_indices(ArrayData::from(indices.to_vec()))
.build()
.await?
Expand Down
17 changes: 17 additions & 0 deletions encodings/datetime-parts/src/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use vortex_array::compute::{filter, FilterFn, FilterMask};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
use vortex_error::VortexResult;

use crate::{DateTimePartsArray, DateTimePartsEncoding};

/// Filtering a [`DateTimePartsArray`] applies the same mask to each of its three component
/// arrays and reassembles the results under the array's existing dtype.
impl FilterFn<DateTimePartsArray> for DateTimePartsEncoding {
    fn filter(&self, array: &DateTimePartsArray, mask: FilterMask) -> VortexResult<ArrayData> {
        let dtype = array.dtype().clone();

        // The mask is cloned for the first two components and moved into the last one.
        let days = filter(array.days().as_ref(), mask.clone())?;
        let seconds = filter(array.seconds().as_ref(), mask.clone())?;
        let subsecond = filter(array.subsecond().as_ref(), mask)?;

        let parts = DateTimePartsArray::try_new(dtype, days, seconds, subsecond)?;
        Ok(parts.into_array())
    }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
mod filter;
mod take;

use itertools::Itertools as _;
use vortex_array::array::{PrimitiveArray, TemporalArray};
use vortex_array::compute::{
scalar_at, slice, take, ComputeVTable, ScalarAtFn, SliceFn, TakeFn, TakeOptions,
scalar_at, slice, ComputeVTable, FilterFn, ScalarAtFn, SliceFn, TakeFn,
};
use vortex_array::validity::ArrayValidity;
use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
Expand All @@ -13,6 +16,10 @@ use vortex_scalar::Scalar;
use crate::{DateTimePartsArray, DateTimePartsEncoding};

impl ComputeVTable for DateTimePartsEncoding {
fn filter_fn(&self) -> Option<&dyn FilterFn<ArrayData>> {
Some(self)
}

fn scalar_at_fn(&self) -> Option<&dyn ScalarAtFn<ArrayData>> {
Some(self)
}
Expand All @@ -26,23 +33,6 @@ impl ComputeVTable for DateTimePartsEncoding {
}
}

// Take for DateTimePartsArray: the same indices and options are applied to each of the
// three component arrays (days, seconds, subsecond).
impl TakeFn<DateTimePartsArray> for DateTimePartsEncoding {
/// Takes `indices` from every component and rebuilds a `DateTimePartsArray` with the
/// source array's dtype. Any error from a component `take` or from `try_new` is returned.
fn take(
&self,
array: &DateTimePartsArray,
indices: &ArrayData,
options: TakeOptions,
) -> VortexResult<ArrayData> {
Ok(DateTimePartsArray::try_new(
// The dtype is carried over unchanged from the source array.
array.dtype().clone(),
take(array.days(), indices, options)?,
take(array.seconds(), indices, options)?,
take(array.subsecond(), indices, options)?,
)?
.into_array())
}
}

impl SliceFn<DateTimePartsArray> for DateTimePartsEncoding {
fn slice(
&self,
Expand Down
22 changes: 22 additions & 0 deletions encodings/datetime-parts/src/compute/take.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use vortex_array::compute::{take, TakeFn, TakeOptions};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
use vortex_error::VortexResult;

use crate::{DateTimePartsArray, DateTimePartsEncoding};

/// Taking from a [`DateTimePartsArray`] takes the same indices from each of its three
/// component arrays and reassembles the results under the array's existing dtype.
impl TakeFn<DateTimePartsArray> for DateTimePartsEncoding {
    fn take(
        &self,
        array: &DateTimePartsArray,
        indices: &ArrayData,
        options: TakeOptions,
    ) -> VortexResult<ArrayData> {
        let dtype = array.dtype().clone();

        // Every component is gathered with identical indices and options.
        let days = take(array.days(), indices, options)?;
        let seconds = take(array.seconds(), indices, options)?;
        let subsecond = take(array.subsecond(), indices, options)?;

        let parts = DateTimePartsArray::try_new(dtype, days, seconds, subsecond)?;
        Ok(parts.into_array())
    }
}
34 changes: 34 additions & 0 deletions vortex-array/src/array/chunked/compute/boolean.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use vortex_dtype::{DType, Nullability};
use vortex_error::VortexResult;

use crate::array::{ChunkedArray, ChunkedEncoding};
use crate::compute::{and, and_kleene, or, or_kleene, slice, BinaryBooleanFn, BinaryOperator};
use crate::{ArrayData, IntoArrayData};

/// Boolean operations on a [`ChunkedArray`] are evaluated chunk by chunk against the
/// matching window of the right-hand side.
impl BinaryBooleanFn<ChunkedArray> for ChunkedEncoding {
    fn binary_boolean(
        &self,
        lhs: &ChunkedArray,
        rhs: &ArrayData,
        op: BinaryOperator,
    ) -> VortexResult<Option<ArrayData>> {
        // Running row offset of the current chunk within `rhs`.
        let mut offset = 0;

        let results = lhs
            .chunks()
            .map(|chunk| {
                let end = offset + chunk.len();
                let rhs_window = slice(rhs, offset, end)?;
                offset = end;

                match op {
                    BinaryOperator::And => and(&chunk, &rhs_window),
                    BinaryOperator::AndKleene => and_kleene(&chunk, &rhs_window),
                    BinaryOperator::Or => or(&chunk, &rhs_window),
                    BinaryOperator::OrKleene => or_kleene(&chunk, &rhs_window),
                }
            })
            // Stops at the first failing chunk and returns its error.
            .collect::<VortexResult<Vec<_>>>()?;

        // The per-chunk results are wrapped back up as a nullable boolean chunked array.
        let combined = ChunkedArray::try_new(results, DType::Bool(Nullability::Nullable))?;
        Ok(Some(combined.into_array()))
    }
}
30 changes: 30 additions & 0 deletions vortex-array/src/array/chunked/compute/compare.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use vortex_dtype::{DType, Nullability};
use vortex_error::VortexResult;

use crate::array::{ChunkedArray, ChunkedEncoding};
use crate::compute::{compare, slice, CompareFn, Operator};
use crate::{ArrayData, IntoArrayData};

/// Comparisons on a [`ChunkedArray`] are evaluated chunk by chunk against the matching
/// window of the right-hand side.
impl CompareFn<ChunkedArray> for ChunkedEncoding {
    fn compare(
        &self,
        lhs: &ChunkedArray,
        rhs: &ArrayData,
        operator: Operator,
    ) -> VortexResult<Option<ArrayData>> {
        // Running row offset of the current chunk within `rhs`.
        let mut offset = 0;

        let compared = lhs
            .chunks()
            .map(|chunk| {
                let end = offset + chunk.len();
                let rhs_window = slice(rhs, offset, end)?;
                offset = end;

                compare(&chunk, &rhs_window, operator)
            })
            // Stops at the first failing chunk and returns its error.
            .collect::<VortexResult<Vec<_>>>()?;

        // The per-chunk results are wrapped back up as a nullable boolean chunked array.
        let combined = ChunkedArray::try_new(compared, DType::Bool(Nullability::Nullable))?;
        Ok(Some(combined.into_array()))
    }
}
36 changes: 9 additions & 27 deletions vortex-array/src/array/chunked/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
use vortex_dtype::{DType, Nullability};
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::array::chunked::ChunkedArray;
use crate::array::ChunkedEncoding;
use crate::compute::{
compare, slice, try_cast, CastFn, CompareFn, ComputeVTable, FilterFn, InvertFn, Operator,
ScalarAtFn, SliceFn, SubtractScalarFn, TakeFn,
try_cast, BinaryBooleanFn, CastFn, CompareFn, ComputeVTable, FilterFn, InvertFn, ScalarAtFn,
SliceFn, SubtractScalarFn, TakeFn,
};
use crate::{ArrayData, IntoArrayData};

mod boolean;
mod compare;
mod filter;
mod invert;
mod scalar_at;
mod slice;
mod take;

impl ComputeVTable for ChunkedEncoding {
fn binary_boolean_fn(&self) -> Option<&dyn BinaryBooleanFn<ArrayData>> {
Some(self)
}

fn cast_fn(&self) -> Option<&dyn CastFn<ArrayData>> {
Some(self)
}
Expand Down Expand Up @@ -59,30 +65,6 @@ impl CastFn<ChunkedArray> for ChunkedEncoding {
}
}

// Compare for ChunkedArray: each chunk of the left-hand side is compared against the
// matching slice of the right-hand side.
impl CompareFn<ChunkedArray> for ChunkedEncoding {
/// Compares `lhs` to `rhs` chunk by chunk and returns the results as a chunked array with
/// a nullable boolean dtype. Errors from `slice`, `compare` or `try_new` are propagated.
fn compare(
&self,
lhs: &ChunkedArray,
rhs: &ArrayData,
operator: Operator,
) -> VortexResult<Option<ArrayData>> {
// `idx` is the row offset of the current chunk within `rhs`.
let mut idx = 0;
let mut compare_chunks = Vec::with_capacity(lhs.nchunks());

for chunk in lhs.chunks() {
// Cut out the part of `rhs` that lines up with this chunk.
let sliced = slice(rhs, idx, idx + chunk.len())?;
let cmp_result = compare(&chunk, &sliced, operator)?;
compare_chunks.push(cmp_result);

idx += chunk.len();
}

Ok(Some(
ChunkedArray::try_new(compare_chunks, DType::Bool(Nullability::Nullable))?.into_array(),
))
}
}

#[cfg(test)]
mod test {
use vortex_dtype::{DType, Nullability, PType};
Expand Down
5 changes: 4 additions & 1 deletion vortex-array/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,10 @@ where
/// the array's internal codec.
impl IntoCanonical for ArrayData {
/// Canonicalizes the array by delegating to its encoding, emitting a debug log first.
fn into_canonical(self) -> VortexResult<Canonical> {
// NOTE(review): this unconditional log sits directly above a guarded copy of the same
// message, so a non-canonical array with more than one element would be logged twice.
// It looks like the line the guard below replaces — confirm and keep only one.
log::debug!("Canonicalizing array with encoding {:?}", self.encoding());
// We only care to know when we canonicalize something non-trivial.
if !self.is_canonical() && self.len() > 1 {
log::debug!("Canonicalizing array with encoding {:?}", self.encoding());
}
// The array's own encoding performs the actual conversion.
self.encoding().into_canonical(self)
}
}
Expand Down
23 changes: 8 additions & 15 deletions vortex-array/src/compute/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn binary_boolean(lhs: &ArrayData, rhs: &ArrayData, op: BinaryOperator) -> Vorte
}

// If the LHS is Arrow and the RHS is either Arrow or constant, we can't do any better than arrow_boolean.
if lhs.is_arrow() && rhs.is_constant() {
if lhs.is_arrow() && (rhs.is_arrow() || rhs.is_constant()) {
return arrow_boolean(lhs.clone(), rhs.clone(), op);
}

Expand All @@ -104,13 +104,6 @@ fn binary_boolean(lhs: &ArrayData, rhs: &ArrayData, op: BinaryOperator) -> Vorte
.and_then(|f| f.binary_boolean(lhs, rhs, op).transpose())
{
return result;
} else {
log::debug!(
"No boolean implementation found for LHS {}, RHS {}, and operator {:?}",
lhs.encoding().id(),
rhs.encoding().id(),
op,
);
}

if let Some(result) = rhs
Expand All @@ -119,15 +112,15 @@ fn binary_boolean(lhs: &ArrayData, rhs: &ArrayData, op: BinaryOperator) -> Vorte
.and_then(|f| f.binary_boolean(rhs, lhs, op).transpose())
{
return result;
} else {
log::debug!(
"No boolean implementation found for LHS {}, RHS {}, and operator {:?}",
rhs.encoding().id(),
lhs.encoding().id(),
op,
);
}

log::debug!(
"No boolean implementation found for LHS {}, RHS {}, and operator {:?} (or inverse)",
rhs.encoding().id(),
lhs.encoding().id(),
op,
);

// If neither side implements the trait, then we delegate to Arrow compute.
arrow_boolean(lhs.clone(), rhs.clone(), op)
}
Expand Down
23 changes: 8 additions & 15 deletions vortex-array/src/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub fn compare(
}

// If the LHS is Arrow and the RHS is either Arrow or constant, we can't do any better than arrow_compare.
if left.is_arrow() && right.is_constant() {
if left.is_arrow() && (right.is_arrow() || right.is_constant()) {
return arrow_compare(left, right, operator);
}

Expand All @@ -133,13 +133,6 @@ pub fn compare(
.and_then(|f| f.compare(left, right, operator).transpose())
{
return result;
} else {
log::debug!(
"No compare implementation found for LHS {}, RHS {}, and operator {}",
left.encoding().id(),
right.encoding().id(),
operator,
);
}

if let Some(result) = right
Expand All @@ -148,15 +141,15 @@ pub fn compare(
.and_then(|f| f.compare(right, left, operator.swap()).transpose())
{
return result;
} else {
log::debug!(
"No compare implementation found for LHS {}, RHS {}, and operator {}",
right.encoding().id(),
left.encoding().id(),
operator.swap(),
);
}

log::debug!(
"No compare implementation found for LHS {}, RHS {}, and operator {} (or inverse)",
right.encoding().id(),
left.encoding().id(),
operator.swap(),
);

// Fallback to arrow on canonical types
arrow_compare(left, right, operator)
}
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,10 @@ impl<T: AsRef<ArrayData>> ArrayStatistics for T {
}
}

// FIXME(ngates): this is really slow...
fn inherit_statistics(&self, parent: &dyn Statistics) {
let stats = self.statistics();
// The to_set call performs a slow clone of the stats
for (stat, scalar) in parent.to_set() {
stats.set(stat, scalar);
}
Expand Down
13 changes: 3 additions & 10 deletions vortex-file/src/read/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ use std::cmp::{max, min};
use std::fmt::{Display, Formatter};

use vortex_array::array::{BoolArray, ConstantArray, PrimitiveArray, SparseArray};
use vortex_array::compute::{and, filter, slice, take, try_cast, FilterMask, TakeOptions};
use vortex_array::compute::{and, filter, slice, try_cast, FilterMask};
use vortex_array::stats::ArrayStatistics;
use vortex_array::validity::{ArrayValidity, LogicalValidity};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::{DType, PType};
use vortex_error::{vortex_bail, VortexExpect, VortexResult, VortexUnwrap};

const PREFER_TAKE_TO_FILTER_DENSITY: f64 = 1.0 / 1024.0;

/// Bitmap of selected rows within given [begin, end) row range
#[derive(Debug, Clone)]
pub struct RowMask {
Expand Down Expand Up @@ -213,13 +211,8 @@ impl RowMask {
return Ok(Some(sliced.clone()));
}

if (true_count as f64 / sliced.len() as f64) < PREFER_TAKE_TO_FILTER_DENSITY {
let indices = self.to_indices_array()?;
take(sliced, indices, TakeOptions::default()).map(Some)
} else {
let mask = FilterMask::try_from(self.bitmask.clone())?;
filter(sliced, mask).map(Some)
}
let mask = FilterMask::try_from(self.bitmask.clone())?;
filter(sliced, mask).map(Some)
}

pub fn to_indices_array(&self) -> VortexResult<ArrayData> {
Expand Down

0 comments on commit 91f3efe

Please sign in to comment.