stable expressions as constant in vectorized filters #6203

Merged 1 commit on Nov 3, 2023
4 changes: 2 additions & 2 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -81,7 +81,7 @@ make_single_value_arrow(Oid pgtype, Datum datum, bool isnull)
static void
apply_vector_quals(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
{
if (!chunk_state->vectorized_quals)
if (!chunk_state->vectorized_quals_constified)
{
return;
}
@@ -98,7 +98,7 @@ apply_vector_quals(DecompressChunkState *chunk_state, DecompressBatchState *batc
* Compute the quals.
*/
ListCell *lc;
foreach (lc, chunk_state->vectorized_quals)
foreach (lc, chunk_state->vectorized_quals_constified)
{
/* For now we only support "Var ? Const" predicates. */
OpExpr *oe = castNode(OpExpr, lfirst(lc));
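
To make the "Var ? Const" comment above concrete, here is a minimal standalone sketch of what applying such a vectorized predicate amounts to once the right-hand side has been reduced to a constant: compare every decompressed value against the constant and clear the bitmap bits of rows that fail. This is an illustration only, not TimescaleDB code; the function name, the int64 column layout, and the all-ones initial bitmap are assumptions.

#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical sketch of a vectorized "Var <= Const" predicate over a
 * decompressed int64 column. Assumes the filter bitmap starts out all-ones
 * and that several quals are combined by clearing bits for failing rows.
 */
static void
vector_const_int64_le(const int64_t *values, size_t n, int64_t constant,
                      uint64_t *filter_bitmap)
{
    for (size_t i = 0; i < n; i++)
    {
        if (values[i] > constant)
            filter_bitmap[i / 64] &= ~(UINT64_C(1) << (i % 64));
    }
}
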
57 changes: 54 additions & 3 deletions tsl/src/nodes/decompress_chunk/exec.c
@@ -11,6 +11,7 @@
#include <nodes/bitmapset.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <optimizer/optimizer.h>
#include <parser/parsetree.h>
#include <rewrite/rewriteManip.h>
#include <utils/datum.h>
@@ -160,7 +161,7 @@

Assert(IsA(cscan->custom_exprs, List));
Assert(list_length(cscan->custom_exprs) == 1);
chunk_state->vectorized_quals = linitial(cscan->custom_exprs);
chunk_state->vectorized_quals_original = linitial(cscan->custom_exprs);

return (Node *) chunk_state;
}
@@ -475,6 +476,51 @@
{
elog(ERROR, "debug: batch sorted merge is required but not used");
}

/* Constify stable expressions in vectorized predicates. */
chunk_state->have_constant_false_vectorized_qual = false;
PlannerGlobal glob = {
.boundParams = node->ss.ps.state->es_param_list_info,
};
PlannerInfo root = {
.glob = &glob,
};
ListCell *lc;
foreach (lc, chunk_state->vectorized_quals_original)
{
Node *constified = estimate_expression_value(&root, (Node *) lfirst(lc));

/*
* Note that some expressions are evaluated to a null Const, like a
* strict comparison with a stable expression that evaluates to null. If
* we have such a filter, no rows can pass, so we set a special flag to
* return early.
*/
if (IsA(constified, Const))
{
Const *c = castNode(Const, constified);
if (c->constisnull || !DatumGetBool(c))
{
chunk_state->have_constant_false_vectorized_qual = true;
break;
}
else
{
/*
* This is a constant true qual, every row passes and we can
* just ignore it. No idea how it can happen though.
*/
Assert(false);
continue;

[Codecov check warning in tsl/src/nodes/decompress_chunk/exec.c: added lines #L513-L514 were not covered by tests]
}
}

OpExpr *opexpr = castNode(OpExpr, constified);
Ensure(IsA(lsecond(opexpr->args), Const),
"failed to evaluate runtime constant in vectorized filter");
chunk_state->vectorized_quals_constified =
lappend(chunk_state->vectorized_quals_constified, constified);
}
}

/*
@@ -738,6 +784,11 @@
return perform_vectorized_aggregation(chunk_state);
}

if (chunk_state->have_constant_false_vectorized_qual)
{
return NULL;
}

queue->pop(chunk_state);
while (queue->needs_next_batch(chunk_state))
{
@@ -806,13 +857,13 @@
{
DecompressChunkState *chunk_state = (DecompressChunkState *) node;

ts_show_scan_qual(chunk_state->vectorized_quals,
ts_show_scan_qual(chunk_state->vectorized_quals_original,
"Vectorized Filter",
&node->ss.ps,
ancestors,
es);

if (!node->ss.ps.plan->qual && chunk_state->vectorized_quals)
if (!node->ss.ps.plan->qual && chunk_state->vectorized_quals_original)
{
/*
* The normal explain won't show this if there are no normal quals but
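
For readers unfamiliar with estimate_expression_value(), the following self-contained sketch restates the constification pattern from the decompress_chunk_begin hunk above: a throwaway PlannerInfo whose glob carries the executor's bound parameters is enough for PostgreSQL to fold stable functions such as now() into Const nodes at executor startup. The helper name constify_at_exec_time is hypothetical; the structs and calls are the ones the diff itself relies on.

#include <postgres.h>
#include <nodes/params.h>
#include <nodes/pathnodes.h>
#include <optimizer/optimizer.h>

/* Sketch only: fold stable expressions in a qual to Consts at executor startup. */
static Node *
constify_at_exec_time(Node *expr, ParamListInfo bound_params)
{
    /* estimate_expression_value() reads bound parameters via root->glob. */
    PlannerGlobal glob = { .boundParams = bound_params };
    PlannerInfo root = { .glob = &glob };

    /*
     * Stable functions (e.g. now()) are evaluated against the executor's
     * snapshot here; volatile functions, Vars and unresolved Params are left
     * untouched, which is why the planner only routes run-time-constant
     * arguments to this path.
     */
    return estimate_expression_value(&root, expr);
}
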
9 changes: 7 additions & 2 deletions tsl/src/nodes/decompress_chunk/exec.h
@@ -97,9 +97,14 @@ typedef struct DecompressChunkState
/*
* For some predicates, we have more efficient implementation that work on
* the entire compressed batch in one go. They go to this list, and the rest
* goes into the usual ss.ps.qual.
* goes into the usual ss.ps.qual. Note that we constify stable functions
* in these predicates at execution time, but have to keep the original
* version for EXPLAIN. We also need special handling for quals that
* evaluate to constant false, hence the flag.
*/
List *vectorized_quals;
List *vectorized_quals_original;
List *vectorized_quals_constified;
bool have_constant_false_vectorized_qual;

/*
* Make non-refcounted copies of the tupdesc for reuse across all batch states
102 changes: 88 additions & 14 deletions tsl/src/nodes/decompress_chunk/planner.c
@@ -377,27 +377,89 @@ find_attr_pos_in_tlist(List *targetlist, AttrNumber pos)
}

static bool
qual_is_vectorizable(DecompressChunkPath *path, Node *qual)
contains_volatile_functions_checker(Oid func_id, void *context)
{
return (func_volatile(func_id) == PROVOLATILE_VOLATILE);
}

Review comments on the new is_not_runtime_constant_walker() function:

Contributor: This function contains some logic. So, could you add a comment?

Contributor: +1 I believe you are just looking for expressions that can be constified at runtime (as opposed to plan time) here. It would help to add a comment.

I was thrown off by the commit message that said we can now support ts > now() - '1 day'::interval. We already do plan-time constification for that in some cases. I suggest also modifying the commit message to elaborate that this change now introduces support for ts < now(), ts > now(), etc.

Member Author: > We already do plan time constification for that in some cases.

That's a different thing, it's for runtime chunk exclusion that is done in this small scope only IIRC. The underlying DecompressChunk executor doesn't see the constified values, and I don't think there's a good way to pass these values down to the underlying nodes. So what I'm doing is also constifying the values there, so that we can perform vectorized filters with them.

I'll try to improve the comments and the message.

Contributor: Thanks. To clarify: ts > now() - <> works with plan-time chunk exclusion. But I wasn't aware that it doesn't get passed along via the filters to decompress.

static bool
is_not_runtime_constant_walker(Node *node, void *context)
{
if (node == NULL)
{
return false;
}

switch (nodeTag(node))
{
case T_Var:
case T_PlaceHolderVar:
case T_Param:
/*
* We might want to support these nodes to have vectorizable
* join clauses (T_Var), join clauses referencing a variable that is
* above outer join (T_PlaceHolderVar) or initplan parameters and
* prepared statement parameters (T_Param). We don't support them at
* the moment.
*/
return true;
default:
if (check_functions_in_node(node,
contains_volatile_functions_checker,
/* context = */ NULL))
{
return true;
}
return expression_tree_walker(node,
is_not_runtime_constant_walker,
/* context = */ NULL);
}
}

/*
* Check if the given node is a run-time constant, i.e. it doesn't contain
* volatile functions or variables or parameters. This means we can evaluate
* it at run time, allowing us to apply the vectorized comparison operators
* that have the form "Var op Const". This applies for example to filter
* expressions like `time > now() - interval '1 hour'`.
* Note that we do the same evaluation when doing run time chunk exclusion, but
* there is no good way to pass the evaluated clauses to the underlying nodes
* like this DecompressChunk node.
*/
static bool
is_not_runtime_constant(Node *node)
{
bool result = is_not_runtime_constant_walker(node, /* context = */ NULL);
return result;
}

/*
* Try to check if the current qual is vectorizable, and if needed make a
* commuted copy. If not, return NULL.
*/
static Node *
make_vectorized_qual(DecompressChunkPath *path, Node *qual)
{
/* Only simple "Var op Const" binary predicates for now. */
if (!IsA(qual, OpExpr))
{
return false;
return NULL;
}

OpExpr *o = castNode(OpExpr, qual);

if (list_length(o->args) != 2)
{
return false;
return NULL;
}

if (IsA(lsecond(o->args), Var) && IsA(linitial(o->args), Const))
if (IsA(lsecond(o->args), Var))
{
/* Try to commute the operator if the variable is on the right. */
Oid commutator_opno = get_commutator(o->opno);
if (OidIsValid(commutator_opno))
{
o = (OpExpr *) copyObject(o);
o->opno = commutator_opno;
/*
* opfuncid is a cache, we can set it to InvalidOid like the
@@ -408,9 +470,14 @@ qual_is_vectorizable(DecompressChunkPath *path, Node *qual)
}
}

if (!IsA(linitial(o->args), Var) || !IsA(lsecond(o->args), Const))
/*
* We can vectorize the operation where the left side is a Var and the right
* side is a constant or can be evaluated to a constant at run time (e.g.
* contains stable functions).
*/
if (!IsA(linitial(o->args), Var) || is_not_runtime_constant(lsecond(o->args)))
{
return false;
return NULL;
}

Var *var = castNode(Var, linitial(o->args));
@@ -424,32 +491,39 @@ qual_is_vectorizable(DecompressChunkPath *path, Node *qual)
.bulk_decompression_possible)
{
/* This column doesn't support bulk decompression. */
return false;
return NULL;
}

Oid opcode = get_opcode(o->opno);
if (get_vector_const_predicate(opcode))
{
return true;
return (Node *) o;
}

return false;
return NULL;
}

/*
* Find the scan qualifiers that can be vectorized and put them into a separate
* list.
*/
static void
find_vectorized_quals(DecompressChunkPath *path, List *qual, List **vectorized,
find_vectorized_quals(DecompressChunkPath *path, List *qual_list, List **vectorized,
List **nonvectorized)
{
ListCell *lc;
foreach (lc, qual)
foreach (lc, qual_list)
{
Node *node = lfirst(lc);
List **dest = qual_is_vectorizable(path, node) ? vectorized : nonvectorized;
*dest = lappend(*dest, node);
Node *source_qual = lfirst(lc);
Node *vectorized_qual = make_vectorized_qual(path, source_qual);
if (vectorized_qual)
{
*vectorized = lappend(*vectorized, vectorized_qual);
}
else
{
*nonvectorized = lappend(*nonvectorized, source_qual);
}
}
}

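
The tail of make_vectorized_qual() is collapsed in the diff above, so here is a hedged sketch of the commutation step it performs: when the Var sits on the right-hand side, look up the commutator operator and swap the arguments so the qual ends up in the "Var op Const" shape the vectorized predicates expect. The helper name is hypothetical and the exact bookkeeping may differ from the committed code.

#include <postgres.h>
#include <nodes/pg_list.h>
#include <nodes/primnodes.h>
#include <utils/lsyscache.h>

/*
 * Sketch only: commute "Const op Var" into "Var op Const" where possible.
 * Returns NULL when the operator has no commutator.
 */
static OpExpr *
commute_var_to_left(OpExpr *o)
{
    if (!IsA(lsecond(o->args), Var))
        return o;               /* the Var is not on the right, nothing to do */

    Oid commutator_opno = get_commutator(o->opno);
    if (!OidIsValid(commutator_opno))
        return NULL;            /* the operator cannot be commuted */

    OpExpr *commuted = (OpExpr *) copyObject(o);
    commuted->opno = commutator_opno;
    /* opfuncid is a lookup cache; it is re-resolved from opno when needed. */
    commuted->opfuncid = InvalidOid;
    commuted->args = list_make2(lsecond(commuted->args), linitial(commuted->args));
    return commuted;
}
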
16 changes: 6 additions & 10 deletions tsl/test/expected/agg_partials_pushdown.out
@@ -127,8 +127,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
Output: PARTIAL count(*), PARTIAL sum(_hyper_1_1_chunk.v0), PARTIAL sum(_hyper_1_1_chunk.v1), PARTIAL sum(_hyper_1_1_chunk.v2), PARTIAL sum(_hyper_1_1_chunk.v3)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=25 loops=1)
Output: _hyper_1_1_chunk.v0, _hyper_1_1_chunk.v1, _hyper_1_1_chunk.v2, _hyper_1_1_chunk.v3
Filter: (_hyper_1_1_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone)
Vectorized Filter: (_hyper_1_1_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone)
Vectorized Filter: ((_hyper_1_1_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_1_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk (actual rows=5 loops=1)
Output: compress_hyper_2_3_chunk.filter_1, compress_hyper_2_3_chunk.filler_2, compress_hyper_2_3_chunk.filler_3, compress_hyper_2_3_chunk."time", compress_hyper_2_3_chunk.device_id, compress_hyper_2_3_chunk.v0, compress_hyper_2_3_chunk.v1, compress_hyper_2_3_chunk.v2, compress_hyper_2_3_chunk.v3, compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk._ts_meta_sequence_num, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1
@@ -142,8 +141,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
Output: PARTIAL count(*), PARTIAL sum(_hyper_1_2_chunk.v0), PARTIAL sum(_hyper_1_2_chunk.v1), PARTIAL sum(_hyper_1_2_chunk.v2), PARTIAL sum(_hyper_1_2_chunk.v3)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk (actual rows=25 loops=1)
Output: _hyper_1_2_chunk.v0, _hyper_1_2_chunk.v1, _hyper_1_2_chunk.v2, _hyper_1_2_chunk.v3
Filter: (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone)
Vectorized Filter: (_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone)
Vectorized Filter: ((_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk (actual rows=5 loops=1)
Output: compress_hyper_2_4_chunk.filter_1, compress_hyper_2_4_chunk.filler_2, compress_hyper_2_4_chunk.filler_3, compress_hyper_2_4_chunk."time", compress_hyper_2_4_chunk.device_id, compress_hyper_2_4_chunk.v0, compress_hyper_2_4_chunk.v1, compress_hyper_2_4_chunk.v2, compress_hyper_2_4_chunk.v3, compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk._ts_meta_sequence_num, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1
@@ -153,7 +151,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
-> Seq Scan on _timescaledb_internal._hyper_1_2_chunk (actual rows=25 loops=1)
Output: _hyper_1_2_chunk.v0, _hyper_1_2_chunk.v1, _hyper_1_2_chunk.v2, _hyper_1_2_chunk.v3
Filter: ((_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
(37 rows)
(35 rows)

-- Force plain / sorted aggregation
SET enable_hashagg = OFF;
@@ -178,8 +176,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
Output: PARTIAL count(*), PARTIAL sum(_hyper_1_1_chunk.v0), PARTIAL sum(_hyper_1_1_chunk.v1), PARTIAL sum(_hyper_1_1_chunk.v2), PARTIAL sum(_hyper_1_1_chunk.v3)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=25 loops=1)
Output: _hyper_1_1_chunk.v0, _hyper_1_1_chunk.v1, _hyper_1_1_chunk.v2, _hyper_1_1_chunk.v3
Filter: (_hyper_1_1_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone)
Vectorized Filter: (_hyper_1_1_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone)
Vectorized Filter: ((_hyper_1_1_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_1_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk (actual rows=5 loops=1)
Output: compress_hyper_2_3_chunk.filter_1, compress_hyper_2_3_chunk.filler_2, compress_hyper_2_3_chunk.filler_3, compress_hyper_2_3_chunk."time", compress_hyper_2_3_chunk.device_id, compress_hyper_2_3_chunk.v0, compress_hyper_2_3_chunk.v1, compress_hyper_2_3_chunk.v2, compress_hyper_2_3_chunk.v3, compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk._ts_meta_sequence_num, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1
@@ -193,8 +190,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
Output: PARTIAL count(*), PARTIAL sum(_hyper_1_2_chunk.v0), PARTIAL sum(_hyper_1_2_chunk.v1), PARTIAL sum(_hyper_1_2_chunk.v2), PARTIAL sum(_hyper_1_2_chunk.v3)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk (actual rows=25 loops=1)
Output: _hyper_1_2_chunk.v0, _hyper_1_2_chunk.v1, _hyper_1_2_chunk.v2, _hyper_1_2_chunk.v3
Filter: (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone)
Vectorized Filter: (_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone)
Vectorized Filter: ((_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk (actual rows=5 loops=1)
Output: compress_hyper_2_4_chunk.filter_1, compress_hyper_2_4_chunk.filler_2, compress_hyper_2_4_chunk.filler_3, compress_hyper_2_4_chunk."time", compress_hyper_2_4_chunk.device_id, compress_hyper_2_4_chunk.v0, compress_hyper_2_4_chunk.v1, compress_hyper_2_4_chunk.v2, compress_hyper_2_4_chunk.v3, compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk._ts_meta_sequence_num, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1
@@ -204,7 +200,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
-> Seq Scan on _timescaledb_internal._hyper_1_2_chunk (actual rows=25 loops=1)
Output: _hyper_1_2_chunk.v0, _hyper_1_2_chunk.v1, _hyper_1_2_chunk.v2, _hyper_1_2_chunk.v3
Filter: ((_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
(37 rows)
(35 rows)

RESET enable_hashagg;
-- Check Append Node under ChunkAppend