Skip to content

Commit

Permalink
Determine data type of vector agg at plan stage
Browse files Browse the repository at this point in the history
This patch adds support for dynamic detection of the data type of a
vectorized aggregate. In addition, it removes the hard-coded integer
data type and initializes the decompression_map properly. This also
fixes an invalid memory access.
  • Loading branch information
jnidzwetzki committed Nov 7, 2023
1 parent 61f2606 commit 27d773e
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 23 deletions.
13 changes: 13 additions & 0 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,19 @@ typedef struct DecompressChunkPath
*/
bool perform_vectorized_aggregation;

/*
 * Columns that are used for vectorized aggregates. For each attribute, the list contains -1 if
 * this is not a vectorized aggregate column, or the Oid of the attribute's data type otherwise.
 *
* When creating vectorized aggregates, the decompression logic is not able to determine the
* type of the compressed column based on the output column since we emit partial aggregates
* for this attribute and the raw attribute is not found in the targetlist. So, build a map
* with the used data types here, which is used later to create the compression info
* properly.
*/
List *aggregated_column_type;

List *compressed_pathkeys;
bool needs_sequence_num;
bool reverse;
Expand Down
51 changes: 29 additions & 22 deletions tsl/src/nodes/decompress_chunk/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,13 @@ decompress_chunk_state_create(CustomScan *cscan)
chunk_state->csstate.methods = &chunk_state->exec_methods;

Assert(IsA(cscan->custom_private, List));
Assert(list_length(cscan->custom_private) == 5);
Assert(list_length(cscan->custom_private) == 6);
List *settings = linitial(cscan->custom_private);
chunk_state->decompression_map = lsecond(cscan->custom_private);
chunk_state->is_segmentby_column = lthird(cscan->custom_private);
chunk_state->bulk_decompression_column = lfourth(cscan->custom_private);
chunk_state->sortinfo = lfifth(cscan->custom_private);
chunk_state->aggregated_column_type = lfifth(cscan->custom_private);
chunk_state->sortinfo = lsixth(cscan->custom_private);
chunk_state->custom_scan_tlist = cscan->custom_scan_tlist;

Assert(IsA(settings, IntList));
Expand All @@ -162,6 +163,16 @@ decompress_chunk_state_create(CustomScan *cscan)
Assert(IsA(cscan->custom_exprs, List));
Assert(list_length(cscan->custom_exprs) == 1);
chunk_state->vectorized_quals_original = linitial(cscan->custom_exprs);
Assert(list_length(chunk_state->decompression_map) ==
list_length(chunk_state->is_segmentby_column));

#ifdef USE_ASSERT_CHECKING
if (chunk_state->perform_vectorized_aggregation)
{
Assert(list_length(chunk_state->decompression_map) ==
list_length(chunk_state->aggregated_column_type));
}
#endif

return (Node *) chunk_state;
}
Expand Down Expand Up @@ -307,8 +318,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)

ListCell *dest_cell;
ListCell *is_segmentby_cell;
Assert(list_length(chunk_state->decompression_map) ==
list_length(chunk_state->is_segmentby_column));

forboth (dest_cell,
chunk_state->decompression_map,
is_segmentby_cell,
Expand Down Expand Up @@ -364,12 +374,22 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)

if (column.output_attno > 0)
{
/* normal column that is also present in decompressed chunk */
Form_pg_attribute attribute =
TupleDescAttr(desc, AttrNumberGetAttrOffset(column.output_attno));
if (chunk_state->perform_vectorized_aggregation &&
lfirst_int(list_nth_cell(chunk_state->aggregated_column_type, compressed_index)) !=
-1)
{
column.typid = lfirst_int(
list_nth_cell(chunk_state->aggregated_column_type, compressed_index));
}
else
{
/* normal column that is also present in decompressed chunk */
Form_pg_attribute attribute =
TupleDescAttr(desc, AttrNumberGetAttrOffset(column.output_attno));

column.typid = attribute->atttypid;
column.value_bytes = get_typlen(column.typid);
column.typid = attribute->atttypid;
column.value_bytes = get_typlen(column.typid);
}

if (list_nth_int(chunk_state->is_segmentby_column, compressed_index))
column.type = SEGMENTBY_COLUMN;
Expand Down Expand Up @@ -630,21 +650,8 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
{
Assert(chunk_state->enable_bulk_decompression);
Assert(column_description->bulk_decompression_supported);

/* Due to the needed manipulation of the target list to emit partials (see
* decompress_chunk_plan_create), PostgreSQL is not able to determine the type of the
* compressed column automatically. So, correct the column type to the correct value. */
Assert(list_length(aggref->args) == 1);

#ifdef USE_ASSERT_CHECKING
TargetEntry *tlentry = (TargetEntry *) linitial(aggref->args);
Assert(IsA(tlentry->expr, Var));
Var *input_var = castNode(Var, tlentry->expr);
Assert(input_var->vartype == INT4OID);
#endif

column_description->typid = INT4OID;

while (true)
{
TupleTableSlot *compressed_slot =
Expand Down
1 change: 1 addition & 0 deletions tsl/src/nodes/decompress_chunk/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ typedef struct DecompressChunkState
List *decompression_map;
List *is_segmentby_column;
List *bulk_decompression_column;
List *aggregated_column_type;
List *custom_scan_tlist;
int num_total_columns;
int num_compressed_columns;
Expand Down
15 changes: 14 additions & 1 deletion tsl/src/nodes/decompress_chunk/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,18 @@ build_decompression_map(PlannerInfo *root, DecompressChunkPath *path, List *scan
.fd = *compression_info, .bulk_decompression_possible = bulk_decompression_possible
};
}

if (path->perform_vectorized_aggregation)
{
Assert(list_length(path->custom_path.path.parent->reltarget->exprs) == 1);
Var *var = linitial(path->custom_path.path.parent->reltarget->exprs);
Assert((Index) var->varno == path->custom_path.path.parent->relid);
if (var->varattno == destination_attno_in_uncompressed_chunk)
path->aggregated_column_type =
lappend_int(path->aggregated_column_type, var->vartype);
else
path->aggregated_column_type = lappend_int(path->aggregated_column_type, -1);
}
}

/*
Expand Down Expand Up @@ -891,10 +903,11 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
*/
decompress_plan->custom_exprs = list_make1(vectorized_quals);

decompress_plan->custom_private = list_make5(settings,
decompress_plan->custom_private = list_make6(settings,
dcpath->decompression_map,
dcpath->is_segmentby_column,
dcpath->bulk_decompression_column,
dcpath->aggregated_column_type,
sort_options);

return &decompress_plan->scan.plan;
Expand Down

0 comments on commit 27d773e

Please sign in to comment.