Skip to content

Commit

Permalink
Fixed ChunkAppend regressions, improved handling of partial compresse…
Browse files Browse the repository at this point in the history
…d chunks
  • Loading branch information
jnidzwetzki committed Aug 24, 2023
1 parent c2cd40d commit 7bd1a7c
Show file tree
Hide file tree
Showing 9 changed files with 682 additions and 542 deletions.
258 changes: 154 additions & 104 deletions src/planner/partialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ partialize_agg_paths(RelOptInfo *rel)
return has_combine;
}

/* Get an an existing aggregation path for the given relation or NULL if no aggregation path exists.
/*
* Get an an existing aggregation path for the given relation or NULL if no aggregation path exists.
*/
static bool
has_min_max_agg_path(RelOptInfo *relation)
Expand All @@ -212,7 +213,8 @@ has_min_max_agg_path(RelOptInfo *relation)
return false;
}

/* Get an an existing aggregation path for the given relation or NULL if no aggregation path exists.
/*
* Get an an existing aggregation path for the given relation or NULL if no aggregation path exists.
*/
static AggPath *
get_existing_agg_path(RelOptInfo *relation)
Expand All @@ -231,7 +233,9 @@ get_existing_agg_path(RelOptInfo *relation)
return NULL;
}

/* Get all subpaths from a Append, MergeAppend, or ChunkAppend path */
/*
* Get all subpaths from a Append, MergeAppend, or ChunkAppend path
*/
static List *
get_subpaths_from_append_path(Path *path, bool handle_gather_path)
{
Expand Down Expand Up @@ -259,7 +263,9 @@ get_subpaths_from_append_path(Path *path, bool handle_gather_path)
return NIL;
}

/* Copy an AppendPath and set new subpaths. */
/*
* Copy an AppendPath and set new subpaths.
*/
static AppendPath *
copy_append_path(AppendPath *path, List *subpaths)
{
Expand All @@ -271,7 +277,9 @@ copy_append_path(AppendPath *path, List *subpaths)
return newPath;
}

/* Copy a MergeAppendPath and set new subpaths. */
/*
* Copy a MergeAppendPath and set new subpaths.
*/
static MergeAppendPath *
copy_merge_append_path(PlannerInfo *root, MergeAppendPath *path, List *subpaths)
{
Expand All @@ -291,7 +299,9 @@ copy_merge_append_path(PlannerInfo *root, MergeAppendPath *path, List *subpaths)
return newPath;
}

/* Copy an append like path and set new subpaths */
/*
* Copy an append like path and set new subpaths
*/
static Path *
copy_append_like_path(PlannerInfo *root, Path *path, List *new_subpaths)
{
Expand All @@ -317,7 +327,9 @@ copy_append_like_path(PlannerInfo *root, Path *path, List *new_subpaths)
pg_unreachable();
}

/* Generate a partially sorted aggregated agg path on top of a path */
/*
* Generate a partially sorted aggregated agg path on top of a path
*/
static Path *
get_sorted_partially_agg_path(PlannerInfo *root, Path *path, PathTarget *target,
double d_num_groups, GroupPathExtraData *extra_data)
Expand Down Expand Up @@ -350,7 +362,9 @@ get_sorted_partially_agg_path(PlannerInfo *root, Path *path, PathTarget *target,
return sorted_agg_path;
}

/* Generate a partially hashed aggregated add path on top of a path */
/*
* Generate a partially hashed aggregated add path on top of a path
*/
static Path *
get_hashed_partially_agg_path(PlannerInfo *root, Path *path, PathTarget *target,
double d_num_groups, GroupPathExtraData *extra_data)
Expand All @@ -373,11 +387,14 @@ get_hashed_partially_agg_path(PlannerInfo *root, Path *path, PathTarget *target,
return hash_path;
}

