Skip to content

Commit

Permalink
parametrize python aggregate tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-J-Ward committed Aug 6, 2024
1 parent 761ff2b commit a02c9e2
Showing 1 changed file with 43 additions and 68 deletions.
111 changes: 43 additions & 68 deletions python/datafusion/tests/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,78 +39,53 @@ def df():
)
return ctx.create_dataframe([[batch]])

@pytest.mark.parametrize("agg_expr, calc_expected", [
(f.avg(column("a")), lambda a, b, c, d: np.array(np.average(a))),
(f.corr(column("a"), column("b")), lambda a, b, c, d: np.array(np.corrcoef(a, b)[0][1])),
(f.count(column("a")), lambda a, b, c, d: pa.array([len(a)])),
# Sample (co)variance -> ddof=1
# Population (co)variance -> ddof=0
(f.covar(column("a"), column("b")), lambda a, b, c, d: np.array(np.cov(a, b, ddof=1)[0][1])),
(f.covar_pop(column("a"), column("c")), lambda a, b, c, d: np.array(np.cov(a, c, ddof=0)[0][1])),
(f.covar_samp(column("b"), column("c")), lambda a, b, c, d: np.array(np.cov(b, c, ddof=1)[0][1])),
# f.grouping(col_a), # No physical plan implemented yet
(f.max(column("a")), lambda a, b, c, d: np.array(np.max(a))),
(f.mean(column("b")), lambda a, b, c, d: np.array(np.mean(b))),
(f.median(column("b")), lambda a, b, c, d: np.array(np.median(b))),
(f.min(column("a")), lambda a, b, c, d: np.array(np.min(a))),
(f.sum(column("b")), lambda a, b, c, d: np.array(np.sum(b.to_pylist()))),
# Sample stdev -> ddof=1
# Population stdev -> ddof=0
(f.stddev(column("a")), lambda a, b, c, d: np.array(np.std(a, ddof=1))),
(f.stddev_pop(column("b")), lambda a, b, c, d: np.array(np.std(b, ddof=0))),
(f.stddev_samp(column("c")), lambda a, b, c, d: np.array(np.std(c, ddof=1))),
(f.var(column("a")), lambda a, b, c, d: np.array(np.var(a, ddof=1))),
(f.var_pop(column("b")), lambda a, b, c, d: np.array(np.var(b, ddof=0))),
(f.var_samp(column("c")), lambda a, b, c, d: np.array(np.var(c, ddof=1))),
])
def test_aggregation_stats(df, agg_expr, calc_expected):

def test_built_in_aggregation(df):
col_a = column("a")
col_b = column("b")
col_c = column("c")

agg_df = df.aggregate(
[],
[
f.approx_distinct(col_b),
f.approx_median(col_b),
f.approx_percentile_cont(col_b, lit(0.5)),
f.approx_percentile_cont_with_weight(col_b, lit(0.6), lit(0.5)),
f.array_agg(col_b),
f.avg(col_a),
f.corr(col_a, col_b),
f.count(col_a),
f.covar(col_a, col_b),
f.covar_pop(col_a, col_c),
f.covar_samp(col_b, col_c),
# f.grouping(col_a), # No physical plan implemented yet
f.max(col_a),
f.mean(col_b),
f.median(col_b),
f.min(col_a),
f.sum(col_b),
f.stddev(col_a),
f.stddev_pop(col_b),
f.stddev_samp(col_c),
f.var(col_a),
f.var_pop(col_b),
f.var_samp(col_c),
],
)
agg_df = df.aggregate([], [agg_expr])
result = agg_df.collect()[0]
values_a, values_b, values_c, values_d = df.collect()[0]
expected = calc_expected(values_a, values_b, values_c, values_d)
np.testing.assert_array_almost_equal(result.column(0), expected)

assert result.column(0) == pa.array([2], type=pa.uint64())
assert result.column(1) == pa.array([4])
assert result.column(2) == pa.array([4])
# Ref: https://github.com/apache/datafusion-python/issues/777
# assert result.column(3) == pa.array([6])
assert result.column(4) == pa.array([[4, 4, 6]])
np.testing.assert_array_almost_equal(result.column(5), np.average(values_a))
np.testing.assert_array_almost_equal(
result.column(6), np.corrcoef(values_a, values_b)[0][1]
)
assert result.column(7) == pa.array([len(values_a)])
# Sample (co)variance -> ddof=1
# Population (co)variance -> ddof=0
np.testing.assert_array_almost_equal(
result.column(8), np.cov(values_a, values_b, ddof=1)[0][1]
)
np.testing.assert_array_almost_equal(
result.column(9), np.cov(values_a, values_c, ddof=0)[0][1]
)
np.testing.assert_array_almost_equal(
result.column(10), np.cov(values_b, values_c, ddof=1)[0][1]
)
np.testing.assert_array_almost_equal(result.column(11), np.max(values_a))
np.testing.assert_array_almost_equal(result.column(12), np.mean(values_b))
np.testing.assert_array_almost_equal(result.column(13), np.median(values_b))
np.testing.assert_array_almost_equal(result.column(14), np.min(values_a))
np.testing.assert_array_almost_equal(
result.column(15), np.sum(values_b.to_pylist())
)
np.testing.assert_array_almost_equal(result.column(16), np.std(values_a, ddof=1))
np.testing.assert_array_almost_equal(result.column(17), np.std(values_b, ddof=0))
np.testing.assert_array_almost_equal(result.column(18), np.std(values_c, ddof=1))
np.testing.assert_array_almost_equal(result.column(19), np.var(values_a, ddof=1))
np.testing.assert_array_almost_equal(result.column(20), np.var(values_b, ddof=0))
np.testing.assert_array_almost_equal(result.column(21), np.var(values_c, ddof=1))

@pytest.mark.parametrize("agg_expr, expected", [
(f.approx_distinct(column("b")), pa.array([2], type=pa.uint64())),
(f.approx_median(column("b")), pa.array([4])),
(f.approx_percentile_cont(column("b"), lit(0.5)), pa.array([4])),
(
f.approx_percentile_cont_with_weight(column("b"), lit(0.6), lit(0.5)),
pa.array([6], type=pa.float64())
),
(f.array_agg(column("b")), pa.array([[4, 4, 6]])),
])
def test_aggregation(df, agg_expr, expected):
agg_df = df.aggregate([], [agg_expr])
result = agg_df.collect()[0]
assert result.column(0) == expected


def test_bit_add_or_xor(df):
Expand Down

0 comments on commit a02c9e2

Please sign in to comment.