Skip to content

Commit

Permalink
Vectorized aggregation execution for sum()
Browse files Browse the repository at this point in the history
This commit introduces a vectorized version of the sum() aggregate
function on compressed data. This optimization is enabled if (1) the
data is compressed, (2) no filters or grouping are applied, (3) the
aggregate is a sum over 32-bit integers (int4), and (4) the aggregation
can be pushed down to the chunk level.
  • Loading branch information
jnidzwetzki committed Oct 18, 2023
1 parent f1fff68 commit 0656d17
Show file tree
Hide file tree
Showing 24 changed files with 2,769 additions and 152 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_6050
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #6050 Vectorized aggregation execution for sum()
13 changes: 12 additions & 1 deletion src/compat/compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,18 @@ get_reindex_options(ReindexStmt *stmt)
#endif

/*
* define lfifth macro for convenience
* define some list macros for convenience
*/
/* Accessors for the list cell at index 4 (pointer and integer payload) */
#define lfifth(l) lfirst(list_nth_cell((l), 4))
#define lfifth_int(l) lfirst_int(list_nth_cell((l), 4))

/* Accessors for the list cell at index 5 (pointer and integer payload) */
#define lsixth(l) lfirst(list_nth_cell((l), 5))
#define lsixth_int(l) lfirst_int(list_nth_cell((l), 5))

/*
 * Six-element list constructors: build a five-element list with the matching
 * list_make5 variant and append the sixth value to it.
 */
#define list_make6(x1, x2, x3, x4, x5, x6)                                                         \
	lappend(list_make5((x1), (x2), (x3), (x4), (x5)), (x6))
#define list_make6_oid(x1, x2, x3, x4, x5, x6)                                                     \
	lappend_oid(list_make5_oid((x1), (x2), (x3), (x4), (x5)), (x6))
#define list_make6_int(x1, x2, x3, x4, x5, x6)                                                     \
	lappend_int(list_make5_int((x1), (x2), (x3), (x4), (x5)), (x6))

/* PG14 adds estinfo parameter to estimate_num_groups for additional context
* about the estimation
* https://github.com/postgres/postgres/commit/ed934d4fa3
Expand Down Expand Up @@ -984,4 +991,8 @@ object_ownercheck(Oid classid, Oid objectid, Oid roleid)
}
#endif

/*
 * Fallback definition of F_SUM_INT4 that is only active when PG14_LT is set.
 * NOTE(review): PG14_LT presumably means "PostgreSQL older than 14", and 2108
 * is presumably the pg_proc OID of sum(int4), which older fmgroids.h headers
 * do not export under this name -- confirm against the PostgreSQL catalog.
 */
#if PG14_LT
#define F_SUM_INT4 2108
#endif

#endif /* TIMESCALEDB_COMPAT_H */
9 changes: 9 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ job_execute_default_fn(BgwJob *job)
pg_unreachable();
}

/*
 * Default implementation of the does_node_perform_agg_internally hook.
 *
 * It never reports that a node performs the aggregation itself, so the
 * aggregation node is not skipped and is still added on top of the path.
 * None of the arguments are inspected.
 */
static bool
does_node_perform_agg_internally(PlannerInfo *root, AggPath *aggregation_path, Path *subpath)
{
	const bool performs_agg_internally = false;

	return performs_agg_internally;
}

static bool
process_compress_table_default(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options)
Expand Down Expand Up @@ -475,6 +482,8 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.policies_alter = error_no_default_fn_pg_community,
.policies_show = error_no_default_fn_pg_community,

.does_node_perform_agg_internally = does_node_perform_agg_internally,

.partialize_agg = error_no_default_fn_pg_community,
.finalize_agg_sfunc = error_no_default_fn_pg_community,
.finalize_agg_ffunc = error_no_default_fn_pg_community,
Expand Down
4 changes: 4 additions & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ typedef struct CrossModuleFunctions
void (*ddl_command_end)(EventTriggerData *command);
void (*sql_drop)(List *dropped_objects);

/* Vectorized queries */
bool (*does_node_perform_agg_internally)(PlannerInfo *root, AggPath *aggregation_path,
Path *subpath);