/*
* Add partially aggregated subpath
*/
static void
create_partially_aggregated_subpaths(PlannerInfo *root, Path *parent_path,
PathTarget *partial_grouping_target, double d_num_groups,
GroupPathExtraData *extra_data, bool can_sort, bool can_hash,
Path *subpath, Path **sorted_path, Path **hashed_path)
add_partially_aggregated_subpaths(PlannerInfo *root, Path *parent_path,
PathTarget *partial_grouping_target, double d_num_groups,
GroupPathExtraData *extra_data, bool can_sort, bool can_hash,
Path *subpath, List **sorted_paths, List **hashed_paths)
{
/* Translate targetlist for partition */
AppendRelInfo *appinfo = ts_get_appendrelinfo(root, subpath->parent->relid, false);
Expand All @@ -391,14 +408,22 @@ create_partially_aggregated_subpaths(PlannerInfo *root, Path *parent_path,

if (can_sort)
{
*sorted_path =
get_sorted_partially_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);
*sorted_paths = lappend(*sorted_paths,
get_sorted_partially_agg_path(root,
subpath,
chunktarget,
d_num_groups,
extra_data));
}

if (can_hash)
{
*hashed_path =
get_hashed_partially_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);
*hashed_paths = lappend(*hashed_paths,
get_hashed_partially_agg_path(root,
subpath,
chunktarget,
d_num_groups,
extra_data));
}
}

Expand Down Expand Up @@ -427,10 +452,11 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
{
Path *subpath = lfirst(lc);

/* Check if we have an append path under an append path (e.g., a partially compressed chunk)*/
/* Check if we have an append path under an append path (e.g., a partially compressed
* chunk)*/
List *subsubpaths = get_subpaths_from_append_path(subpath, false);

if(subsubpaths != NIL)
if (subsubpaths != NIL)
{
List *sorted_subsubpaths = NIL;
List *hashed_subsubpaths = NIL;
Expand All @@ -440,66 +466,42 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
{
Path *subsubpath = lfirst(lc2);

Path *sorted_subsubpath = NULL;
Path *hashed_subsubpath = NULL;

create_partially_aggregated_subpaths(root,
cheapest_total_path,
partial_grouping_target,
d_num_groups,
extra_data,
can_sort,
can_hash,
subsubpath,
&sorted_subsubpath /* Result path */,
&hashed_subsubpath /* Result path */);

if (sorted_subsubpath != NULL)
{
sorted_subsubpaths = lappend(sorted_subsubpaths, sorted_subsubpath);
}

if (hashed_subsubpath != NULL)
{
hashed_subsubpaths = lappend(hashed_subsubpaths, hashed_subsubpath);
}
add_partially_aggregated_subpaths(root,
cheapest_total_path,
partial_grouping_target,
d_num_groups,
extra_data,
can_sort,
can_hash,
subsubpath,
&sorted_subsubpaths /* Result path */,
&hashed_subsubpaths /* Result path */);
}

if(can_sort)
if (can_sort)
{
sorted_subpaths = lappend(sorted_subpaths, copy_append_like_path(root, subpath, sorted_subsubpaths));
sorted_subpaths = lappend(sorted_subpaths,
copy_append_like_path(root, subpath, sorted_subsubpaths));
}

if(can_hash)
if (can_hash)
{
hashed_subpaths = lappend(hashed_subpaths, copy_append_like_path(root, subpath, hashed_subsubpaths));
hashed_subpaths = lappend(hashed_subpaths,
copy_append_like_path(root, subpath, hashed_subsubpaths));
}

continue;
}

Path *sorted_subpath = NULL;
Path *hashed_subpath = NULL;

create_partially_aggregated_subpaths(root,
cheapest_total_path,
partial_grouping_target,
d_num_groups,
extra_data,
can_sort,
can_hash,
subpath,
&sorted_subpath /* Result path */,
&hashed_subpath /* Result path */);

if (sorted_subpath != NULL)
{
sorted_subpaths = lappend(sorted_subpaths, sorted_subpath);
}

if (hashed_subpath != NULL)
else
{
hashed_subpaths = lappend(hashed_subpaths, hashed_subpath);
add_partially_aggregated_subpaths(root,
cheapest_total_path,
partial_grouping_target,
d_num_groups,
extra_data,
can_sort,
can_hash,
subpath,
&sorted_subpaths /* Result paths */,
&hashed_subpaths /* Result paths */);
}
}

