Skip to content

Commit

Permalink
PG16: Fix join recursion
Browse files Browse the repository at this point in the history
Mark the EquivalenceMember for the compressed chunk as derived to
prevent an infinite recursion.
  • Loading branch information
svenklemm committed Nov 3, 2023
1 parent 978fee7 commit 94d16cf
Show file tree
Hide file tree
Showing 4 changed files with 3,096 additions and 3,008 deletions.
41 changes: 38 additions & 3 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1416,10 +1416,26 @@ create_var_for_compressed_equivalence_member(Var *var, const EMCreationContext *
return NULL;
}

#if PG16_GE
/*
 * Return the first EquivalenceMember of "ec" whose em_relids is exactly
 * the single relation "relid", or NULL when the class has no such member.
 */
static EquivalenceMember *
find_em_for_relid(EquivalenceClass *ec, Index relid)
{
	ListCell *cell;

	foreach (cell, ec->ec_members)
	{
		EquivalenceMember *member = lfirst_node(EquivalenceMember, cell);

		/* Only accept members referencing precisely this one relation. */
		if (bms_num_members(member->em_relids) == 1 &&
			bms_is_member(relid, member->em_relids))
			return member;
	}

	return NULL;
}
#endif

/* This function is inspired by the Postgres add_child_rel_equivalences. */
static bool
add_segmentby_to_equivalence_class(EquivalenceClass *cur_ec, CompressionInfo *info,
EMCreationContext *context)
add_segmentby_to_equivalence_class(PlannerInfo *root, EquivalenceClass *cur_ec,
CompressionInfo *info, EMCreationContext *context)
{
Relids uncompressed_chunk_relids = info->chunk_rel->relids;
ListCell *lc;
Expand Down Expand Up @@ -1528,6 +1544,25 @@ add_segmentby_to_equivalence_class(EquivalenceClass *cur_ec, CompressionInfo *in
compressed_fdw_private->compressed_ec_em_pairs =
lappend(compressed_fdw_private->compressed_ec_em_pairs, list_make2(cur_ec, em));

#if PG16_GE
EquivalenceMember *ht_em = find_em_for_relid(cur_ec, info->ht_rel->relid);

if (ht_em && ht_em->em_jdomain)
{
int i = -1;
while ((i = bms_next_member(ht_em->em_jdomain->jd_relids, i)) >= 0)
{
RestrictInfo *d = make_simple_restrictinfo_compat(root, em->em_expr);
d->parent_ec = cur_ec;
d->left_em = find_em_for_relid(cur_ec, i);
if (!d->left_em)
continue;
d->right_em = em;
cur_ec->ec_derives = lappend(cur_ec->ec_derives, d);
}
}
#endif

return true;
}
}
Expand Down Expand Up @@ -1572,7 +1607,7 @@ compressed_rel_setup_equivalence_classes(PlannerInfo *root, CompressionInfo *inf
if (bms_overlap(cur_ec->ec_relids, info->compressed_rel->relids))
continue;

bool em_added = add_segmentby_to_equivalence_class(cur_ec, info, &context);
bool em_added = add_segmentby_to_equivalence_class(root, cur_ec, info, &context);
/* Record this EC index for the compressed rel */
if (em_added)
info->compressed_rel->eclass_indexes =
Expand Down
50 changes: 23 additions & 27 deletions tsl/test/expected/merge_append_partially_compressed-16.out
Original file line number Diff line number Diff line change
Expand Up @@ -698,35 +698,31 @@ SELECT * FROM test1 ORDER BY time ASC NULLS FIRST, x3 DESC NULLS LAST, x4 ASC;

:PREFIX
SELECT x1, x2, max(time) FROM test1 GROUP BY x1, x2, time ORDER BY time limit 10;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------
QUERY PLAN
---------------------------------------------------------------------------------------------------------------
Limit (actual rows=5 loops=1)
-> Sort (actual rows=5 loops=1)
Sort Key: test1."time"
Sort Method: quicksort
-> Finalize HashAggregate (actual rows=5 loops=1)
Group Key: test1.x1, test1.x2, test1."time"
Batches: 1
-> Custom Scan (ChunkAppend) on test1 (actual rows=5 loops=1)
Order: test1."time", test1.x1, test1.x2
-> Merge Append (actual rows=5 loops=1)
-> Finalize GroupAggregate (actual rows=5 loops=1)
Group Key: test1.x1, test1.x2, test1."time"
-> Custom Scan (ChunkAppend) on test1 (actual rows=5 loops=1)
Order: test1."time", test1.x1, test1.x2
-> Merge Append (actual rows=5 loops=1)
Sort Key: _hyper_3_7_chunk."time", _hyper_3_7_chunk.x1, _hyper_3_7_chunk.x2
-> Sort (actual rows=4 loops=1)
Sort Key: _hyper_3_7_chunk."time", _hyper_3_7_chunk.x1, _hyper_3_7_chunk.x2
Sort Method: quicksort
-> Partial HashAggregate (actual rows=4 loops=1)
Group Key: _hyper_3_7_chunk.x1, _hyper_3_7_chunk.x2, _hyper_3_7_chunk."time"
Batches: 1
-> Custom Scan (DecompressChunk) on _hyper_3_7_chunk (actual rows=4 loops=1)
-> Seq Scan on compress_hyper_4_8_chunk (actual rows=3 loops=1)
-> Sort (actual rows=1 loops=1)
Sort Key: _hyper_3_7_chunk."time", _hyper_3_7_chunk.x1, _hyper_3_7_chunk.x2
-> Sort (actual rows=4 loops=1)
Sort Key: _hyper_3_7_chunk."time", _hyper_3_7_chunk.x1, _hyper_3_7_chunk.x2
Sort Method: quicksort
-> Partial HashAggregate (actual rows=4 loops=1)
Group Key: _hyper_3_7_chunk.x1, _hyper_3_7_chunk.x2, _hyper_3_7_chunk."time"
Batches: 1
-> Custom Scan (DecompressChunk) on _hyper_3_7_chunk (actual rows=4 loops=1)
-> Seq Scan on compress_hyper_4_8_chunk (actual rows=3 loops=1)
-> Sort (actual rows=1 loops=1)
Sort Key: _hyper_3_7_chunk."time", _hyper_3_7_chunk.x1, _hyper_3_7_chunk.x2
Sort Method: quicksort
-> Partial HashAggregate (actual rows=1 loops=1)
Group Key: _hyper_3_7_chunk.x1, _hyper_3_7_chunk.x2, _hyper_3_7_chunk."time"
Batches: 1
-> Seq Scan on _hyper_3_7_chunk (actual rows=1 loops=1)
(26 rows)
Sort Method: quicksort
-> Partial HashAggregate (actual rows=1 loops=1)
Group Key: _hyper_3_7_chunk.x1, _hyper_3_7_chunk.x2, _hyper_3_7_chunk."time"
Batches: 1
-> Seq Scan on _hyper_3_7_chunk (actual rows=1 loops=1)
(22 rows)

:PREFIX
SELECT * FROM test1 ORDER BY x1, x2, x5, x4, time LIMIT 10;
Expand Down
Loading

0 comments on commit 94d16cf

Please sign in to comment.