Fix variable resolution in vectorized aggregation planning (#7415)
We didn't properly resolve INDEX_VARs in the output targetlist of
DecompressChunk nodes; these variables are present when the node uses a
custom scan targetlist. Fix this by always working with a targetlist in
which these variables are resolved to uncompressed chunk variables, as we
do during execution.
akuzm authored Nov 25, 2024
1 parent 24ecdee commit 14c2c1d
Showing 7 changed files with 220 additions and 68 deletions.
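
As context for the diffs below, here is a minimal sketch of the variable resolution the commit message describes: an aggregate argument is followed from the Agg node's OUTER_VAR reference into the DecompressChunk output targetlist and, when that scan uses a custom scan targetlist, through custom_scan_tlist, until it becomes a plain uncompressed-chunk Var. This is an illustrative reading of the change, not the committed code: the helper name resolve_to_uncompressed_chunk_var and its simplified error handling are assumptions for exposition; the actual fix performs this translation in resolve_outer_special_vars_mutator() in tsl/src/nodes/vector_agg/plan.c and copies the resulting node.

#include "postgres.h"

#include "nodes/plannodes.h"

/*
 * Illustrative sketch, not the committed implementation: resolve one Var
 * from the partial Agg node's targetlist down to the uncompressed chunk
 * Var it ultimately refers to. Assumes each targetlist entry it follows
 * is a plain Var, which is also what the vectorization checks require.
 */
static Var *
resolve_to_uncompressed_chunk_var(Var *var, CustomScan *decompress_chunk)
{
	if ((Index) var->varno == (Index) decompress_chunk->scan.scanrelid)
	{
		/* Already an uncompressed chunk variable, nothing to translate. */
		return var;
	}

	if (var->varno == OUTER_VAR)
	{
		/* Reference into the DecompressChunk output targetlist. */
		TargetEntry *tle = list_nth_node(TargetEntry,
										 decompress_chunk->scan.plan.targetlist,
										 var->varattno - 1);

		/* The entry may itself be an INDEX_VAR, so resolve recursively. */
		return resolve_to_uncompressed_chunk_var(castNode(Var, tle->expr),
												 decompress_chunk);
	}

	if (var->varno == INDEX_VAR)
	{
		/*
		 * Reference into the custom scan targetlist -- the case whose
		 * mishandling led to the #7410 "aggregated compressed column not
		 * found" error.
		 */
		TargetEntry *tle = list_nth_node(TargetEntry,
										 decompress_chunk->custom_scan_tlist,
										 var->varattno - 1);
		return castNode(Var, tle->expr);
	}

	elog(ERROR, "unexpected varno %d in aggregate argument", var->varno);
	return NULL;
}

In the commit itself, try_insert_vector_agg_node() resolves the whole aggregated targetlist once (resolve_outer_special_vars()) and then runs the grouping and per-entry vectorization checks against that resolved copy, as the plan.c diff below shows.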
2 changes: 2 additions & 0 deletions .unreleased/resolve-vars
@@ -0,0 +1,2 @@
Fixes: #7410 "aggregated compressed column not found" error on aggregation query.
Thanks: @uasiddiqi for reporting the "aggregated compressed column not found" error.
17 changes: 10 additions & 7 deletions tsl/src/nodes/vector_agg/exec.c
@@ -25,16 +25,19 @@ get_input_offset(DecompressChunkState *decompress_state, Var *var)
{
DecompressContext *dcontext = &decompress_state->decompress_context;

/*
* All variable references in the vectorized aggregation node were
* translated to uncompressed chunk variables when it was created.
*/
CustomScan *cscan = castNode(CustomScan, decompress_state->csstate.ss.ps.plan);
Ensure((Index) var->varno == (Index) cscan->scan.scanrelid,
"got vector varno %d expected %d",
var->varno,
cscan->scan.scanrelid);

CompressionColumnDescription *value_column_description = NULL;
for (int i = 0; i < dcontext->num_data_columns; i++)
{
/*
* See the column lookup in compute_plain_qual() for the discussion of
* which attribute numbers occur where. At the moment here it is
* uncompressed_scan_attno, but it might be an oversight of not rewriting
* the references into INDEX_VAR (or OUTER_VAR...?) when we create the
* VectorAgg node.
*/
CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i];
if (current_column->uncompressed_chunk_attno == var->varattno)
{
156 changes: 108 additions & 48 deletions tsl/src/nodes/vector_agg/plan.c
@@ -74,29 +74,44 @@ resolve_outer_special_vars_mutator(Node *node, void *context)
return expression_tree_mutator(node, resolve_outer_special_vars_mutator, context);
}

Var *aggregated_var = castNode(Var, node);
Ensure(aggregated_var->varno == OUTER_VAR,
"encountered unexpected varno %d as an aggregate argument",
aggregated_var->varno);

Var *var = castNode(Var, node);
CustomScan *custom = castNode(CustomScan, context);
TargetEntry *decompress_chunk_tentry =
castNode(TargetEntry, list_nth(custom->scan.plan.targetlist, aggregated_var->varattno - 1));
Var *decompressed_var = castNode(Var, decompress_chunk_tentry->expr);
if (decompressed_var->varno == INDEX_VAR)
if ((Index) var->varno == (Index) custom->scan.scanrelid)
{
/*
* This is already the uncompressed chunk var. We can see it referenced
* by expressions in the output targetlist of DecompressChunk node.
*/
return (Node *) copyObject(var);
}

if (var->varno == OUTER_VAR)
{
/*
* Reference into the output targetlist of the DecompressChunk node.
*/
TargetEntry *decompress_chunk_tentry =
castNode(TargetEntry, list_nth(custom->scan.plan.targetlist, var->varattno - 1));

return resolve_outer_special_vars_mutator((Node *) decompress_chunk_tentry->expr, context);
}

if (var->varno == INDEX_VAR)
{
/*
* This is a reference into the custom scan targetlist, we have to resolve
* it as well.
*/
decompressed_var =
castNode(Var,
castNode(TargetEntry,
list_nth(custom->custom_scan_tlist, decompressed_var->varattno - 1))
->expr);
}
Assert(decompressed_var->varno > 0);
return (Node *) copyObject(decompressed_var);
var = castNode(Var,
castNode(TargetEntry, list_nth(custom->custom_scan_tlist, var->varattno - 1))
->expr);
Assert(var->varno > 0);

return (Node *) copyObject(var);
}

Ensure(false, "encountered unexpected varno %d as an aggregate argument", var->varno);
return node;
}

/*
@@ -115,20 +130,20 @@ resolve_outer_special_vars(List *agg_tlist, CustomScan *custom)
* node.
*/
static Plan *
vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk)
vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk, List *resolved_targetlist)
{
CustomScan *vector_agg = (CustomScan *) makeNode(CustomScan);
vector_agg->custom_plans = list_make1(decompress_chunk);
vector_agg->methods = &scan_methods;

vector_agg->custom_scan_tlist = resolved_targetlist;

/*
* Note that this is being called from the post-planning hook, and therefore
* after set_plan_refs(). The meaning of output targetlists is different from
* the previous planning stages, and they contain special varnos referencing
* the scan targetlists.
*/
vector_agg->custom_scan_tlist =
resolve_outer_special_vars(agg->plan.targetlist, decompress_chunk);
vector_agg->scan.plan.targetlist =
build_trivial_custom_output_targetlist(vector_agg->custom_scan_tlist);

@@ -179,44 +194,64 @@ is_vector_var(CustomScan *custom, Expr *expr, bool *out_is_segmentby)
return false;
}

Var *aggregated_var = castNode(Var, expr);
Var *decompressed_var = castNode(Var, expr);

/*
* Check if this particular column is a segmentby or has bulk decompression
* enabled. This hook is called after set_plan_refs, and at this stage the
* output targetlist of the aggregation node uses OUTER_VAR references into
* the child scan targetlist, so first we have to translate this.
* This must be called after resolve_outer_special_vars(), so we should only
* see the uncompressed chunk variables here.
*/
Assert(aggregated_var->varno == OUTER_VAR);
TargetEntry *decompressed_target_entry =
list_nth(custom->scan.plan.targetlist, AttrNumberGetAttrOffset(aggregated_var->varattno));
Ensure((Index) decompressed_var->varno == (Index) custom->scan.scanrelid,
"expected scan varno %d got %d",
custom->scan.scanrelid,
decompressed_var->varno);

if (!IsA(decompressed_target_entry->expr, Var))
if (decompressed_var->varattno <= 0)
{
/*
* Can only aggregate the plain Vars. Not sure if this is redundant with
* the similar check above.
*/
/* Can't work with special attributes like tableoid. */
if (out_is_segmentby)
{
*out_is_segmentby = false;
}
return false;
}
Var *decompressed_var = castNode(Var, decompressed_target_entry->expr);

/*
* Now, we have to translate the decompressed varno into the compressed
* column index, to check if the column supports bulk decompression.
*/
List *decompression_map = list_nth(custom->custom_private, DCP_DecompressionMap);
List *is_segmentby_column = list_nth(custom->custom_private, DCP_IsSegmentbyColumn);
List *bulk_decompression_column = list_nth(custom->custom_private, DCP_BulkDecompressionColumn);
int compressed_column_index = 0;
for (; compressed_column_index < list_length(decompression_map); compressed_column_index++)
{
if (list_nth_int(decompression_map, compressed_column_index) == decompressed_var->varattno)
const int custom_scan_attno = list_nth_int(decompression_map, compressed_column_index);
if (custom_scan_attno <= 0)
{
continue;
}

int uncompressed_chunk_attno = 0;
if (custom->custom_scan_tlist == NIL)
{
uncompressed_chunk_attno = custom_scan_attno;
}
else
{
Var *var = castNode(Var,
castNode(TargetEntry,
list_nth(custom->custom_scan_tlist,
AttrNumberGetAttrOffset(custom_scan_attno)))
->expr);
uncompressed_chunk_attno = var->varattno;
}

if (uncompressed_chunk_attno == decompressed_var->varattno)
{
break;
}
}
Ensure(compressed_column_index < list_length(decompression_map), "compressed column not found");

List *bulk_decompression_column = list_nth(custom->custom_private, DCP_BulkDecompressionColumn);
Assert(list_length(decompression_map) == list_length(bulk_decompression_column));
const bool bulk_decompression_enabled_for_column =
list_nth_int(bulk_decompression_column, compressed_column_index);
@@ -233,6 +268,8 @@ is_vector_var(CustomScan *custom, Expr *expr, bool *out_is_segmentby)
/*
* Check if this column is a segmentby.
*/
List *is_segmentby_column = list_nth(custom->custom_private, DCP_IsSegmentbyColumn);
Assert(list_length(is_segmentby_column) == list_length(decompression_map));
const bool is_segmentby = list_nth_int(is_segmentby_column, compressed_column_index);
if (out_is_segmentby)
{
@@ -317,7 +354,7 @@ can_vectorize_aggref(Aggref *aggref, CustomScan *custom)
* Currently supports either no grouping or grouping by segmentby columns.
*/
static bool
can_vectorize_grouping(Agg *agg, CustomScan *custom)
can_vectorize_grouping(Agg *agg, CustomScan *custom, List *resolved_targetlist)
{
if (agg->numCols == 0)
{
@@ -327,7 +364,7 @@ can_vectorize_grouping(Agg *agg, CustomScan *custom)
for (int i = 0; i < agg->numCols; i++)
{
int offset = AttrNumberGetAttrOffset(agg->grpColIdx[i]);
TargetEntry *entry = list_nth(agg->plan.targetlist, offset);
TargetEntry *entry = list_nth_node(TargetEntry, resolved_targetlist, offset);

bool is_segmentby = false;
if (!is_vector_var(custom, entry->expr, &is_segmentby))
@@ -519,25 +556,48 @@ try_insert_vector_agg_node(Plan *plan)
return plan;
}

if (!can_vectorize_grouping(agg, custom))
/*
* To make it easier to examine the variables participating in the aggregation,
* the subsequent checks are performed on the aggregated targetlist with
* all variables resolved to uncompressed chunk variables.
*/
List *resolved_targetlist = resolve_outer_special_vars(agg->plan.targetlist, custom);

if (!can_vectorize_grouping(agg, custom, resolved_targetlist))
{
/* No GROUP BY support for now. */
return plan;
}

/* Now check the aggregate functions themselves. */
/* Now check the output targetlist. */
ListCell *lc;
foreach (lc, agg->plan.targetlist)
foreach (lc, resolved_targetlist)
{
TargetEntry *target_entry = castNode(TargetEntry, lfirst(lc));
if (!IsA(target_entry->expr, Aggref))
if (IsA(target_entry->expr, Aggref))
{
continue;
Aggref *aggref = castNode(Aggref, target_entry->expr);
if (!can_vectorize_aggref(aggref, custom))
{
/* Aggregate function not vectorizable. */
return plan;
}
}

Aggref *aggref = castNode(Aggref, target_entry->expr);
if (!can_vectorize_aggref(aggref, custom))
else if (IsA(target_entry->expr, Var))
{
if (!is_vector_var(custom, target_entry->expr, NULL))
{
/* Variable not vectorizable. */
return plan;
}
}
else
{
/*
* Sometimes the plan can require this node to perform a projection,
* e.g. we can see a nested loop param in its output targetlist. We
* can't handle this case currently.
*/
return plan;
}
}
@@ -546,5 +606,5 @@ try_insert_vector_agg_node(Plan *plan)
* Finally, all requirements are satisfied and we can vectorize this partial
* aggregation node.
*/
return vector_agg_plan_create(agg, custom);
return vector_agg_plan_create(agg, custom, resolved_targetlist);
}
87 changes: 75 additions & 12 deletions tsl/test/expected/vector_agg_param.out
@@ -21,23 +21,42 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x;
(1 row)

analyze pvagg;
explain (costs off)
-- The reference for this test is generated using the standard Postgres
-- aggregation. When you change this test, recheck the results against the
-- Postgres aggregation by uncommenting the below GUC.
-- set timescaledb.enable_vectorized_aggregation to off;
explain (verbose, costs off)
select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx;
QUERY PLAN
---------------------------------------------------------------------------
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Nested Loop
-> Function Scan on unnest x
Output: x.x, (sum(pvagg.a))
-> Function Scan on pg_catalog.unnest x
Output: x.x
Function Call: unnest('{0,1,2}'::integer[])
-> Finalize Aggregate
-> Custom Scan (ChunkAppend) on pvagg
Output: sum(pvagg.a)
-> Custom Scan (ChunkAppend) on public.pvagg
Output: (PARTIAL sum(pvagg.a))
Startup Exclusion: false
Runtime Exclusion: true
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_1_1_chunk
-> Seq Scan on compress_hyper_2_3_chunk
Filter: (s = x.x)
Output: (PARTIAL sum(_hyper_1_1_chunk.a))
Grouping Policy: all compressed batches
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk
Output: _hyper_1_1_chunk.a
-> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk
Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a
Filter: (compress_hyper_2_3_chunk.s = x.x)
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_1_2_chunk
-> Seq Scan on compress_hyper_2_4_chunk
Filter: (s = x.x)
(12 rows)
Output: (PARTIAL sum(_hyper_1_2_chunk.a))
Grouping Policy: all compressed batches
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk
Output: _hyper_1_2_chunk.a
-> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk
Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a
Filter: (compress_hyper_2_4_chunk.s = x.x)
(27 rows)

select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx;
x | sum
@@ -47,4 +66,48 @@ select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg
2 | 1498500
(3 rows)

explain (verbose, costs off)
select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Nested Loop
Output: x.x, (sum((_hyper_1_1_chunk.a + x.x)))
-> Function Scan on pg_catalog.unnest x
Output: x.x
Function Call: unnest('{0,1,2}'::integer[])
-> Finalize Aggregate
Output: sum((_hyper_1_1_chunk.a + x.x))
-> Append
-> Partial Aggregate
Output: PARTIAL sum((_hyper_1_1_chunk.a + x.x))
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk
Output: _hyper_1_1_chunk.a
-> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk
Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a
-> Partial Aggregate
Output: PARTIAL sum((_hyper_1_2_chunk.a + x.x))
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk
Output: _hyper_1_2_chunk.a
-> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk
Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a
(20 rows)

select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx;
x | sum
---+---------
0 | 1998000
1 | 1999998
2 | 2001996
(3 rows)

-- The plan for this query differs after PG16, x is not used as grouping key but
-- just added into the output targetlist of partial aggregation nodes.
select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx;
x | sum
---+---------
0 | 1998000
1 | 1998000
2 | 1998000
(3 rows)

drop table pvagg;