/* Continuous Aggregates */
PGFunction partialize_agg;
PGFunction finalize_agg_sfunc;
Expand Down
12 changes: 12 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ bool ts_guc_enable_per_data_node_queries = true;
bool ts_guc_enable_parameterized_data_node_scan = true;
bool ts_guc_enable_async_append = true;
bool ts_guc_enable_chunkwise_aggregation = true;
bool ts_guc_enable_vectorized_aggregation = true;
TSDLLEXPORT bool ts_guc_enable_compression_indexscan = true;
TSDLLEXPORT bool ts_guc_enable_bulk_decompression = true;
TSDLLEXPORT int ts_guc_bgw_log_level;
Expand Down Expand Up @@ -579,6 +580,17 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable("timescaledb.vectorized_aggregation",
"Enable vectorized aggregation",
"Enable vectorized aggregation for compressed data",
&ts_guc_enable_vectorized_aggregation,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomBoolVariable("timescaledb.enable_remote_explain",
"Show explain from remote nodes when using VERBOSE flag",
"Enable getting and showing EXPLAIN output from remote nodes",
Expand Down
1 change: 1 addition & 0 deletions src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extern TSDLLEXPORT bool ts_guc_enable_parameterized_data_node_scan;
extern TSDLLEXPORT bool ts_guc_enable_async_append;
extern TSDLLEXPORT bool ts_guc_enable_skip_scan;
extern TSDLLEXPORT bool ts_guc_enable_chunkwise_aggregation;
extern TSDLLEXPORT bool ts_guc_enable_vectorized_aggregation;
extern bool ts_guc_restoring;
extern int ts_guc_max_open_chunks_per_insert;
extern int ts_guc_max_cached_chunks_per_hypertable;
Expand Down
123 changes: 79 additions & 44 deletions src/planner/partialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <parser/parse_func.h>
#include <utils/lsyscache.h>

#include "cross_module_fn.h"
#include "debug_assert.h"
#include "partialize.h"
#include "planner.h"
Expand Down Expand Up @@ -329,7 +330,7 @@ copy_append_like_path(PlannerInfo *root, Path *path, List *new_subpaths, PathTar
/*
* Generate a partially sorted aggregated agg path on top of a path
*/
static Path *
static AggPath *
create_sorted_partial_agg_path(PlannerInfo *root, Path *path, PathTarget *target,
double d_num_groups, GroupPathExtraData *extra_data)
{
Expand All @@ -345,24 +346,24 @@ create_sorted_partial_agg_path(PlannerInfo *root, Path *path, PathTarget *target
path = (Path *) create_sort_path(root, path->parent, path, root->group_pathkeys, -1.0);
}

Path *sorted_agg_path = (Path *) create_agg_path(root,
path->parent,
path,
target,
parse->groupClause ? AGG_SORTED : AGG_PLAIN,
AGGSPLIT_INITIAL_SERIAL,
parse->groupClause,
NIL,
agg_partial_costs,
d_num_groups);
AggPath *sorted_agg_path = create_agg_path(root,
path->parent,
path,
target,
parse->groupClause ? AGG_SORTED : AGG_PLAIN,
AGGSPLIT_INITIAL_SERIAL,
parse->groupClause,
NIL,
agg_partial_costs,
d_num_groups);

return sorted_agg_path;
}

/*
* Generate a partially hashed aggregated agg path on top of a path
*/
static Path *
static AggPath *
create_hashed_partial_agg_path(PlannerInfo *root, Path *path, PathTarget *target,
double d_num_groups, GroupPathExtraData *extra_data)
{
Expand All @@ -371,16 +372,16 @@ create_hashed_partial_agg_path(PlannerInfo *root, Path *path, PathTarget *target
/* Determine costs for aggregations */
AggClauseCosts *agg_partial_costs = &extra_data->agg_partial_costs;

Path *hash_path = (Path *) create_agg_path(root,
path->parent,
path,
target,
AGG_HASHED,
AGGSPLIT_INITIAL_SERIAL,
parse->groupClause,
NIL,
agg_partial_costs,
d_num_groups);
AggPath *hash_path = create_agg_path(root,
path->parent,
path,
target,
AGG_HASHED,
AGGSPLIT_INITIAL_SERIAL,
parse->groupClause,
NIL,
agg_partial_costs,
d_num_groups);
return hash_path;
}

Expand All @@ -405,22 +406,32 @@ add_partially_aggregated_subpaths(PlannerInfo *root, Path *parent_path,

if (can_sort)
{
*sorted_paths = lappend(*sorted_paths,
create_sorted_partial_agg_path(root,
subpath,
chunktarget,
d_num_groups,
extra_data));
AggPath *agg_path =
create_sorted_partial_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);

if (ts_cm_functions->does_node_perform_agg_internally(root, agg_path, subpath))
{
*sorted_paths = lappend(*sorted_paths, subpath);
}
else
{
*sorted_paths = lappend(*sorted_paths, (Path *) agg_path);
}
}

if (can_hash)
{
*hashed_paths = lappend(*hashed_paths,
create_hashed_partial_agg_path(root,
subpath,
chunktarget,
d_num_groups,
extra_data));
AggPath *agg_path =
create_hashed_partial_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);

if (ts_cm_functions->does_node_perform_agg_internally(root, agg_path, subpath))
{
*hashed_paths = lappend(*hashed_paths, subpath);
}
else
{
*hashed_paths = lappend(*hashed_paths, (Path *) agg_path);
}
}
}

Expand Down Expand Up @@ -646,27 +657,51 @@ get_best_total_path(RelOptInfo *output_rel)
return output_rel->cheapest_total_path;
}

/*
 * Tell whether the given aggregation path uses the sorted or the plain
 * aggregation strategy.
 *
 * The path is cast to an AggPath via castNode(). Its strategy is asserted to
 * be one of AGG_SORTED, AGG_PLAIN or AGG_HASHED.
 *
 * Returns true for AGG_SORTED and AGG_PLAIN, false otherwise.
 */
static bool
is_path_sorted_or_plain_agg_path(Path *path)
{
	AggPath *agg = castNode(AggPath, path);

	Assert(agg->aggstrategy == AGG_SORTED || agg->aggstrategy == AGG_PLAIN ||
		   agg->aggstrategy == AGG_HASHED);

	switch (agg->aggstrategy)
	{
		case AGG_SORTED:
		case AGG_PLAIN:
			return true;
		default:
			return false;
	}
}

/*
 * Check if the aggregation found below an append-like path is a plain or
 * sorted aggregation.
 *
 * The direct subpaths of the given path are inspected in order. A subpath that
 * is an AggPath decides the result immediately. A subpath that is not an
 * AggPath may itself be an append-like node, so its own subpaths are inspected
 * as well and the first AggPath found there decides the result.
 *
 * Returns true if the first aggregation path found uses the plain or sorted
 * strategy, and false if it uses the hashed strategy. If no dedicated
 * aggregation node is found at all (e.g., only vectorized aggregation is
 * used), true is returned so that the sorted finalizer is used to finalize the
 * aggregation.
 *
 * Ensure() guards against a path for which no subpaths can be determined.
 */
static bool
contains_path_plain_or_sorted_agg(Path *path)
{
	List *subpaths = get_subpaths_from_append_path(path, true);

	Ensure(subpaths != NIL, "Unable to determine aggregation type");

	ListCell *lc;
	foreach (lc, subpaths)
	{
		Path *subpath = lfirst(lc);

		if (IsA(subpath, AggPath))
			return is_path_sorted_or_plain_agg_path(subpath);

		/*
		 * Descend one level into this subpath. Passing the top-level path here
		 * would only re-scan the list that the outer loop already iterates.
		 *
		 * NOTE(review): this relies on get_subpaths_from_append_path()
		 * returning NIL for paths that are not append-like -- confirm.
		 */
		List *subsubpaths = get_subpaths_from_append_path(subpath, true);

		ListCell *lc2;
		foreach (lc2, subsubpaths)
		{
			Path *subsubpath = lfirst(lc2);

			/* The result has to be returned; the check has no other effect */
			if (IsA(subsubpath, AggPath))
				return is_path_sorted_or_plain_agg_path(subsubpath);
		}
	}

	/* No dedicated aggregation nodes found (e.g., only vectorized aggregation is used). The sorted
	 * finalizer is used in that case to finalize the aggregation. */
	return true;
}

/*
Expand Down Expand Up @@ -832,7 +867,7 @@ ts_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_rel
{
Path *append_path = lfirst(lc);

if (is_plain_or_sorted_agg_path(append_path))
if (contains_path_plain_or_sorted_agg(append_path))
{
bool is_sorted;

Expand Down
2 changes: 2 additions & 0 deletions src/planner/partialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
*/
#ifndef TIMESCALEDB_PLAN_PARTIALIZE_H
#define TIMESCALEDB_PLAN_PARTIALIZE_H

#include <postgres.h>
#include <nodes/pathnodes.h>
#include <optimizer/planner.h>

#include "chunk.h"
Expand Down
1 change: 1 addition & 0 deletions tsl/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(SOURCES
dist_backup.c
hypertable.c
init.c
partialize_agg.c
partialize_finalize.c
planner.c
process_utility.c
Expand Down
4 changes: 4 additions & 0 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "nodes/decompress_chunk/planner.h"
#include "nodes/skip_scan/skip_scan.h"
#include "nodes/gapfill/gapfill_functions.h"
#include "partialize_agg.h"
#include "partialize_finalize.h"
#include "planner.h"
#include "process_utility.h"
Expand Down Expand Up @@ -144,6 +145,9 @@ CrossModuleFunctions tsl_cm_functions = {
.policies_alter = policies_alter,
.policies_show = policies_show,

/* Vectorized queries */
.does_node_perform_agg_internally = apply_vectorized_agg_optimization,

/* Continuous Aggregates */
.partialize_agg = tsl_partialize_agg,
.finalize_agg_sfunc = tsl_finalize_agg_sfunc,
Expand Down
Loading

0 comments on commit 0656d17

Please sign in to comment.