Expand Down Expand Up @@ -547,28 +549,56 @@ generate_partial_agg_pushdown_path(PlannerInfo *root, Path *cheapest_partial_pat

Assert(subpath->parallel_safe);

Path *sorted_subpath = NULL;
Path *hashed_subpath = NULL;

create_partially_aggregated_subpaths(root,
cheapest_partial_path,
partial_grouping_target,
d_num_groups,
extra_data,
can_sort,
can_hash,
subpath,
&sorted_subpath /* Result path */,
&hashed_subpath /* Result path */);

if (sorted_subpath != NULL)
/* Check if we have an append path under an append path (e.g., a partially compressed
* chunk)*/
List *subsubpaths = get_subpaths_from_append_path(subpath, false);

if (subsubpaths != NIL)
{
sorted_subpaths = lappend(sorted_subpaths, sorted_subpath);
}
List *sorted_subsubpaths = NIL;
List *hashed_subsubpaths = NIL;

ListCell *lc2;
foreach (lc2, subsubpaths)
{
Path *subsubpath = lfirst(lc2);

if (can_hash)
add_partially_aggregated_subpaths(root,
cheapest_partial_path,
partial_grouping_target,
d_num_groups,
extra_data,
can_sort,
can_hash,
subsubpath,
&sorted_subsubpaths /* Result path */,
&hashed_subsubpaths /* Result path */);
}

if (can_sort)
{
sorted_subpaths = lappend(sorted_subpaths,
copy_append_like_path(root, subpath, sorted_subsubpaths));
}

if (can_hash)
{
hashed_subpaths = lappend(hashed_subpaths,
copy_append_like_path(root, subpath, hashed_subsubpaths));
}
}
else
{
hashed_subpaths = lappend(hashed_subpaths, hashed_subpath);
add_partially_aggregated_subpaths(root,
cheapest_partial_path,
partial_grouping_target,
d_num_groups,
extra_data,
can_sort,
can_hash,
subpath,
&sorted_subpaths /* Result paths */,
&hashed_subpaths /* Result paths */);
}
}

Expand Down Expand Up @@ -603,22 +633,46 @@ generate_partial_agg_pushdown_path(PlannerInfo *root, Path *cheapest_partial_pat
}
}

static Path*
get_best_total_path(RelOptInfo* output_rel)
/*
* Get the best total path for aggregation (prefer chunk appends if we have one)
*/
static Path *
get_best_total_path(RelOptInfo *output_rel)
{
ListCell *lc;
foreach (lc, output_rel->pathlist)
{
Path *path = lfirst(lc);

ListCell *lc;
foreach(lc, output_rel->pathlist)
{
Path *path = lfirst(lc);

if(ts_is_chunk_append_path(path))
return path;
}
if (ts_is_chunk_append_path(path))
return path;
}

return output_rel->cheapest_total_path;
return output_rel->cheapest_total_path;
}

/*
* Check if this path belongs to a plain or sorted aggregation
*/
static bool
is_plain_or_sorted_agg_path(Path *path)
{
List *subpaths = get_subpaths_from_append_path(path, true);

Ensure(subpaths != NIL, "Unable to determine aggregation type");

Path *subpath = linitial(subpaths);

if (IsA(subpath, AggPath))
{
AggPath *agg_path = castNode(AggPath, linitial(subpaths));
Assert(agg_path->aggstrategy == AGG_SORTED || agg_path->aggstrategy == AGG_PLAIN ||
agg_path->aggstrategy == AGG_HASHED);
return agg_path->aggstrategy == AGG_SORTED || agg_path->aggstrategy == AGG_PLAIN;
}

return is_plain_or_sorted_agg_path(subpath);
}

/*
* Convert the aggregation into a partial aggregation and push them down to the chunk level
Expand Down Expand Up @@ -749,12 +803,8 @@ ts_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_rel
foreach (lc, partially_grouped_rel->pathlist)
{
Path *append_path = lfirst(lc);
List *subpaths = get_subpaths_from_append_path(append_path, true);
Assert(subpaths != NIL);

AggPath *agg_path = castNode(AggPath, linitial(subpaths));

if (agg_path->aggstrategy != AGG_HASHED)
if (is_plain_or_sorted_agg_path(append_path))
{
bool is_sorted;
int presorted_keys;
Expand Down
Loading

0 comments on commit 7bd1a7c

Please sign in to comment.