Skip to content

Commit

Permalink
Added path replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
jnidzwetzki committed May 22, 2023
1 parent c0aeb0e commit 6d013a5
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 38 deletions.
85 changes: 51 additions & 34 deletions tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@
#include "import/planner.h"
#include "compression/create.h"
#include "nodes/decompress_chunk/decompress_chunk.h"
#include "nodes/decompress_chunk/planner.h"
#include "nodes/decompress_chunk/qual_pushdown.h"
#include "nodes/decompress_chunk_vector/decompress_chunk_vector.h"
#include "nodes/decompress_chunk_vector/planner.h"
#include "utils.h"

static CustomPathMethods decompress_chunk_vector_path_methods = {
.CustomName = "DecompressChunk (Vector)",
.PlanCustomPath = decompress_chunk_vector_plan_create,
};

/* Check if we can vectorize the given path */
static bool
is_vectorizable_agg_path(Path *path)
Expand Down Expand Up @@ -68,25 +73,53 @@ is_vectorizable_agg_path(Path *path)

/* Generate cheaper path with our vector node */
static void
add_vector_path_append(AppendPath *append_path, Path *path)
change_to_vector_path(PlannerInfo *root, RelOptInfo *output_rel, AggPath *aggregation_path,
List *subpaths)
{
Assert(path != NULL);
Assert(IsA(path, AggPath));
Assert(root != NULL);
Assert(subpaths != NULL);

AggPath *agg_path = castNode(AggPath, path);
}
ListCell *lc;

static void
add_vector_path_merge_append(MergeAppendPath *merge_append_path, Path *path)
{
Assert(path != NULL);
Assert(IsA(path, AggPath));
/* Check if subpaths can be vectorized */
foreach (lc, subpaths)
{
Path *sub_path = lfirst(lc);

AggPath *agg_path = castNode(AggPath, path);
if (is_vectorizable_agg_path(sub_path))
{
Assert(IsA(sub_path, AggPath));
AggPath *agg_path = castNode(AggPath, sub_path);

Assert(ts_is_decompress_chunk_path(agg_path->subpath));
DecompressChunkPath *decompress_path =
(DecompressChunkPath *) castNode(CustomPath, agg_path->subpath);

Assert(decompress_path != NULL);

DecompressChunkVectorPath *vector_path =
(DecompressChunkVectorPath *) newNode(sizeof(DecompressChunkVectorPath),
T_CustomPath);

// TODO: Get planner data from subpath
vector_path->cpath = decompress_path->cpath;
vector_path->info = decompress_path->info;

/* Set the target to our custom vector node */
vector_path->cpath.methods = &decompress_chunk_vector_path_methods;

/* Our node should emit partials */
vector_path->cpath.path.pathtarget = aggregation_path->path.pathtarget;

Assert(vector_path != NULL);
lfirst(lc) = vector_path;
}
}
}

static void
handle_agg_sub_path(Path *agg_sub_path)
handle_agg_sub_path(PlannerInfo *root, RelOptInfo *output_rel, AggPath *aggregation_path,
Path *agg_sub_path)
{
Assert(agg_sub_path != NULL);

Expand All @@ -99,15 +132,7 @@ handle_agg_sub_path(Path *agg_sub_path)
if (list_length(subpaths) < 1)
return;

ListCell *lc;

/* Check if subpath can be vectorized */
foreach (lc, subpaths)
{
Path *sub_path = lfirst(lc);
if (is_vectorizable_agg_path(sub_path))
add_vector_path_append(append_path, sub_path);
}
change_to_vector_path(root, output_rel, aggregation_path, subpaths);
}
else if (IsA(agg_sub_path, MergeAppendPath))
{
Expand All @@ -118,15 +143,7 @@ handle_agg_sub_path(Path *agg_sub_path)
if (list_length(subpaths) < 1)
return;

ListCell *lc;

/* Check if subpath can be vectorized */
foreach (lc, subpaths)
{
Path *sub_path = lfirst(lc);
if (is_vectorizable_agg_path(sub_path))
add_vector_path_merge_append(merge_append_path, sub_path);
}
change_to_vector_path(root, output_rel, aggregation_path, subpaths);
}
else if (IsA(agg_sub_path, GatherPath))
{
Expand All @@ -136,7 +153,7 @@ handle_agg_sub_path(Path *agg_sub_path)
// TODO: Maybe extract AGGPATH subpath also here

if (gather_path->subpath != NULL)
handle_agg_sub_path(gather_path->subpath);
handle_agg_sub_path(root, output_rel, aggregation_path, gather_path->subpath);
}
}

Expand Down Expand Up @@ -183,7 +200,7 @@ ts_decompress_vector_modify_paths(PlannerInfo *root, RelOptInfo *input_rel, RelO

/* Handle the subpath of the aggregation */
Path *agg_sub_path = aggregation_path->subpath;
Assert(agg_sub_path != NULL);
handle_agg_sub_path(agg_sub_path);

handle_agg_sub_path(root, output_rel, aggregation_path, agg_sub_path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@

#include "chunk.h"
#include "hypertable.h"
#include "nodes/decompress_chunk/decompress_chunk.h"

typedef struct DecompressChunkVectorPath
{
CustomPath cpath;
CompressionInfo *info;
} DecompressChunkVectorPath;

extern void ts_decompress_vector_modify_paths(PlannerInfo *root, RelOptInfo *input_rel,
Expand Down
19 changes: 16 additions & 3 deletions tsl/src/nodes/decompress_chunk_vector/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,28 @@ _decompress_chunk_vector_init(void)
}

Plan *
decompress_chunk_vector_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *path)
decompress_chunk_vector_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *path,
List *decompressed_tlist, List *clauses, List *custom_plans)
{
// DecompressChunkVectorPath *dcpath = (DecompressChunkVectorPath *) path;
DecompressChunkVectorPath *dcpath = (DecompressChunkVectorPath *) path;
CustomScan *decompress_plan = makeNode(CustomScan);

// TODO: Copy information from path to plan
// Assert(dcpath);

decompress_plan->flags = path->flags;
decompress_plan->methods = &decompress_chunk_plan_methods;

/* Relid */
decompress_plan->scan.scanrelid = dcpath->info->chunk_rel->relid;

/* Targetlist */
decompress_plan->scan.plan.targetlist = decompressed_tlist;
decompress_plan->custom_scan_tlist = decompressed_tlist;

// FIXME "Aggref found in non-Agg plan node"
// decompress_plan->custom_scan_tlist = NIL;

decompress_plan->custom_plans = custom_plans;

return &decompress_plan->scan.plan;
}
3 changes: 2 additions & 1 deletion tsl/src/nodes/decompress_chunk_vector/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
#include <postgres.h>

extern Plan *decompress_chunk_vector_plan_create(PlannerInfo *root, RelOptInfo *rel,
CustomPath *path);
CustomPath *path, List *decompressed_tlist,
List *clauses, List *custom_plans);

extern void _decompress_chunk_vector_init(void);

Expand Down

0 comments on commit 6d013a5

Please sign in to comment.