Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor PrimitiveGroupValueBuilder to use MaybeNullBufferBuilder #12623

Merged
merged 9 commits into from
Sep 30, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -62,57 +62,81 @@ pub trait ArrayRowEq: Send + Sync {

pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType> {
group_values: Vec<T::Native>,
nulls: Vec<bool>,
// whether the array contains at least one null, for fast non-null path
has_null: bool,
/// Nulls buffer for the group values --
/// * None if we have seen no arrays yet
/// * Some if we have seen arrays and have at least one null
///
/// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true
/// for non-nulls)
nulls: Option<BooleanBufferBuilder>,
/// If true, the input is guaranteed not to have nulls
nullable: bool,
}

impl<T> PrimitiveGroupValueBuilder<T>
where
T: ArrowPrimitiveType,
{
/// Create a new [`PrimitiveGroupValueBuilder`]
///
/// If `nullable` is false, it means the input will never have nulls
pub fn new(nullable: bool) -> Self {
Self {
group_values: vec![],
nulls: vec![],
has_null: false,
nulls: None,
nullable,
}
}
}

impl<T: ArrowPrimitiveType> ArrayRowEq for PrimitiveGroupValueBuilder<T> {
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
// non-null fast path
// both non-null
// fast path when input has no nulls
if !self.nullable {
debug_assert!(self.nulls.is_none());
return self.group_values[lhs_row]
== array.as_primitive::<T>().value(rhs_row);
}

// lhs is non-null
if self.nulls[lhs_row] {
if array.is_null(rhs_row) {
// slow path if the input could have nulls
if let Some(nulls) = self.nulls.as_ref() {
// if lhs_row is valid (non null), but array is null
let lhs_is_null = !nulls.get_bit(lhs_row);
let rhs_is_null = array.is_null(rhs_row);
if lhs_is_null != rhs_is_null {
return false;
}

return self.group_values[lhs_row]
== array.as_primitive::<T>().value(rhs_row);
}

array.is_null(rhs_row)
self.group_values[lhs_row] == array.as_primitive::<T>().value(rhs_row)
}

fn append_val(&mut self, array: &ArrayRef, row: usize) {
if self.nullable && array.is_null(row) {
self.group_values.push(T::default_value());
self.nulls.push(false);
self.has_null = true;
} else {
let elem = array.as_primitive::<T>().value(row);
self.group_values.push(elem);
self.nulls.push(true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like this previous code manages self.nulls even when there are and have been no nulls at all in the input

let is_null = array.is_null(row);

match self.nulls.as_mut() {
Some(nulls) => {
if is_null {
nulls.append(false);
self.group_values.push(T::default_value());
} else {
nulls.append(true);
self.group_values.push(array.as_primitive::<T>().value(row));
}
}
None => {
if is_null {
// have seen no nulls so far, this is the first null,
// need to create the nulls buffer for all currently valid values
let num_values = self.group_values.len();
let mut nulls = BooleanBufferBuilder::new(num_values);
nulls.append_n(num_values, true);
nulls.append(false); // null
self.group_values.push(T::default_value());
self.nulls = Some(nulls);
} else {
// Had no nulls so far, and this is not a null
self.group_values.push(array.as_primitive::<T>().value(row));
}
}
}
}

Expand All @@ -121,37 +145,64 @@ impl<T: ArrowPrimitiveType> ArrayRowEq for PrimitiveGroupValueBuilder<T> {
}

fn size(&self) -> usize {
self.group_values.allocated_size() + self.nulls.allocated_size()
// BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes)
let null_builder_size = self
.nulls
.as_ref()
.map(|nulls| nulls.capacity() / 8)
.unwrap_or(0);
self.group_values.allocated_size() + null_builder_size
}

fn build(self: Box<Self>) -> ArrayRef {
if self.has_null {
Arc::new(PrimitiveArray::<T>::new(
ScalarBuffer::from(self.group_values),
Some(NullBuffer::from(self.nulls)),
))
} else {
Arc::new(PrimitiveArray::<T>::new(
ScalarBuffer::from(self.group_values),
None,
))
}
let Self {
group_values,
nulls,
nullable: _,
} = *self;
let null_buffer = nulls.map(|mut nulls| NullBuffer::from(nulls.finish()));

Arc::new(PrimitiveArray::<T>::new(
ScalarBuffer::from(group_values),
null_buffer,
))
}

fn take_n(&mut self, n: usize) -> ArrayRef {
if self.has_null {
let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();
let first_n_nulls = self.nulls.drain(0..n).collect::<Vec<_>>();
Arc::new(PrimitiveArray::<T>::new(
ScalarBuffer::from(first_n),
Some(NullBuffer::from(first_n_nulls)),
))
} else {
let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();
self.nulls.truncate(self.nulls.len() - n);
Arc::new(PrimitiveArray::<T>::new(ScalarBuffer::from(first_n), None))
}
let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();

let first_n_nulls = self.nulls.take().map(|nulls| {
let (first_n_nulls, remaining_nulls) = take_n_nulls(n, nulls);
self.nulls = Some(remaining_nulls);
first_n_nulls
});

Arc::new(PrimitiveArray::<T>::new(
ScalarBuffer::from(first_n),
first_n_nulls,
))
}
}

/// Takes the first `n` nulls from the `nulls` buffer and
///
/// Returns
/// * [`NullBuffer`] with the first `n` nulls
/// * [`BooleanBufferBuilder`] with the remaining nulls
fn take_n_nulls(
n: usize,
mut nulls: BooleanBufferBuilder,
) -> (NullBuffer, BooleanBufferBuilder) {
// Copy over the values at n..len-1 values to the start of a new builder
alamb marked this conversation as resolved.
Show resolved Hide resolved
// (todo it would be great to use something like `set_bits` from arrow here.
let mut new_builder = BooleanBufferBuilder::new(nulls.len());
for i in n..nulls.len() {
new_builder.append(nulls.get_bit(i));
}
// take only first n values from the original builder
nulls.truncate(n);
let null_buffer = NullBuffer::from(nulls.finish());
(null_buffer, new_builder)
}

pub struct ByteGroupValueBuilder<O>
Expand Down
Loading