From c835fa83685f6dcab5664056d5d6ca3db97c4319 Mon Sep 17 00:00:00 2001 From: Jan Nidzwetzki Date: Fri, 5 May 2023 14:05:10 +0200 Subject: [PATCH] First path replacement tests --- src/guc.c | 6 +- tsl/src/init.c | 2 + .../nodes/decompress_chunk/decompress_chunk.c | 8 ++ .../nodes/decompress_chunk/decompress_chunk.h | 2 + .../decompress_chunk_vector/CMakeLists.txt | 4 +- .../decompress_chunk_vector.c | 34 ++++++++- .../decompress_chunk_vector.h | 7 +- tsl/src/nodes/decompress_chunk_vector/exec.c | 76 +++++++++++++++++++ tsl/src/nodes/decompress_chunk_vector/exec.h | 19 +++++ .../nodes/decompress_chunk_vector/planner.c | 52 +++++++++++++ .../nodes/decompress_chunk_vector/planner.h | 17 +++++ 11 files changed, 220 insertions(+), 7 deletions(-) create mode 100644 tsl/src/nodes/decompress_chunk_vector/exec.c create mode 100644 tsl/src/nodes/decompress_chunk_vector/exec.h create mode 100644 tsl/src/nodes/decompress_chunk_vector/planner.c create mode 100644 tsl/src/nodes/decompress_chunk_vector/planner.h diff --git a/src/guc.c b/src/guc.c index 9fa50cac093..be81abbb866 100644 --- a/src/guc.c +++ b/src/guc.c @@ -444,9 +444,9 @@ _guc_init(void) NULL); DefineCustomBoolVariable("timescaledb.enable_partitionwise_aggregation", - "Enable async query execution on data nodes", - "Enable optimization that runs remote queries asynchronously" - "across data nodes", + "Enable partition-wise aggregation per chunk", + "Enable the partition-wise aggregation pushdown to the" + "chunk level", &ts_guc_enable_partitionwise_aggregation, true, PGC_USERSET, diff --git a/tsl/src/init.c b/tsl/src/init.c index 210ca1a2996..d64c13e801a 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -40,6 +40,7 @@ #include "hypertable.h" #include "license_guc.h" #include "nodes/decompress_chunk/planner.h" +#include "nodes/decompress_chunk_vector/planner.h" #include "nodes/skip_scan/skip_scan.h" #include "nodes/gapfill/gapfill_functions.h" #include "partialize_finalize.h" @@ -268,6 +269,7 @@ ts_module_init(PG_FUNCTION_ARGS) _continuous_aggs_cache_inval_init(); _decompress_chunk_init(); + _decompress_chunk_vector_init(); _skip_scan_init(); _remote_connection_cache_init(); _remote_dist_txn_init(); diff --git a/tsl/src/nodes/decompress_chunk/decompress_chunk.c b/tsl/src/nodes/decompress_chunk/decompress_chunk.c index 2a5fe4f4202..3ea93063bfb 100644 --- a/tsl/src/nodes/decompress_chunk/decompress_chunk.c +++ b/tsl/src/nodes/decompress_chunk/decompress_chunk.c @@ -1848,3 +1848,11 @@ build_sortinfo(Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info, List sort_info.can_pushdown_sort = true; return sort_info; } + +/* Check if the provided path is a DecompressChunkPath */ +bool +ts_is_decompress_chunk_path(Path *path) +{ + return IsA(path, CustomPath) && + castNode(CustomPath, path)->methods == &decompress_chunk_path_methods; +} diff --git a/tsl/src/nodes/decompress_chunk/decompress_chunk.h b/tsl/src/nodes/decompress_chunk/decompress_chunk.h index 5bc26d9671b..082461737d7 100644 --- a/tsl/src/nodes/decompress_chunk/decompress_chunk.h +++ b/tsl/src/nodes/decompress_chunk/decompress_chunk.h @@ -72,4 +72,6 @@ void ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *rel, Hype FormData_hypertable_compression *get_column_compressioninfo(List *hypertable_compression_info, char *column_name); +extern bool ts_is_decompress_chunk_path(Path *path); + #endif /* TIMESCALEDB_DECOMPRESS_CHUNK_H */ diff --git a/tsl/src/nodes/decompress_chunk_vector/CMakeLists.txt b/tsl/src/nodes/decompress_chunk_vector/CMakeLists.txt index fb9da9de5ea..737b78498ce 100644 --- a/tsl/src/nodes/decompress_chunk_vector/CMakeLists.txt +++ b/tsl/src/nodes/decompress_chunk_vector/CMakeLists.txt @@ -1,3 +1,5 @@ # Add all *.c to sources in upperlevel directory -set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/decompress_chunk_vector.c) +set(SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/decompress_chunk_vector.c + ${CMAKE_CURRENT_SOURCE_DIR}/exec.c ${CMAKE_CURRENT_SOURCE_DIR}/planner.c) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.c b/tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.c index 017d724a692..8025118ac14 100644 --- a/tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.c +++ b/tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.c @@ -32,6 +32,34 @@ #include "nodes/decompress_chunk_vector/decompress_chunk_vector.h" #include "utils.h" +static void +handle_agg_sub_path(Path *agg_sub_path) +{ + Assert(agg_sub_path != NULL); + + // Get Paths from Append + // Check Paths + // Replace with a DecompressChunkVectorPath if ts_is_decompress_chunk_path is true +} + +/* + * This function searches for a partial aggregation node on top of a DecompressChunk node + * and replace it by our DecompressChunkVector node. + * + * For example + * + * -> Partial Aggregate (cost=304.18..304.19 rows=1 width=8) + * -> Custom Scan (DecompressChunk) on _hyper_34_35_chunk (cost=0.08..9.18 rows=118000 + * width=4) + * -> Parallel Seq Scan on compress_hyper_35_42_chunk (cost=0.00..9.18 rows=118 + * width=8) + * + * Will be replaced by + * + * -> Custom Scan (VectorDecompressChunk) on _hyper_34_35_chunk (cost=0.08..9.18 rows=118000 + * width=4) + * -> Parallel Seq Scan on compress_hyper_35_42_chunk (cost=0.00..9.18 rows=118 width=8) + */ void ts_decompress_vector_modify_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel) { @@ -53,7 +81,9 @@ ts_decompress_vector_modify_paths(PlannerInfo *root, RelOptInfo *input_rel, RelO if (aggregation_path->aggsplit != AGGSPLIT_FINAL_DESERIAL) continue; - Path *aggregation_sub_path = aggregation_path->subpath; - Assert(aggregation_sub_path); + /* Handle the subpath of the aggregation */ + Path *agg_sub_path = aggregation_path->subpath; + Assert(agg_sub_path != NULL); + handle_agg_sub_path(agg_sub_path); } } diff --git a/tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.h b/tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.h index e5d010e3eb6..e20a126b89e 100644 --- a/tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.h +++ b/tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.h @@ -13,7 +13,12 @@ #include "chunk.h" #include "hypertable.h" +typedef struct DecompressChunkVectorPath +{ + CustomPath cpath; +} DecompressChunkVectorPath; + extern void ts_decompress_vector_modify_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel); -#endif \ No newline at end of file +#endif diff --git a/tsl/src/nodes/decompress_chunk_vector/exec.c b/tsl/src/nodes/decompress_chunk_vector/exec.c new file mode 100644 index 00000000000..daf92acb723 --- /dev/null +++ b/tsl/src/nodes/decompress_chunk_vector/exec.c @@ -0,0 +1,76 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "compat/compat.h" +#include "compression/array.h" +#include "compression/compression.h" +#include "nodes/decompress_chunk_vector/exec.h" +#include "ts_catalog/hypertable_compression.h" + +static TupleTableSlot *decompress_chunk_vector_exec(CustomScanState *node); +static void decompress_chunk_vector_begin(CustomScanState *node, EState *estate, int eflags); +static void decompress_chunk_vector_end(CustomScanState *node); +static void decompress_chunk_vector_rescan(CustomScanState *node); +static void decompress_chunk_vector_explain(CustomScanState *node, List *ancestors, + ExplainState *es); + +static CustomExecMethods decompress_chunk_vector_state_methods = { + .BeginCustomScan = decompress_chunk_vector_begin, + .ExecCustomScan = decompress_chunk_vector_exec, + .EndCustomScan = decompress_chunk_vector_end, + .ReScanCustomScan = decompress_chunk_vector_rescan, + .ExplainCustomScan = decompress_chunk_vector_explain, +}; + +static TupleTableSlot * +decompress_chunk_vector_exec(CustomScanState *node) +{ + return NULL; +} + +static void +decompress_chunk_vector_begin(CustomScanState *node, EState *estate, int eflags) +{ +} + +static void +decompress_chunk_vector_end(CustomScanState *node) +{ +} + +static void +decompress_chunk_vector_rescan(CustomScanState *node) +{ +} + +static void +decompress_chunk_vector_explain(CustomScanState *node, List *ancestors, ExplainState *es) +{ +} + +Node * +decompress_chunk_vector_state_create(CustomScan *cscan) +{ + DecompressChunkVectorState *chunk_state; + chunk_state = (DecompressChunkVectorState *) newNode(sizeof(DecompressChunkVectorState), + T_CustomScanState); + chunk_state->csstate.methods = &decompress_chunk_vector_state_methods; + return (Node *) chunk_state; +} diff --git a/tsl/src/nodes/decompress_chunk_vector/exec.h b/tsl/src/nodes/decompress_chunk_vector/exec.h new file mode 100644 index 00000000000..c1b190cf8f6 --- /dev/null +++ b/tsl/src/nodes/decompress_chunk_vector/exec.h @@ -0,0 +1,19 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#ifndef TIMESCALEDB_DECOMPRESS_VECTOR_CHUNK_EXEC_H +#define TIMESCALEDB_DECOMPRESS_VECTOR_CHUNK_EXEC_H + +#include + +typedef struct DecompressChunkVectorState +{ + CustomScanState csstate; +} DecompressChunkVectorState; + +extern Node *decompress_chunk_vector_state_create(CustomScan *cscan); + +#endif diff --git a/tsl/src/nodes/decompress_chunk_vector/planner.c b/tsl/src/nodes/decompress_chunk_vector/planner.c new file mode 100644 index 00000000000..751772f3f8c --- /dev/null +++ b/tsl/src/nodes/decompress_chunk_vector/planner.c @@ -0,0 +1,52 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "nodes/decompress_chunk_vector/decompress_chunk_vector.h" +#include "nodes/decompress_chunk_vector/planner.h" +#include "nodes/decompress_chunk_vector/exec.h" +#include "utils.h" + +static CustomScanMethods decompress_chunk_plan_methods = { + .CustomName = "DecompressChunk (Vector)", + .CreateCustomScanState = decompress_chunk_vector_state_create, +}; + +void +_decompress_chunk_vector_init(void) +{ + TryRegisterCustomScanMethods(&decompress_chunk_plan_methods); +} + +Plan * +decompress_chunk_vector_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *path) +{ + DecompressChunkVectorPath *dcpath = (DecompressChunkVectorPath *) path; + CustomScan *decompress_plan = makeNode(CustomScan); + + // TODO: Copy information from path to plan + Assert(dcpath); + + decompress_plan->methods = &decompress_chunk_plan_methods; + + return &decompress_plan->scan.plan; +} diff --git a/tsl/src/nodes/decompress_chunk_vector/planner.h b/tsl/src/nodes/decompress_chunk_vector/planner.h new file mode 100644 index 00000000000..0043205f0e5 --- /dev/null +++ b/tsl/src/nodes/decompress_chunk_vector/planner.h @@ -0,0 +1,17 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#ifndef TIMESCALEDB_DECOMPRESS_CHUNK_VECTOR_PLANNER_H +#define TIMESCALEDB_DECOMPRESS_CHUNK_VECTOR_PLANNER_H + +#include + +extern Plan *decompress_chunk_vector_plan_create(PlannerInfo *root, RelOptInfo *rel, + CustomPath *path); + +extern void _decompress_chunk_vector_init(void); + +#endif