Skip to content

Commit

Permalink
First path replacement tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jnidzwetzki committed May 5, 2023
1 parent 3f7e9c4 commit c835fa8
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 7 deletions.
6 changes: 3 additions & 3 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,9 @@ _guc_init(void)
NULL);

DefineCustomBoolVariable("timescaledb.enable_partitionwise_aggregation",
"Enable async query execution on data nodes",
"Enable optimization that runs remote queries asynchronously"
"across data nodes",
"Enable partition-wise aggregation per chunk",
"Enable the partition-wise aggregation pushdown to the"
"chunk level",
&ts_guc_enable_partitionwise_aggregation,
true,
PGC_USERSET,
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "hypertable.h"
#include "license_guc.h"
#include "nodes/decompress_chunk/planner.h"
#include "nodes/decompress_chunk_vector/planner.h"
#include "nodes/skip_scan/skip_scan.h"
#include "nodes/gapfill/gapfill_functions.h"
#include "partialize_finalize.h"
Expand Down Expand Up @@ -268,6 +269,7 @@ ts_module_init(PG_FUNCTION_ARGS)

_continuous_aggs_cache_inval_init();
_decompress_chunk_init();
_decompress_chunk_vector_init();
_skip_scan_init();
_remote_connection_cache_init();
_remote_dist_txn_init();
Expand Down
8 changes: 8 additions & 0 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1848,3 +1848,11 @@ build_sortinfo(Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info, List
sort_info.can_pushdown_sort = true;
return sort_info;
}

/* Check if the provided path is a DecompressChunkPath */
bool
ts_is_decompress_chunk_path(Path *path)
{
return IsA(path, CustomPath) &&
castNode(CustomPath, path)->methods == &decompress_chunk_path_methods;
}
2 changes: 2 additions & 0 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,6 @@ void ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *rel, Hype
FormData_hypertable_compression *get_column_compressioninfo(List *hypertable_compression_info,
char *column_name);

extern bool ts_is_decompress_chunk_path(Path *path);

#endif /* TIMESCALEDB_DECOMPRESS_CHUNK_H */
4 changes: 3 additions & 1 deletion tsl/src/nodes/decompress_chunk_vector/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Add all *.c to sources in upperlevel directory
set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/decompress_chunk_vector.c)
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/decompress_chunk_vector.c
${CMAKE_CURRENT_SOURCE_DIR}/exec.c ${CMAKE_CURRENT_SOURCE_DIR}/planner.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
34 changes: 32 additions & 2 deletions tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@
#include "nodes/decompress_chunk_vector/decompress_chunk_vector.h"
#include "utils.h"

static void
handle_agg_sub_path(Path *agg_sub_path)
{
Assert(agg_sub_path != NULL);

// Get Paths from Append
// Check Paths
// Replace with a DecompressChunkVectorPath if ts_is_decompress_chunk_path is true
}

