Skip to content

Commit

Permalink
Support vectorized aggregation on Hypercore TAM
Browse files Browse the repository at this point in the history
Add support for running VectorAgg on top of scans on Hypercore
TAM. Currently, only ColumnarScan can run below VectorAgg when
Hypercore TAM is used. In theory, a SeqScan or IndexScan reading from
Hypercore TAM should also work because they would produce Arrow
slots. However, only ColumnarScan performs vectorized filtering, which
is currently assumed to happen before the VectorAgg node.

In ColumnarScan, it is necessary to turn off projection when VectorAgg
is used. Otherwise, it would project the arrow slot into a virtual
slot, thus losing the vector data. Ideally, a projection should never
be planned to begin with, but this isn't possible since VectorAgg
modifies existing non-vectorized Agg plans that already include
projections.
  • Loading branch information
erimatnor committed Feb 13, 2025
1 parent 1464965 commit af64c7b
Show file tree
Hide file tree
Showing 29 changed files with 1,247 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linux-32bit-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
append-* transparent_decompression-*
transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler*
hypercore_vacuum vectorized_aggregation vector_agg_text
vector_agg_groupagg
vector_agg_groupagg hypercore_parallel hypercore_vectoragg
SKIPS: chunk_adaptive histogram_test-*
EXTENSIONS: "postgres_fdw test_decoding pageinspect pgstattuple"
strategy:
Expand Down
1 change: 1 addition & 0 deletions .unreleased/pr_7655
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7655 Support vectorized aggregation on Hypercore TAM
8 changes: 7 additions & 1 deletion tsl/src/hypercore/arrow_tts.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow_cache.h"
#include "compression/arrow_c_data_interface.h"
#include "debug_assert.h"
#include "nodes/decompress_chunk/compressed_batch.h"

#include <limits.h>

Expand Down Expand Up @@ -88,6 +89,10 @@ typedef struct ArrowTupleTableSlot
const uint64 *arrow_qual_result; /* Bitmap with result of qual
* filtering over arrow_array. NULL if
* no filtering has been applied. */

/* Struct to hold values for one column. Necessary for compatibility with
* vector aggs. */
struct CompressedColumnValues ccvalues;
} ArrowTupleTableSlot;

extern const TupleTableSlotOps TTSOpsArrowTuple;
Expand Down Expand Up @@ -413,8 +418,9 @@ arrow_slot_per_segment_memory_context(const TupleTableSlot *slot)
return aslot->per_segment_mcxt;
}

extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
extern const ArrowArray *arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno);

extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
extern void arrow_slot_set_referenced_attrs(TupleTableSlot *slot, Bitmapset *attrs);
extern void arrow_slot_set_index_attrs(TupleTableSlot *slot, Bitmapset *attrs);

Expand Down
5 changes: 3 additions & 2 deletions tsl/src/nodes/columnar_scan/columnar_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -996,9 +996,10 @@ static CustomScanMethods columnar_scan_plan_methods = {
};

bool
is_columnar_scan(const CustomScan *scan)
is_columnar_scan(const Plan *plan)
{
return scan->methods == &columnar_scan_plan_methods;
return IsA(plan, CustomScan) &&
((const CustomScan *) plan)->methods == &columnar_scan_plan_methods;
}

typedef struct VectorQualInfoHypercore
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/nodes/columnar_scan/columnar_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ typedef struct ColumnarScanPath
extern ColumnarScanPath *columnar_scan_path_create(PlannerInfo *root, RelOptInfo *rel,
Relids required_outer, int parallel_workers);
extern void columnar_scan_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht);
extern bool is_columnar_scan(const CustomScan *scan);
extern bool is_columnar_scan(const Plan *plan);
extern void _columnar_scan_init(void);

#endif /* TIMESCALEDB_COLUMNAR_SCAN_H */
3 changes: 2 additions & 1 deletion tsl/src/nodes/vector_agg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_batch.c
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_hash.c
${CMAKE_CURRENT_SOURCE_DIR}/plan.c
${CMAKE_CURRENT_SOURCE_DIR}/plan_decompress_chunk.c)
${CMAKE_CURRENT_SOURCE_DIR}/plan_decompress_chunk.c
${CMAKE_CURRENT_SOURCE_DIR}/plan_tam.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
142 changes: 132 additions & 10 deletions tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,28 @@

