Skip to content

Commit

Permalink
refactor: Consolidate single group by column code into sub modules (a…
Browse files Browse the repository at this point in the history
…pache#13392)

* sort out codes of single column group by.

* sort out codes.

* move row to suitable place, and improve comments.

* fix doc.
  • Loading branch information
Rachelint authored Nov 14, 2024
1 parent 2d86725 commit 1d1f353
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 23 deletions.
43 changes: 30 additions & 13 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,27 @@
use arrow::record_batch::RecordBatch;
use arrow_array::{downcast_primitive, ArrayRef};
use arrow_schema::{DataType, SchemaRef};
use bytes_view::GroupValuesBytesView;
use datafusion_common::Result;

pub(crate) mod primitive;
use datafusion_expr::EmitTo;
use primitive::GroupValuesPrimitive;

mod multi_column;
pub(crate) mod multi_group_by;

mod row;
use multi_column::GroupValuesColumn;
mod single_group_by;
use datafusion_physical_expr::binary_map::OutputType;
use multi_group_by::GroupValuesColumn;
use row::GroupValuesRows;

mod bytes;
mod bytes_view;
use bytes::GroupValuesByes;
use datafusion_physical_expr::binary_map::OutputType;
pub(crate) use single_group_by::primitive::HashValue;

use crate::aggregates::order::GroupOrdering;
use crate::aggregates::{
group_values::single_group_by::{
bytes::GroupValuesByes, bytes_view::GroupValuesBytesView,
primitive::GroupValuesPrimitive,
},
order::GroupOrdering,
};

mod null_builder;

Expand Down Expand Up @@ -77,7 +80,7 @@ mod null_builder;
/// Each distinct group in a hash aggregation is identified by a unique group id
/// (usize) which is assigned by instances of this trait. Group ids are
/// continuous without gaps, starting from 0.
pub trait GroupValues: Send {
pub(crate) trait GroupValues: Send {
/// Calculates the group id for each input row of `cols`, assigning new
/// group ids as necessary.
///
Expand Down Expand Up @@ -106,7 +109,21 @@ pub trait GroupValues: Send {
}

/// Return a specialized implementation of [`GroupValues`] for the given schema.
pub fn new_group_values(
///
/// [`GroupValues`] implementations choosing logic:
///
/// - If group by single column, and type of this column has
/// the specific [`GroupValues`] implementation, such implementation
/// will be chosen.
///
/// - If group by multiple columns, and all column types have the specific
/// [`GroupColumn`] implementations, [`GroupValuesColumn`] will be chosen.
///
/// - Otherwise, the general implementation [`GroupValuesRows`] will be chosen.
///
/// [`GroupColumn`]: crate::aggregates::group_values::multi_group_by::GroupColumn
///
pub(crate) fn new_group_values(
schema: SchemaRef,
group_ordering: &GroupOrdering,
) -> Result<Box<dyn GroupValues>> {
Expand Down Expand Up @@ -147,7 +164,7 @@ pub fn new_group_values(
}
}

if multi_column::supported_schema(schema.as_ref()) {
if multi_group_by::supported_schema(schema.as_ref()) {
if matches!(group_ordering, GroupOrdering::None) {
Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::aggregates::group_values::multi_column::{nulls_equal_to, GroupColumn};
use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, GroupColumn};
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::array::{AsArray, BufferBuilder, GenericBinaryArray, GenericStringArray};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
Expand Down Expand Up @@ -403,7 +403,7 @@ where
mod tests {
use std::sync::Arc;

use crate::aggregates::group_values::multi_column::bytes::ByteGroupValueBuilder;
use crate::aggregates::group_values::multi_group_by::bytes::ByteGroupValueBuilder;
use arrow_array::{ArrayRef, StringArray};
use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
use datafusion_physical_expr::binary_map::OutputType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::aggregates::group_values::multi_column::{nulls_equal_to, GroupColumn};
use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, GroupColumn};
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::array::{make_view, AsArray, ByteView};
use arrow::buffer::ScalarBuffer;
Expand Down Expand Up @@ -544,7 +544,7 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
mod tests {
use std::sync::Arc;

use crate::aggregates::group_values::multi_column::bytes_view::ByteViewGroupValueBuilder;
use crate::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder;
use arrow::array::AsArray;
use arrow::datatypes::StringViewType;
use arrow_array::{ArrayRef, StringViewArray};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
// specific language governing permissions and limitations
// under the License.

//! `GroupValues` implementations for multi group by cases
mod bytes;
mod bytes_view;
mod primitive;

use std::mem::{self, size_of};

use crate::aggregates::group_values::multi_column::{
use crate::aggregates::group_values::multi_group_by::{
bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder,
primitive::PrimitiveGroupValueBuilder,
};
Expand Down Expand Up @@ -1138,7 +1140,9 @@ mod tests {
use datafusion_common::utils::proxy::RawTableAllocExt;
use datafusion_expr::EmitTo;

use crate::aggregates::group_values::{multi_column::GroupValuesColumn, GroupValues};
use crate::aggregates::group_values::{
multi_group_by::GroupValuesColumn, GroupValues,
};

use super::GroupIndexView;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::aggregates::group_values::multi_column::{nulls_equal_to, GroupColumn};
use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, GroupColumn};
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::buffer::ScalarBuffer;
use arrow_array::cast::AsArray;
Expand Down Expand Up @@ -208,7 +208,7 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
mod tests {
use std::sync::Arc;

use crate::aggregates::group_values::multi_column::primitive::PrimitiveGroupValueBuilder;
use crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder;
use arrow::datatypes::Int64Type;
use arrow_array::{ArrayRef, Int64Array};
use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! `GroupValues` implementations for single group by cases
pub(crate) mod bytes;
pub(crate) mod bytes_view;
pub(crate) mod primitive;
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::execution_plan::CardinalityEffect;
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use itertools::Itertools;

pub mod group_values;
pub(crate) mod group_values;
mod no_grouping;
pub mod order;
mod row_hash;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/topk/hash_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! A wrapper around `hashbrown::RawTable` that allows entries to be tracked by index
use crate::aggregates::group_values::primitive::HashValue;
use crate::aggregates::group_values::HashValue;
use crate::aggregates::topk::heap::Comparable;
use ahash::RandomState;
use arrow::datatypes::i256;
Expand Down

0 comments on commit 1d1f353

Please sign in to comment.