Skip to content

Commit

Permalink
Implemented the first vectorized aggregation
Browse files Browse the repository at this point in the history
This commit introduces a vectorized version of the sum() aggregate
function for compressed data. This optimization is enabled if (1) the
data is compressed, (2) no filters are applied, and (3) the aggregation
can be pushed down to the chunk level.
  • Loading branch information
jnidzwetzki committed Oct 12, 2023
1 parent 7a5cecf commit 8fe31a5
Show file tree
Hide file tree
Showing 22 changed files with 1,396 additions and 130 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_6050
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #6050 Vectorized aggregation execution for sum()
9 changes: 8 additions & 1 deletion src/compat/compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,18 @@ get_reindex_options(ReindexStmt *stmt)
#endif

/*
* define lfifth macro for convenience
* define some list macros for convenience
*/
#define lfifth(l) lfirst(list_nth_cell(l, 4))
#define lfifth_int(l) lfirst_int(list_nth_cell(l, 4))

#define lsixth(l) lfirst(list_nth_cell(l, 5))
#define lsixth_int(l) lfirst_int(list_nth_cell(l, 5))

#define list_make6(x1, x2, x3, x4, x5, x6) lappend(list_make5(x1, x2, x3, x4, x5), x6)
#define list_make6_oid(x1, x2, x3, x4, x5, x6) lappend_oid(list_make5_oid(x1, x2, x3, x4, x5), x6)
#define list_make6_int(x1, x2, x3, x4, x5, x6) lappend_int(list_make5_int(x1, x2, x3, x4, x5), x6)

/* PG14 adds estinfo parameter to estimate_num_groups for additional context
* about the estimation
* https://github.com/postgres/postgres/commit/ed934d4fa3
Expand Down
9 changes: 9 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ job_execute_default_fn(BgwJob *job)
pg_unreachable();
}

static bool
should_skip_partial_agg_node_default_fn(PlannerInfo *root, AggPath *aggregation_path, Path *subpath)
{
/* Don't skip adding the agg node on top of the path */
return false;
}

static bool
process_compress_table_default(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options)
Expand Down Expand Up @@ -475,6 +482,8 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.policies_alter = error_no_default_fn_pg_community,
.policies_show = error_no_default_fn_pg_community,

.should_skip_partial_agg_node = should_skip_partial_agg_node_default_fn,

.partialize_agg = error_no_default_fn_pg_community,
.finalize_agg_sfunc = error_no_default_fn_pg_community,
.finalize_agg_ffunc = error_no_default_fn_pg_community,
Expand Down
4 changes: 4 additions & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ typedef struct CrossModuleFunctions
void (*ddl_command_end)(EventTriggerData *command);
void (*sql_drop)(List *dropped_objects);

/* Vectorized queries */
bool (*should_skip_partial_agg_node)(PlannerInfo *root, AggPath *aggregation_path,
Path *subpath);

