From b5775c54b56c5a191512922586b2f1fef5b5c7e9 Mon Sep 17 00:00:00 2001 From: Ian Lai Date: Wed, 5 Feb 2025 09:54:23 +0000 Subject: [PATCH] refactor: switch BooleanBufferBuilder to NullBufferBuilder in MaybeNullBufferBuilder --- .../aggregates/group_values/null_builder.rs | 104 +++++------------- 1 file changed, 30 insertions(+), 74 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs index a584cf58e50a..2c179caf02cd 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -15,119 +15,75 @@ // specific language governing permissions and limitations // under the License. -use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; +use arrow::array::NullBufferBuilder; +use arrow::buffer::NullBuffer; /// Builder for an (optional) null mask /// /// Optimized for avoid creating the bitmask when all values are non-null #[derive(Debug)] -pub(crate) enum MaybeNullBufferBuilder { - /// seen `row_count` rows but no nulls yet - NoNulls { row_count: usize }, - /// have at least one null value - /// +pub(crate) struct MaybeNullBufferBuilder { /// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true /// for non-nulls) - Nulls(BooleanBufferBuilder), + nulls: NullBufferBuilder, } impl MaybeNullBufferBuilder { /// Create a new builder pub fn new() -> Self { - Self::NoNulls { row_count: 0 } + Self { + nulls: NullBufferBuilder::new(0), + } } /// Return true if the row at index `row` is null pub fn is_null(&self, row: usize) -> bool { - match self { - Self::NoNulls { .. } => false, - // validity mask means a unset bit is NULL - Self::Nulls(builder) => !builder.get_bit(row), - } + // validity mask means a unset bit is NULL + !self.nulls.is_valid(row) } /// Set the nullness of the next row to `is_null` /// - /// num_values is the current length of the rows being tracked - /// /// If `value` is true, the row is null. /// If `value` is false, the row is non null pub fn append(&mut self, is_null: bool) { - match self { - Self::NoNulls { row_count } if is_null => { - // have seen no nulls so far, this is the first null, - // need to create the nulls buffer for all currently valid values - // alloc 2x the need given we push a new but immediately - let mut nulls = BooleanBufferBuilder::new(*row_count * 2); - nulls.append_n(*row_count, true); - nulls.append(false); - *self = Self::Nulls(nulls); - } - Self::NoNulls { row_count } => { - *row_count += 1; - } - Self::Nulls(builder) => builder.append(!is_null), - } + self.nulls.append(!is_null) } pub fn append_n(&mut self, n: usize, is_null: bool) { - match self { - Self::NoNulls { row_count } if is_null => { - // have seen no nulls so far, this is the first null, - // need to create the nulls buffer for all currently valid values - // alloc 2x the need given we push a new but immediately - let mut nulls = BooleanBufferBuilder::new(*row_count * 2); - nulls.append_n(*row_count, true); - nulls.append_n(n, false); - *self = Self::Nulls(nulls); - } - Self::NoNulls { row_count } => { - *row_count += n; - } - Self::Nulls(builder) => builder.append_n(n, !is_null), + if is_null { + self.nulls.append_n_nulls(n); + } else { + self.nulls.append_n_non_nulls(n); } } /// return the number of heap allocated bytes used by this structure to store boolean values pub fn allocated_size(&self) -> usize { - match self { - Self::NoNulls { .. } => 0, - // BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes) - Self::Nulls(builder) => builder.capacity() / 8, - } + // NullBufferBuilder builder::allocated_size returns capacity in bytes + self.nulls.allocated_size() } /// Return a NullBuffer representing the accumulated nulls so far - pub fn build(self) -> Option { - match self { - Self::NoNulls { .. } => None, - Self::Nulls(mut builder) => Some(NullBuffer::from(builder.finish())), - } + pub fn build(mut self) -> Option { + self.nulls.finish() } /// Returns a NullBuffer representing the first `n` rows accumulated so far /// shifting any remaining down by `n` pub fn take_n(&mut self, n: usize) -> Option { - match self { - Self::NoNulls { row_count } => { - *row_count -= n; - None - } - Self::Nulls(builder) => { - // Copy over the values at n..len-1 values to the start of a - // new builder and leave it in self - // - // TODO: it would be great to use something like `set_bits` from arrow here. - let mut new_builder = BooleanBufferBuilder::new(builder.len()); - for i in n..builder.len() { - new_builder.append(builder.get_bit(i)); - } - std::mem::swap(&mut new_builder, builder); - - // take only first n values from the original builder - new_builder.truncate(n); - Some(NullBuffer::from(new_builder.finish())) - } + // Copy over the values at n..len-1 values to the start of a + // new builder and leave it in self + // + // TODO: it would be great to use something like `set_bits` from arrow here. + let mut new_builder = NullBufferBuilder::new(self.nulls.len()); + for i in n..self.nulls.len() { + new_builder.append(self.nulls.is_valid(i)); } + std::mem::swap(&mut new_builder, &mut self.nulls); + + // take only first n values from the original builder + new_builder.truncate(n); + new_builder.finish() } }