Skip to content

Commit

Permalink
refactor: switch BooleanBufferBuilder to NullBufferBuilder in MaybeNu…
Browse files Browse the repository at this point in the history
…llBufferBuilder
  • Loading branch information
Ian Lai authored and Ian Lai committed Feb 5, 2025
1 parent e8d9b62 commit b5775c5
Showing 1 changed file with 30 additions and 74 deletions.
104 changes: 30 additions & 74 deletions datafusion/physical-plan/src/aggregates/group_values/null_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,119 +15,75 @@
// specific language governing permissions and limitations
// under the License.

use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
use arrow::array::NullBufferBuilder;
use arrow::buffer::NullBuffer;

/// Builder for an (optional) null mask
///
/// Optimized for avoid creating the bitmask when all values are non-null
#[derive(Debug)]
pub(crate) enum MaybeNullBufferBuilder {
/// seen `row_count` rows but no nulls yet
NoNulls { row_count: usize },
/// have at least one null value
///
pub(crate) struct MaybeNullBufferBuilder {
/// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true
/// for non-nulls)
Nulls(BooleanBufferBuilder),
nulls: NullBufferBuilder,
}

impl MaybeNullBufferBuilder {
/// Create a new builder
pub fn new() -> Self {
Self::NoNulls { row_count: 0 }
Self {
nulls: NullBufferBuilder::new(0),
}
}

/// Return true if the row at index `row` is null
pub fn is_null(&self, row: usize) -> bool {
match self {
Self::NoNulls { .. } => false,
// validity mask means a unset bit is NULL
Self::Nulls(builder) => !builder.get_bit(row),
}
// validity mask means a unset bit is NULL
!self.nulls.is_valid(row)
}

/// Set the nullness of the next row to `is_null`
///
/// num_values is the current length of the rows being tracked
///
/// If `value` is true, the row is null.
/// If `value` is false, the row is non null
pub fn append(&mut self, is_null: bool) {
match self {
Self::NoNulls { row_count } if is_null => {
// have seen no nulls so far, this is the first null,
// need to create the nulls buffer for all currently valid values
// alloc 2x the need given we push a new but immediately
let mut nulls = BooleanBufferBuilder::new(*row_count * 2);
nulls.append_n(*row_count, true);
nulls.append(false);
*self = Self::Nulls(nulls);
}
Self::NoNulls { row_count } => {
*row_count += 1;
}
Self::Nulls(builder) => builder.append(!is_null),
}
self.nulls.append(!is_null)
}

pub fn append_n(&mut self, n: usize, is_null: bool) {
match self {
Self::NoNulls { row_count } if is_null => {
// have seen no nulls so far, this is the first null,
// need to create the nulls buffer for all currently valid values
// alloc 2x the need given we push a new but immediately
let mut nulls = BooleanBufferBuilder::new(*row_count * 2);
nulls.append_n(*row_count, true);
nulls.append_n(n, false);
*self = Self::Nulls(nulls);
}
Self::NoNulls { row_count } => {
*row_count += n;
}
Self::Nulls(builder) => builder.append_n(n, !is_null),
if is_null {
self.nulls.append_n_nulls(n);
} else {
self.nulls.append_n_non_nulls(n);
}
}

/// return the number of heap allocated bytes used by this structure to store boolean values
pub fn allocated_size(&self) -> usize {
match self {
Self::NoNulls { .. } => 0,
// BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes)
Self::Nulls(builder) => builder.capacity() / 8,
}
// NullBufferBuilder builder::allocated_size returns capacity in bytes
self.nulls.allocated_size()
}

/// Return a NullBuffer representing the accumulated nulls so far
pub fn build(self) -> Option<NullBuffer> {
match self {
Self::NoNulls { .. } => None,
Self::Nulls(mut builder) => Some(NullBuffer::from(builder.finish())),
}
pub fn build(mut self) -> Option<NullBuffer> {
self.nulls.finish()
}

/// Returns a NullBuffer representing the first `n` rows accumulated so far
/// shifting any remaining down by `n`
pub fn take_n(&mut self, n: usize) -> Option<NullBuffer> {
match self {
Self::NoNulls { row_count } => {
*row_count -= n;
None
}
Self::Nulls(builder) => {
// Copy over the values at n..len-1 values to the start of a
// new builder and leave it in self
//
// TODO: it would be great to use something like `set_bits` from arrow here.
let mut new_builder = BooleanBufferBuilder::new(builder.len());
for i in n..builder.len() {
new_builder.append(builder.get_bit(i));
}
std::mem::swap(&mut new_builder, builder);

// take only first n values from the original builder
new_builder.truncate(n);
Some(NullBuffer::from(new_builder.finish()))
}
// Copy over the values at n..len-1 values to the start of a
// new builder and leave it in self
//
// TODO: it would be great to use something like `set_bits` from arrow here.
let mut new_builder = NullBufferBuilder::new(self.nulls.len());
for i in n..self.nulls.len() {
new_builder.append(self.nulls.is_valid(i));
}
std::mem::swap(&mut new_builder, &mut self.nulls);

// take only first n values from the original builder
new_builder.truncate(n);
new_builder.finish()
}
}

0 comments on commit b5775c5

Please sign in to comment.