/*
* This function searches for a partial aggregation node on top of a DecompressChunk node
* and replace it by our DecompressChunkVector node.
*
* For example
*
* -> Partial Aggregate (cost=304.18..304.19 rows=1 width=8)
* -> Custom Scan (DecompressChunk) on _hyper_34_35_chunk (cost=0.08..9.18 rows=118000
* width=4)
* -> Parallel Seq Scan on compress_hyper_35_42_chunk (cost=0.00..9.18 rows=118
* width=8)
*
* Will be replaced by
*
* -> Custom Scan (VectorDecompressChunk) on _hyper_34_35_chunk (cost=0.08..9.18 rows=118000
* width=4)
* -> Parallel Seq Scan on compress_hyper_35_42_chunk (cost=0.00..9.18 rows=118 width=8)
*/
void
ts_decompress_vector_modify_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel)
{
Expand All @@ -53,7 +81,9 @@ ts_decompress_vector_modify_paths(PlannerInfo *root, RelOptInfo *input_rel, RelO
if (aggregation_path->aggsplit != AGGSPLIT_FINAL_DESERIAL)
continue;

Path *aggregation_sub_path = aggregation_path->subpath;
Assert(aggregation_sub_path);
/* Handle the subpath of the aggregation */
Path *agg_sub_path = aggregation_path->subpath;
Assert(agg_sub_path != NULL);
handle_agg_sub_path(agg_sub_path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
#include "chunk.h"
#include "hypertable.h"

typedef struct DecompressChunkVectorPath
{
CustomPath cpath;
} DecompressChunkVectorPath;

extern void ts_decompress_vector_modify_paths(PlannerInfo *root, RelOptInfo *input_rel,
RelOptInfo *output_rel);

#endif
#endif
76 changes: 76 additions & 0 deletions tsl/src/nodes/decompress_chunk_vector/exec.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

#include <postgres.h>
#include <miscadmin.h>
#include <access/sysattr.h>
#include <executor/executor.h>
#include <nodes/bitmapset.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <parser/parsetree.h>
#include <rewrite/rewriteManip.h>
#include <utils/builtins.h>
#include <utils/datum.h>
#include <utils/memutils.h>
#include <utils/typcache.h>

#include "compat/compat.h"
#include "compression/array.h"
#include "compression/compression.h"
#include "nodes/decompress_chunk_vector/exec.h"
#include "ts_catalog/hypertable_compression.h"

static TupleTableSlot *decompress_chunk_vector_exec(CustomScanState *node);
static void decompress_chunk_vector_begin(CustomScanState *node, EState *estate, int eflags);
static void decompress_chunk_vector_end(CustomScanState *node);
static void decompress_chunk_vector_rescan(CustomScanState *node);
static void decompress_chunk_vector_explain(CustomScanState *node, List *ancestors,
ExplainState *es);

static CustomExecMethods decompress_chunk_vector_state_methods = {
.BeginCustomScan = decompress_chunk_vector_begin,
.ExecCustomScan = decompress_chunk_vector_exec,
.EndCustomScan = decompress_chunk_vector_end,
.ReScanCustomScan = decompress_chunk_vector_rescan,
.ExplainCustomScan = decompress_chunk_vector_explain,
};

static TupleTableSlot *
decompress_chunk_vector_exec(CustomScanState *node)
{
return NULL;
}

static void
decompress_chunk_vector_begin(CustomScanState *node, EState *estate, int eflags)
{
}

static void
decompress_chunk_vector_end(CustomScanState *node)
{
}

static void
decompress_chunk_vector_rescan(CustomScanState *node)
{
}

static void
decompress_chunk_vector_explain(CustomScanState *node, List *ancestors, ExplainState *es)
{
}

Node *
decompress_chunk_vector_state_create(CustomScan *cscan)
{
DecompressChunkVectorState *chunk_state;
chunk_state = (DecompressChunkVectorState *) newNode(sizeof(DecompressChunkVectorState),
T_CustomScanState);
chunk_state->csstate.methods = &decompress_chunk_vector_state_methods;
return (Node *) chunk_state;
}
19 changes: 19 additions & 0 deletions tsl/src/nodes/decompress_chunk_vector/exec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

#ifndef TIMESCALEDB_DECOMPRESS_VECTOR_CHUNK_EXEC_H
#define TIMESCALEDB_DECOMPRESS_VECTOR_CHUNK_EXEC_H

#include <postgres.h>

typedef struct DecompressChunkVectorState
{
CustomScanState csstate;
} DecompressChunkVectorState;

extern Node *decompress_chunk_vector_state_create(CustomScan *cscan);

#endif
52 changes: 52 additions & 0 deletions tsl/src/nodes/decompress_chunk_vector/planner.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

#include <postgres.h>
#include <access/sysattr.h>
#include <catalog/pg_namespace.h>
#include <catalog/pg_operator.h>
#include <nodes/bitmapset.h>
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <optimizer/optimizer.h>
#include <optimizer/paths.h>
#include <optimizer/plancat.h>
#include <optimizer/restrictinfo.h>
#include <optimizer/tlist.h>
#include <parser/parsetree.h>
#include <utils/builtins.h>
#include <utils/typcache.h>

#include "nodes/decompress_chunk_vector/decompress_chunk_vector.h"
#include "nodes/decompress_chunk_vector/planner.h"
#include "nodes/decompress_chunk_vector/exec.h"
#include "utils.h"

static CustomScanMethods decompress_chunk_plan_methods = {
.CustomName = "DecompressChunk (Vector)",
.CreateCustomScanState = decompress_chunk_vector_state_create,
};

void
_decompress_chunk_vector_init(void)
{
TryRegisterCustomScanMethods(&decompress_chunk_plan_methods);
}

Plan *
decompress_chunk_vector_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *path)
{
DecompressChunkVectorPath *dcpath = (DecompressChunkVectorPath *) path;
CustomScan *decompress_plan = makeNode(CustomScan);

// TODO: Copy information from path to plan
Assert(dcpath);

decompress_plan->methods = &decompress_chunk_plan_methods;

return &decompress_plan->scan.plan;
}
17 changes: 17 additions & 0 deletions tsl/src/nodes/decompress_chunk_vector/planner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

#ifndef TIMESCALEDB_DECOMPRESS_CHUNK_VECTOR_PLANNER_H
#define TIMESCALEDB_DECOMPRESS_CHUNK_VECTOR_PLANNER_H

#include <postgres.h>

extern Plan *decompress_chunk_vector_plan_create(PlannerInfo *root, RelOptInfo *rel,
CustomPath *path);

extern void _decompress_chunk_vector_init(void);

#endif

0 comments on commit c835fa8

Please sign in to comment.