Skip to content

Commit

Permalink
Reworked path handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jnidzwetzki committed May 22, 2023
1 parent 82a77c9 commit c0aeb0e
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/planner/expand_hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,7 @@ ts_plan_expand_hypertable_chunks(Hypertable *ht, PlannerInfo *root, RelOptInfo *
!has_partialize_function((Node *) root->parse->targetList, TS_DO_NOT_FIX_AGGSPLIT)) ||
hypertable_is_distributed(ht))
{
if(! hypertable_is_distributed(ht))
if (!hypertable_is_distributed(ht))
enable_partitionwise_aggregate = true;

build_hypertable_partition_info(ht, root, rel, list_length(inh_oids));
Expand Down
101 changes: 83 additions & 18 deletions tsl/src/nodes/decompress_chunk_vector/decompress_chunk_vector.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,101 @@
#include "nodes/decompress_chunk_vector/decompress_chunk_vector.h"
#include "utils.h"

/* Check if we can vectorize the given path */
static bool
is_vectorizable_agg_path(Path *path)
{
if (!IsA(path, AggPath))
return false;

AggPath *agg_path = castNode(AggPath, path);

bool is_plain_agg = (agg_path->aggstrategy == AGG_PLAIN);
if (!is_plain_agg)
return false;

bool is_decompress_chunk = ts_is_decompress_chunk_path(agg_path->subpath);
if (!is_decompress_chunk)
return false;

/* We currently handle only one agg function per node */
if (list_length(agg_path->path.pathtarget->exprs) != 1)
return false;

/* Only sum on int 4 is supported at the moment */
Node *expr_node = linitial(agg_path->path.pathtarget->exprs);
if (!IsA(expr_node, Aggref))
return false;

Aggref *aggref = castNode(Aggref, expr_node);

if (aggref->aggfnoid != F_SUM_INT4)
return false;

return true;
}

/* Generate cheaper path with our vector node */
static void
handle_agg_sub_path(Path *agg_sub_path)
add_vector_path_append(AppendPath *append_path, Path *path)
{
Assert(path != NULL);
Assert(IsA(path, AggPath));

AggPath *agg_path = castNode(AggPath, path);
}

static void
add_vector_path_merge_append(MergeAppendPath *merge_append_path, Path *path)
{
List *subpaths = NIL;
Assert(path != NULL);
Assert(IsA(path, AggPath));

AggPath *agg_path = castNode(AggPath, path);
}

static void
handle_agg_sub_path(Path *agg_sub_path)
{
Assert(agg_sub_path != NULL);

if (IsA(agg_sub_path, AppendPath))
{
AppendPath *append_path = castNode(AppendPath, agg_sub_path);
subpaths = append_path->subpaths;
List *subpaths = append_path->subpaths;

/* No subpaths available */
if (list_length(subpaths) < 1)
return;

ListCell *lc;

/* Check if subpath can be vectorized */
foreach (lc, subpaths)
{
Path *sub_path = lfirst(lc);
if (is_vectorizable_agg_path(sub_path))
add_vector_path_append(append_path, sub_path);
}
}
else if (IsA(agg_sub_path, MergeAppendPath))
{
MergeAppendPath *merge_append_path = castNode(MergeAppendPath, agg_sub_path);
subpaths = merge_append_path->subpaths;
List *subpaths = merge_append_path->subpaths;

/* No subpaths available */
if (list_length(subpaths) < 1)
return;

ListCell *lc;

/* Check if subpath can be vectorized */
foreach (lc, subpaths)
{
Path *sub_path = lfirst(lc);
if (is_vectorizable_agg_path(sub_path))
add_vector_path_merge_append(merge_append_path, sub_path);
}
}
else if (IsA(agg_sub_path, GatherPath))
{
Expand All @@ -59,20 +138,6 @@ handle_agg_sub_path(Path *agg_sub_path)
if (gather_path->subpath != NULL)
handle_agg_sub_path(gather_path->subpath);
}
else
{
/* Unsupported sub path */
return;
}

/* No subpaths available */
if (list_length(subpaths) < 1)
return;

// Get Paths from Append and MergeAppendPath
// Check for disabled optimizations (ChunkAppend)
// Check Paths for partial aggegate on top of ts_is_decompress_chunk_path
// Replace with a DecompressChunkVectorPath
}

/*
Expand Down

0 comments on commit c0aeb0e

Please sign in to comment.