Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
akuzm committed Oct 22, 2024
1 parent c723a5a commit 00910de
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 67 deletions.
40 changes: 9 additions & 31 deletions tsl/src/nodes/vector_agg/function/agg_vector_validity_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,62 +11,40 @@
*/

static pg_attribute_always_inline void
FUNCTION_NAME(vector_impl_arrow)(void *agg_state, const ArrowArray *vector, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
FUNCTION_NAME(vector_impl_arrow)(void *agg_state, const ArrowArray *vector, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
const int n = vector->length;
const CTYPE *values = vector->buffers[1];
FUNCTION_NAME(vector_impl)(agg_state, n, values, valid1, valid2, agg_extra_mctx);
FUNCTION_NAME(vector_impl)(agg_state, n, values, filter, agg_extra_mctx);
}

static pg_noinline void
FUNCTION_NAME(vector_all_valid)(void *agg_state, const ArrowArray *vector,
MemoryContext agg_extra_mctx)
{
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, NULL, NULL, agg_extra_mctx);
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, NULL, agg_extra_mctx);
}

static pg_noinline void
FUNCTION_NAME(vector_one_validity)(void *agg_state, const ArrowArray *vector, const uint64 *valid,
FUNCTION_NAME(vector_one_validity)(void *agg_state, const ArrowArray *vector, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, valid, NULL, agg_extra_mctx);
}

static pg_noinline void
FUNCTION_NAME(vector_two_validity)(void *agg_state, const ArrowArray *vector, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
{
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, valid1, valid2, agg_extra_mctx);
FUNCTION_NAME(vector_impl_arrow)(agg_state, vector, filter, agg_extra_mctx);
}

