Skip to content

Commit

Permalink
perf: Improve median with no grouping by 2X (apache#14399)
Browse files Browse the repository at this point in the history
* improve median() no grouping case

* review
  • Loading branch information
2010YOUY01 authored Feb 2, 2025
1 parent fa0874b commit 1e0531f
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions datafusion/functions-aggregate/src/median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,28 @@ impl<T: ArrowNumericType> Debug for MedianAccumulator<T> {

impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let all_values = self
.all_values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &self.data_type))
.collect::<Result<Vec<_>>>()?;
// Convert `all_values` to `ListArray` and return a single List ScalarValue

let arr = ScalarValue::new_list_nullable(&all_values, &self.data_type);
Ok(vec![ScalarValue::List(arr)])
// Build offsets
let offsets =
OffsetBuffer::new(ScalarBuffer::from(vec![0, self.all_values.len() as i32]));

// Build inner array
let values_array = PrimitiveArray::<T>::new(
ScalarBuffer::from(std::mem::take(&mut self.all_values)),
None,
)
.with_data_type(self.data_type.clone());

// Build the result list array
let list_array = ListArray::new(
Arc::new(Field::new_list_field(self.data_type.clone(), true)),
offsets,
Arc::new(values_array),
None,
);

Ok(vec![ScalarValue::List(Arc::new(list_array))])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
Expand Down

0 comments on commit 1e0531f

Please sign in to comment.