#include <commands/explain.h>
#include <executor/executor.h>
#include <executor/tuptable.h>
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <nodes/pg_list.h>
#include <optimizer/optimizer.h>

#include "nodes/vector_agg/exec.h"

#include "compression/arrow_c_data_interface.h"
#include "hypercore/arrow_tts.h"
#include "hypercore/vector_quals.h"
#include "nodes/columnar_scan/columnar_scan.h"
#include "nodes/decompress_chunk/compressed_batch.h"
#include "nodes/decompress_chunk/exec.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "nodes/vector_agg.h"
#include "nodes/vector_agg/plan.h"

static int
get_input_offset(const CustomScanState *state, const Var *var)
get_input_offset_decompress_chunk(const DecompressChunkState *decompress_state, const Var *var)
{
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;

/*
Expand Down Expand Up @@ -58,16 +62,57 @@ get_input_offset(const CustomScanState *state, const Var *var)
}

static void
get_column_storage_properties(const CustomScanState *state, int input_offset,
GroupingColumn *result)
get_column_storage_properties_decompress_chunk(const DecompressChunkState *state, int input_offset,
GroupingColumn *result)
{
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;
const DecompressContext *dcontext = &state->decompress_context;
const CompressionColumnDescription *desc = &dcontext->compressed_chunk_columns[input_offset];
result->value_bytes = desc->value_bytes;
result->by_value = desc->by_value;
}

/*
 * Translate a Var reference into the offset of the corresponding attribute
 * in the input tuple.
 *
 * A child node that emits arrow slots addresses attributes directly, so the
 * offset is simply derived from the Var's attribute number. A DecompressChunk
 * child instead requires a translation between the compressed and
 * non-compressed column numbering.
 */
static int
get_input_offset(const CustomScanState *state, const Var *var)
{
	if (!TTS_IS_ARROWTUPLE(state->ss.ss_ScanTupleSlot))
		return get_input_offset_decompress_chunk((const DecompressChunkState *) state, var);

	return AttrNumberGetAttrOffset(var->varattno);
}

/*
 * Look up the type length and "byval" storage properties for the grouping
 * column identified by the given input offset.
 *
 * For a child producing arrow slots, the properties come straight from the
 * scanned relation's tuple descriptor. For a DecompressChunk child, the
 * offset refers to the compressed relation, so the lookup is delegated to
 * the DecompressChunk-specific variant.
 */
static void
get_column_storage_properties(const CustomScanState *state, int input_offset,
							  GroupingColumn *result)
{
	if (!TTS_IS_ARROWTUPLE(state->ss.ss_ScanTupleSlot))
	{
		get_column_storage_properties_decompress_chunk((const DecompressChunkState *) state,
													   input_offset,
													   result);
		return;
	}

	const TupleDesc tupdesc = RelationGetDescr(state->ss.ss_currentRelation);

	result->by_value = TupleDescAttr(tupdesc, input_offset)->attbyval;
	result->value_bytes = TupleDescAttr(tupdesc, input_offset)->attlen;
}

static void
vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
{
Expand Down Expand Up @@ -312,6 +357,49 @@ compressed_batch_get_next_slot(VectorAggState *vector_agg_state)
return &batch_state->decompressed_scan_slot_data.base;
}

/*
 * Get the next slot to aggregate for an arrow tuple table slot.
 *
 * Implements "get next slot" on top of ColumnarScan (or any node producing
 * ArrowTupleTableSlots). It just reads the slot from the child node.
 *
 * The previously returned slot is cached in ss_ScanTupleSlot between calls
 * so that it can be marked as consumed before fetching the next one.
 */
static TupleTableSlot *
arrow_get_next_slot(VectorAggState *vector_agg_state)
{
	TupleTableSlot *slot = vector_agg_state->custom.ss.ss_ScanTupleSlot;

	if (!TTS_EMPTY(slot))
	{
		Assert(TTS_IS_ARROWTUPLE(slot));

		/* If we read an arrow slot previously, the entire arrow array should
		 * have been aggregated, so mark it as consumed so that we get the
		 * next array (or end of scan) when we read the next slot. */

		arrow_slot_mark_consumed(slot);
	}

	/* Read the next slot from the (only) child node */
	slot = ExecProcNode(linitial(vector_agg_state->custom.custom_ps));

	if (TupIsNull(slot))
	{
		/* The input has ended. */
		vector_agg_state->input_ended = true;
		return NULL;
	}

	Assert(TTS_IS_ARROWTUPLE(slot));

	/* Filtering should have happened in the scan node below so the slot
	 * should not be consumed here. */
	Assert(!arrow_slot_is_consumed(slot));

	/* Remember the slot until we're called next time */
	vector_agg_state->custom.ss.ss_ScanTupleSlot = slot;

	return slot;
}

/*
* Initialize vector quals for a compressed batch.
*
Expand Down Expand Up @@ -341,6 +429,18 @@ compressed_batch_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_
return &agg_state->vqual_state.vqstate;
}

/*
 * Set up the vector qual state for an arrow tuple slot.
 *
 * This implements vectorized evaluation of an aggregate function's FILTER
 * clause when the input comes from a node producing arrow slots.
 */
static VectorQualState *
arrow_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_def, TupleTableSlot *slot)
{
	VectorQualState *vqstate = &agg_state->vqual_state.vqstate;

	vector_qual_state_init(vqstate, agg_def->filter_clauses, slot);

	return vqstate;
}