static void
FUNCTION_NAME(vector)(void *agg_state, const ArrowArray *vector, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
const uint64 *row_validity = vector->buffers[0];

if (row_validity == NULL && filter == NULL)
if (filter == NULL)
{
/* All rows are valid and we don't have to check any validity bitmaps. */
FUNCTION_NAME(vector_all_valid)(agg_state, vector, agg_extra_mctx);
}
else if (row_validity != NULL && filter == NULL)
{
/* Have to check only one bitmap -- row validity bitmap. */
FUNCTION_NAME(vector_one_validity)(agg_state, vector, row_validity, agg_extra_mctx);
}
else if (filter != NULL && row_validity == NULL)
{
/* Have to check only one bitmap -- results of the vectorized filter. */
FUNCTION_NAME(vector_one_validity)(agg_state, vector, filter, agg_extra_mctx);
}
else
{
/*
* Have to check both the row validity bitmap and the results of the
* vectorized filter.
*/
FUNCTION_NAME(vector_two_validity)(agg_state, vector, row_validity, filter, agg_extra_mctx);
/* Have to check only one combined validity bitmap. */
FUNCTION_NAME(vector_one_validity)(agg_state, vector, filter, agg_extra_mctx);
}
}
30 changes: 11 additions & 19 deletions tsl/src/nodes/vector_agg/function/float48_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,15 @@ FUNCTION_NAME(emit)(void *agg_state, Datum *out_result, bool *out_isnull)
* Youngs-Cramer update for rows after the first.
*/
static pg_attribute_always_inline void
FUNCTION_NAME(update)(const uint64 *valid1, const uint64 *valid2, const CTYPE *values, int row,
double *N, double *Sx
FUNCTION_NAME(update)(const uint64 *filter, const CTYPE *values, int row, double *N, double *Sx
#ifdef NEED_SXX
,
double *Sxx
#endif
)
{
const CTYPE newval = values[row];
if (!arrow_row_both_valid(valid1, valid2, row))
if (!arrow_row_is_valid(filter, row))
{
return;
}
Expand Down Expand Up @@ -185,20 +184,19 @@ FUNCTION_NAME(combine)(double *inout_N, double *inout_Sx,
}

#ifdef NEED_SXX
#define UPDATE(valid1, valid2, values, row, N, Sx, Sxx) \
FUNCTION_NAME(update)(valid1, valid2, values, row, N, Sx, Sxx)
#define UPDATE(filter, values, row, N, Sx, Sxx) \
FUNCTION_NAME(update)(filter, values, row, N, Sx, Sxx)
#define COMBINE(inout_N, inout_Sx, inout_Sxx, N2, Sx2, Sxx2) \
FUNCTION_NAME(combine)(inout_N, inout_Sx, inout_Sxx, N2, Sx2, Sxx2)
#else
#define UPDATE(valid1, valid2, values, row, N, Sx, Sxx) \
FUNCTION_NAME(update)(valid1, valid2, values, row, N, Sx)
#define UPDATE(filter, values, row, N, Sx, Sxx) FUNCTION_NAME(update)(filter, values, row, N, Sx)
#define COMBINE(inout_N, inout_Sx, inout_Sxx, N2, Sx2, Sxx2) \
FUNCTION_NAME(combine)(inout_N, inout_Sx, N2, Sx2)
#endif

static pg_attribute_always_inline void
FUNCTION_NAME(vector_impl)(void *agg_state, size_t n, const CTYPE *values, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
FUNCTION_NAME(vector_impl)(void *agg_state, size_t n, const CTYPE *values, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
/*
* Vector registers can be up to 512 bits wide.
Expand Down Expand Up @@ -228,7 +226,7 @@ FUNCTION_NAME(vector_impl)(void *agg_state, size_t n, const CTYPE *values, const
for (; row < n; row++)
{
const CTYPE newval = values[row];
if (arrow_row_both_valid(valid1, valid2, row))
if (arrow_row_is_valid(filter, row))
{
Narray[inner] = 1;
Sxarray[inner] = newval;
Expand All @@ -246,7 +244,7 @@ FUNCTION_NAME(vector_impl)(void *agg_state, size_t n, const CTYPE *values, const
for (size_t inner = row % UNROLL_SIZE; inner > 0 && inner < UNROLL_SIZE && row < n;
inner++, row++)
{
UPDATE(valid1, valid2, values, row, &Narray[inner], &Sxarray[inner], &Sxxarray[inner]);
UPDATE(filter, values, row, &Narray[inner], &Sxarray[inner], &Sxxarray[inner]);
}
#endif

Expand All @@ -258,13 +256,7 @@ FUNCTION_NAME(vector_impl)(void *agg_state, size_t n, const CTYPE *values, const
{
for (size_t inner = 0; inner < UNROLL_SIZE; inner++)
{
UPDATE(valid1,
valid2,
values,
row + inner,
&Narray[inner],
&Sxarray[inner],
&Sxxarray[inner]);
UPDATE(filter, values, row + inner, &Narray[inner], &Sxarray[inner], &Sxxarray[inner]);
}
}

Expand All @@ -274,7 +266,7 @@ FUNCTION_NAME(vector_impl)(void *agg_state, size_t n, const CTYPE *values, const
for (; row < n; row++)
{
const size_t inner = row % UNROLL_SIZE;
UPDATE(valid1, valid2, values, row, &Narray[inner], &Sxarray[inner], &Sxxarray[inner]);
UPDATE(filter, values, row, &Narray[inner], &Sxarray[inner], &Sxxarray[inner]);
}

/*
Expand Down
6 changes: 3 additions & 3 deletions tsl/src/nodes/vector_agg/function/int128_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ FUNCTION_NAME(emit)(void *agg_state, Datum *out_result, bool *out_isnull)
}

static pg_attribute_always_inline void
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
int64 N = 0;
int128 sumX = 0;
Expand All @@ -82,7 +82,7 @@ FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const ui
#endif
for (int row = 0; row < n; row++)
{
const bool row_ok = arrow_row_both_valid(valid1, valid2, row);
const bool row_ok = arrow_row_is_valid(filter, row);
const CTYPE value = values[row];
N += row_ok;
sumX += value * row_ok;
Expand Down
6 changes: 3 additions & 3 deletions tsl/src/nodes/vector_agg/function/int24_avg_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ case PG_AGG_OID_HELPER(AGG_NAME, PG_TYPE):
#else

static pg_attribute_always_inline void
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
int64 batch_count = 0;
int64 batch_sum = 0;
for (int row = 0; row < n; row++)
{
const bool row_ok = arrow_row_both_valid(valid1, valid2, row);
const bool row_ok = arrow_row_is_valid(filter, row);
batch_count += row_ok;
batch_sum += values[row] * row_ok;
}
Expand Down
6 changes: 3 additions & 3 deletions tsl/src/nodes/vector_agg/function/int24_sum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ case PG_AGG_OID_HELPER(AGG_NAME, PG_TYPE):
#else

static pg_attribute_always_inline void
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
Int24SumState *state = (Int24SumState *) agg_state;

Expand All @@ -37,7 +37,7 @@ FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const ui
bool have_result = false;
for (int row = 0; row < n; row++)
{
const bool row_ok = arrow_row_both_valid(valid1, valid2, row);
const bool row_ok = arrow_row_is_valid(filter, row);
batch_sum += values[row] * row_ok;
have_result |= row_ok;
}
Expand Down
6 changes: 3 additions & 3 deletions tsl/src/nodes/vector_agg/function/minmax_arithmetic_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ case PG_AGG_OID_HELPER(AGG_NAME, PG_TYPE):
return &FUNCTION_NAME(argdef);
#else
static pg_attribute_always_inline void
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
MinMaxState *state = (MinMaxState *) agg_state;

Expand All @@ -20,7 +20,7 @@ FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const ui
for (int row = 0; row < n; row++)
{
const CTYPE new_value = values[row];
const bool new_value_ok = arrow_row_both_valid(valid1, valid2, row);
const bool new_value_ok = arrow_row_is_valid(filter, row);

/*
* Note that we have to properly handle NaNs and Infinities for floats.
Expand Down
10 changes: 5 additions & 5 deletions tsl/src/nodes/vector_agg/function/sum_float_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ FUNCTION_NAME(emit)(void *agg_state, Datum *out_result, bool *out_isnull)
}

static pg_attribute_always_inline void
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *valid1,
const uint64 *valid2, MemoryContext agg_extra_mctx)
FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
/*
* Vector registers can be up to 512 bits wide.
Expand All @@ -46,15 +46,15 @@ FUNCTION_NAME(vector_impl)(void *agg_state, int n, const CTYPE *values, const ui
* infinities and NaNs.
*/
#define INNER_LOOP \
const bool valid = arrow_row_both_valid(valid1, valid2, row); \
const bool row_valid = arrow_row_is_valid(filter, row); \
union \
{ \
CTYPE f; \
MASKTYPE m; \
} u = { .f = values[row] }; \
u.m &= valid ? ~(MASKTYPE) 0 : (MASKTYPE) 0; \
u.m &= row_valid ? ~(MASKTYPE) 0 : (MASKTYPE) 0; \
*dest += u.f; \
*have_result |= valid;
*have_result |= row_valid;

INNER_LOOP
}
Expand Down

0 comments on commit 00910de

Please sign in to comment.