/* Continuous Aggregates */
PGFunction partialize_agg;
PGFunction finalize_agg_sfunc;
Expand Down
12 changes: 12 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ bool ts_guc_enable_per_data_node_queries = true;
bool ts_guc_enable_parameterized_data_node_scan = true;
bool ts_guc_enable_async_append = true;
bool ts_guc_enable_chunkwise_aggregation = true;
bool ts_guc_enable_vectorized_aggregation = true;
TSDLLEXPORT bool ts_guc_enable_compression_indexscan = true;
TSDLLEXPORT bool ts_guc_enable_bulk_decompression = true;
TSDLLEXPORT bool ts_guc_enable_skip_scan = true;
Expand Down Expand Up @@ -570,6 +571,17 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable("timescaledb.vectorized_aggregation",
"Enable vectorized aggregation",
"Enable vectorized aggregation",
&ts_guc_enable_vectorized_aggregation,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomBoolVariable("timescaledb.enable_remote_explain",
"Show explain from remote nodes when using VERBOSE flag",
"Enable getting and showing EXPLAIN output from remote nodes",
Expand Down
1 change: 1 addition & 0 deletions src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extern TSDLLEXPORT bool ts_guc_enable_parameterized_data_node_scan;
extern TSDLLEXPORT bool ts_guc_enable_async_append;
extern TSDLLEXPORT bool ts_guc_enable_skip_scan;
extern TSDLLEXPORT bool ts_guc_enable_chunkwise_aggregation;
extern TSDLLEXPORT bool ts_guc_enable_vectorized_aggregation;
extern bool ts_guc_restoring;
extern int ts_guc_max_open_chunks_per_insert;
extern int ts_guc_max_cached_chunks_per_hypertable;
Expand Down
123 changes: 79 additions & 44 deletions src/planner/partialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <parser/parse_func.h>
#include <utils/lsyscache.h>

#include "cross_module_fn.h"
#include "debug_assert.h"
#include "partialize.h"
#include "planner.h"
Expand Down Expand Up @@ -329,7 +330,7 @@ copy_append_like_path(PlannerInfo *root, Path *path, List *new_subpaths, PathTar
/*
* Generate a partially sorted aggregated agg path on top of a path
*/
static Path *
static AggPath *
create_sorted_partial_agg_path(PlannerInfo *root, Path *path, PathTarget *target,
double d_num_groups, GroupPathExtraData *extra_data)
{
Expand All @@ -345,24 +346,24 @@ create_sorted_partial_agg_path(PlannerInfo *root, Path *path, PathTarget *target
path = (Path *) create_sort_path(root, path->parent, path, root->group_pathkeys, -1.0);
}

Path *sorted_agg_path = (Path *) create_agg_path(root,
path->parent,
path,
target,
parse->groupClause ? AGG_SORTED : AGG_PLAIN,
AGGSPLIT_INITIAL_SERIAL,
parse->groupClause,
NIL,
agg_partial_costs,
d_num_groups);
AggPath *sorted_agg_path = create_agg_path(root,
path->parent,
path,
target,
parse->groupClause ? AGG_SORTED : AGG_PLAIN,
AGGSPLIT_INITIAL_SERIAL,
parse->groupClause,
NIL,
agg_partial_costs,
d_num_groups);

return sorted_agg_path;
}

/*
* Generate a partially hashed aggregated add path on top of a path
*/
static Path *
static AggPath *
create_hashed_partial_agg_path(PlannerInfo *root, Path *path, PathTarget *target,
double d_num_groups, GroupPathExtraData *extra_data)
{
Expand All @@ -371,16 +372,16 @@ create_hashed_partial_agg_path(PlannerInfo *root, Path *path, PathTarget *target
/* Determine costs for aggregations */
AggClauseCosts *agg_partial_costs = &extra_data->agg_partial_costs;

Path *hash_path = (Path *) create_agg_path(root,
path->parent,
path,
target,
AGG_HASHED,
AGGSPLIT_INITIAL_SERIAL,
parse->groupClause,
NIL,
agg_partial_costs,
d_num_groups);
AggPath *hash_path = create_agg_path(root,
path->parent,
path,
target,
AGG_HASHED,
AGGSPLIT_INITIAL_SERIAL,
parse->groupClause,
NIL,
agg_partial_costs,
d_num_groups);
return hash_path;
}

Expand All @@ -405,22 +406,32 @@ add_partially_aggregated_subpaths(PlannerInfo *root, Path *parent_path,

if (can_sort)
{
*sorted_paths = lappend(*sorted_paths,
create_sorted_partial_agg_path(root,
subpath,
chunktarget,
d_num_groups,
extra_data));
AggPath *agg_path =
create_sorted_partial_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);

if (ts_cm_functions->should_skip_partial_agg_node(root, agg_path, subpath))
{
*sorted_paths = lappend(*sorted_paths, subpath);
}
else
{
*sorted_paths = lappend(*sorted_paths, (Path *) agg_path);
}
}

if (can_hash)
{
*hashed_paths = lappend(*hashed_paths,
create_hashed_partial_agg_path(root,
subpath,
chunktarget,
d_num_groups,
extra_data));
AggPath *agg_path =
create_hashed_partial_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);

if (ts_cm_functions->should_skip_partial_agg_node(root, agg_path, subpath))
{
*hashed_paths = lappend(*hashed_paths, subpath);
}
else
{
*hashed_paths = lappend(*hashed_paths, (Path *) agg_path);
}
}
}

Expand Down Expand Up @@ -646,27 +657,51 @@ get_best_total_path(RelOptInfo *output_rel)
return output_rel->cheapest_total_path;
}

/*
Is the provided path a agg path that uses a sorted or plain agg strategy?
*/
static bool
is_path_sorted_or_plain_agg_path(Path *path)
{
AggPath *agg_path = castNode(AggPath, path);
Assert(agg_path->aggstrategy == AGG_SORTED || agg_path->aggstrategy == AGG_PLAIN ||
agg_path->aggstrategy == AGG_HASHED);
return agg_path->aggstrategy == AGG_SORTED || agg_path->aggstrategy == AGG_PLAIN;
}

/*
* Check if this path belongs to a plain or sorted aggregation
*/
static bool
is_plain_or_sorted_agg_path(Path *path)
contains_path_plain_or_sorted_agg(Path *path)
{
List *subpaths = get_subpaths_from_append_path(path, true);

Ensure(subpaths != NIL, "Unable to determine aggregation type");

Path *subpath = linitial(subpaths);

if (IsA(subpath, AggPath))
ListCell *lc;
foreach (lc, subpaths)
{
AggPath *agg_path = castNode(AggPath, linitial(subpaths));
Assert(agg_path->aggstrategy == AGG_SORTED || agg_path->aggstrategy == AGG_PLAIN ||
agg_path->aggstrategy == AGG_HASHED);
return agg_path->aggstrategy == AGG_SORTED || agg_path->aggstrategy == AGG_PLAIN;
Path *subpath = lfirst(lc);

if (IsA(subpath, AggPath))
return is_path_sorted_or_plain_agg_path(subpath);

List *subsubpaths = get_subpaths_from_append_path(path, true);

ListCell *lc2;
foreach (lc2, subsubpaths)
{
Path *subsubpath = lfirst(lc2);

if (IsA(subsubpath, AggPath))
is_path_sorted_or_plain_agg_path(subsubpath);
}
}

return is_plain_or_sorted_agg_path(subpath);
/* No dedicated aggregation nodes found (e.g., only vectorized aggregation is used). The sorted
* finalizer is used in that case to finalize the aggregation. */
return true;
}

/*
Expand Down Expand Up @@ -832,7 +867,7 @@ ts_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_rel
{
Path *append_path = lfirst(lc);

if (is_plain_or_sorted_agg_path(append_path))
if (contains_path_plain_or_sorted_agg(append_path))
{
bool is_sorted;

Expand Down
2 changes: 2 additions & 0 deletions src/planner/partialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
*/
#ifndef TIMESCALEDB_PLAN_PARTIALIZE_H
#define TIMESCALEDB_PLAN_PARTIALIZE_H

#include <postgres.h>
#include <nodes/pathnodes.h>
#include <optimizer/planner.h>

#include "chunk.h"
Expand Down
1 change: 1 addition & 0 deletions tsl/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(SOURCES
dist_backup.c
hypertable.c
init.c
partialize_agg.c
partialize_finalize.c
planner.c
process_utility.c
Expand Down
4 changes: 4 additions & 0 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "nodes/decompress_chunk/planner.h"
#include "nodes/skip_scan/skip_scan.h"
#include "nodes/gapfill/gapfill_functions.h"
#include "partialize_agg.h"
#include "partialize_finalize.h"
#include "planner.h"
#include "process_utility.h"
Expand Down Expand Up @@ -144,6 +145,9 @@ CrossModuleFunctions tsl_cm_functions = {
.policies_alter = policies_alter,
.policies_show = policies_show,

/* Vectorized queries */
.should_skip_partial_agg_node = tsl_try_change_to_vector_path,

/* Continuous Aggregates */
.partialize_agg = tsl_partialize_agg,
.finalize_agg_sfunc = tsl_finalize_agg_sfunc,
Expand Down
5 changes: 5 additions & 0 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ typedef struct DecompressChunkPath
*/
DecompressChunkColumnCompression *uncompressed_chunk_attno_to_compression_info;

/*
* Are we able to execute a vectorized aggregation
*/
bool perform_vectorized_aggregation;

List *compressed_pathkeys;
bool needs_sequence_num;
bool reverse;
Expand Down
Loading

0 comments on commit 8fe31a5

Please sign in to comment.