static TupleTableSlot *
vector_agg_exec(CustomScanState *node)
{
Expand Down Expand Up @@ -481,20 +581,42 @@ Node *
vector_agg_state_create(CustomScan *cscan)
{
VectorAggState *state = (VectorAggState *) newNode(sizeof(VectorAggState), T_CustomScanState);
CustomScan *childscan = castNode(CustomScan, linitial(cscan->custom_plans));

state->custom.methods = &exec_methods;

/*
* Initialize VectorAggState to process vector slots from different
* subnodes. Currently, only compressed batches are supported, but arrow
* slots will be supported as well.
* subnodes.
*
* VectorAgg supports two child nodes: ColumnarScan (producing arrow tuple
* table slots) and DecompressChunk (producing compressed batches).
*
* When the child is ColumnarScan, VectorAgg expects Arrow slots that
* carry arrow arrays. ColumnarScan performs standard qual filtering and
* vectorized qual filtering prior to handing the slot up to VectorAgg.
*
* When the child is DecompressChunk, VectorAgg doesn't read the slot from
* the child node. Instead, it bypasses DecompressChunk and reads
* compressed tuples directly from the grandchild. It therefore needs to
* handle batch decompression and vectorized qual filtering itself, in its
* own "get next slot" implementation.
*
* The vector qual init functions are needed to implement vectorized
* aggregate function FILTER clauses for arrow tuple table slots and
* compressed batches, respectively.
*/
state->get_next_slot = compressed_batch_get_next_slot;
state->init_vector_quals = compressed_batch_init_vector_quals;
if (is_columnar_scan(&childscan->scan.plan))
{
state->get_next_slot = arrow_get_next_slot;
state->init_vector_quals = arrow_init_vector_quals;
}
else
{
Assert(strcmp(childscan->methods->CustomName, "DecompressChunk") == 0);
state->get_next_slot = compressed_batch_get_next_slot;
state->init_vector_quals = compressed_batch_init_vector_quals;
}

return (Node *) state;
}
27 changes: 26 additions & 1 deletion tsl/src/nodes/vector_agg/plan.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "exec.h"
#include "import/list.h"
#include "nodes/columnar_scan/columnar_scan.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "nodes/vector_agg.h"
#include "utils.h"
Expand Down Expand Up @@ -177,6 +178,27 @@ vector_agg_plan_create(Plan *childplan, Agg *agg, List *resolved_targetlist,
lfirst(list_nth_cell(vector_agg->custom_private, VASI_GroupingType)) =
makeInteger(grouping_type);

#if PG15_GE
if (is_columnar_scan(childplan))
{
CustomScan *custom = castNode(CustomScan, childplan);

/*
* ColumnarScan should not project when doing vectorized
* aggregation. If it projects, it will turn the arrow slot into a set
* of virtual slots and the vector data will not be passed up to
* VectorAgg.
*
* To make ColumnarScan avoid projection, unset the custom scan node's
projection flag. Normally, it is too late to change this flag as
* PostgreSQL already planned projection based on it. However,
* ColumnarScan rechecks this flag before it begins execution and
* ignores any projection if the flag is not set.
*/
custom->flags &= ~CUSTOMPATH_SUPPORT_PROJECTION;
}
#endif

return (Plan *) vector_agg;
}

Expand Down Expand Up @@ -471,8 +493,11 @@ vectoragg_plan_possible(Plan *childplan, const List *rtable, VectorQualInfo *vqi

CustomScan *customscan = castNode(CustomScan, childplan);
bool vectoragg_possible = false;
RangeTblEntry *rte = rt_fetch(customscan->scan.scanrelid, rtable);

if (strcmp(customscan->methods->CustomName, "DecompressChunk") == 0)
if (ts_is_hypercore_am(ts_get_rel_am(rte->relid)))
vectoragg_possible = vectoragg_plan_tam(childplan, rtable, vqi);
else if (strcmp(customscan->methods->CustomName, "DecompressChunk") == 0)
vectoragg_possible = vectoragg_plan_decompress_chunk(childplan, vqi);

return vectoragg_possible;
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/nodes/vector_agg/plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ typedef enum

extern void _vector_agg_init(void);
extern bool vectoragg_plan_decompress_chunk(Plan *childplan, VectorQualInfo *vqi);

extern bool vectoragg_plan_tam(Plan *childplan, const List *rtable, VectorQualInfo *vqi);
Plan *try_insert_vector_agg_node(Plan *plan, List *rtable);
bool has_vector_agg_node(Plan *plan, bool *has_normal_agg);
52 changes: 52 additions & 0 deletions tsl/src/nodes/vector_agg/plan_tam.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <access/attnum.h>
#include <nodes/parsenodes.h>
#include <parser/parsetree.h>

#include "hypercore/hypercore_handler.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "plan.h"

/*
 * Fill in vector qual info for a scan over a Hypercore TAM relation and
 * report whether vectorized aggregation is possible (always true for TAM).
 *
 * The relation is opened to read its Hypercore column metadata; the lock is
 * retained until end of transaction (table_close with NoLock).
 */
bool
vectoragg_plan_tam(Plan *childplan, const List *rtable, VectorQualInfo *vqi)
{
	const CustomScan *cscan = castNode(CustomScan, childplan);
	RangeTblEntry *rte = rt_fetch(cscan->scan.scanrelid, rtable);
	Relation rel = table_open(rte->relid, AccessShareLock);
	const HypercoreInfo *hinfo = RelationGetHypercoreInfo(rel);
	/* Arrays are indexed by 1-based attribute number, hence the +1 */
	const int nattrs = hinfo->num_columns + 1;

	*vqi = (VectorQualInfo){
		.rti = cscan->scan.scanrelid,
		.vector_attrs = (bool *) palloc0(sizeof(bool) * nattrs),
		.segmentby_attrs = (bool *) palloc0(sizeof(bool) * nattrs),
		/*
		 * Hypercore TAM and ColumnarScan do not yet support specific ordering
		 * (via pathkeys) so vector data will always be as read.
		 */
		.reverse = false,
	};

	for (int i = 0; i < hinfo->num_columns; i++)
	{
		if (hinfo->columns[i].is_dropped)
			continue;

		const AttrNumber attno = AttrOffsetGetAttrNumber(i);

		/*
		 * Hypercore TAM only supports bulk decompression, so all columns
		 * are vectorizable, including segmentby columns.
		 */
		vqi->vector_attrs[attno] = true;
		vqi->segmentby_attrs[attno] = hinfo->columns[i].is_segmentby;
	}

	table_close(rel, NoLock);

	return true;
}
Loading

0 comments on commit af64c7b

Please sign in to comment.