From 4c47fdbe110655f2ff97e69238564458e10fe9dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= Date: Mon, 2 Dec 2024 19:18:46 -0300 Subject: [PATCH] Refactor continuous aggregate materialization code In addition to the code reorganization and simplification we changed it by splitting the query execution in multiple steps taking advantage of `SPI_prepare`, `SPI_execute_plan` and `SPI_freeplan`: * create_materialization_plan(PlanType) * execute_materialization_plan(PlanType) * free_materialization_plan(PlanType) This PR is in preparation for a following PR to execute the materialization in small batches to alleviate the I/O spikes when reading and writing many buckets. --- tsl/src/continuous_aggs/materialize.c | 362 +++++++++++++------------- 1 file changed, 181 insertions(+), 181 deletions(-) diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index 43e9fa54bb7..30263d25bf1 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -143,9 +143,9 @@ static void free_materialization_plans(); static void update_watermark(MaterializationContext *context); static bool exists_materializations(MaterializationContext *context); -static void execute_materializations(MaterializationContext *context); static uint64 execute_materialization(MaterializationContext *context, MaterializationPlanType plan_type); +static void execute_materializations(MaterializationContext *context); /* API to update materializations from refresh code */ void @@ -233,6 +233,186 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c AtEOXact_GUC(false, save_nestlevel); } +static bool +ranges_overlap(InternalTimeRange invalidation_range, InternalTimeRange new_materialization_range) +{ + Assert(invalidation_range.start <= invalidation_range.end); + Assert(new_materialization_range.start <= new_materialization_range.end); + return !(invalidation_range.end < new_materialization_range.start || + new_materialization_range.end < invalidation_range.start); +} + +static int64 +range_length(const InternalTimeRange range) +{ + Assert(range.end >= range.start); + + return int64_saturating_sub(range.end, range.start); +} + +static Datum +time_range_internal_to_min_time_value(Oid type) +{ + switch (type) + { + case TIMESTAMPOID: + return TimestampGetDatum(DT_NOBEGIN); + case TIMESTAMPTZOID: + return TimestampTzGetDatum(DT_NOBEGIN); + case DATEOID: + return DateADTGetDatum(DATEVAL_NOBEGIN); + default: + return ts_internal_to_time_value(PG_INT64_MIN, type); + } +} + +static Datum +time_range_internal_to_max_time_value(Oid type) +{ + switch (type) + { + case TIMESTAMPOID: + return TimestampGetDatum(DT_NOEND); + case TIMESTAMPTZOID: + return TimestampTzGetDatum(DT_NOEND); + case DATEOID: + return DateADTGetDatum(DATEVAL_NOEND); + break; + default: + return ts_internal_to_time_value(PG_INT64_MAX, type); + } +} + +static Datum +internal_to_time_value_or_infinite(int64 internal, Oid time_type, bool *is_infinite_out) +{ + /* MIN and MAX can occur due to NULL thresholds, or due to a lack of invalidations. Since our + * regular conversion function errors in those cases, and we want to use those as markers for an + * open threshold in one direction, we special case this here*/ + if (internal == PG_INT64_MIN) + { + if (is_infinite_out != NULL) + *is_infinite_out = true; + return time_range_internal_to_min_time_value(time_type); + } + else if (internal == PG_INT64_MAX) + { + if (is_infinite_out != NULL) + *is_infinite_out = true; + return time_range_internal_to_max_time_value(time_type); + } + else + { + if (is_infinite_out != NULL) + *is_infinite_out = false; + return ts_internal_to_time_value(internal, time_type); + } +} + +static TimeRange +internal_time_range_to_time_range(InternalTimeRange internal) +{ + TimeRange range; + range.type = internal.type; + + range.start = internal_to_time_value_or_infinite(internal.start, internal.type, NULL); + range.end = internal_to_time_value_or_infinite(internal.end, internal.type, NULL); + + return range; +} + +static List * +cagg_find_aggref_and_var_cols(ContinuousAgg *cagg, Hypertable *mat_ht) +{ + List *retlist = NIL; + ListCell *lc; + Query *cagg_view_query = ts_continuous_agg_get_query(cagg); + + foreach (lc, cagg_view_query->targetList) + { + TargetEntry *tle = castNode(TargetEntry, lfirst(lc)); + + if (!tle->resjunk && (tle->ressortgroupref == 0 || + get_sortgroupref_clause_noerr(tle->ressortgroupref, + cagg_view_query->groupClause) == NULL)) + retlist = lappend(retlist, get_attname(mat_ht->main_table_relid, tle->resno, false)); + } + + return retlist; +} + +static char * +build_merge_insert_columns(List *strings, const char *separator, const char *prefix) +{ + StringInfo ret = makeStringInfo(); + + Assert(strings != NIL); + + ListCell *lc; + foreach (lc, strings) + { + char *grpcol = (char *) lfirst(lc); + if (ret->len > 0) + appendStringInfoString(ret, separator); + + if (prefix) + appendStringInfoString(ret, prefix); + appendStringInfoString(ret, quote_identifier(grpcol)); + } + + elog(DEBUG2, "%s: %s", __func__, ret->data); + return ret->data; +} + +static char * +build_merge_join_clause(List *column_names) +{ + StringInfo ret = makeStringInfo(); + + Assert(column_names != NIL); + + ListCell *lc; + foreach (lc, column_names) + { + char *column = (char *) lfirst(lc); + + if (ret->len > 0) + appendStringInfoString(ret, " AND "); + + appendStringInfoString(ret, "P."); + appendStringInfoString(ret, quote_identifier(column)); + appendStringInfoString(ret, " = M."); + appendStringInfoString(ret, quote_identifier(column)); + } + + elog(DEBUG2, "%s: %s", __func__, ret->data); + return ret->data; +} + +static char * +build_merge_update_clause(List *column_names) +{ + StringInfo ret = makeStringInfo(); + + Assert(column_names != NIL); + + ListCell *lc; + foreach (lc, column_names) + { + char *column = (char *) lfirst(lc); + + if (ret->len > 0) + appendStringInfoString(ret, ", "); + + appendStringInfoString(ret, quote_identifier(column)); + appendStringInfoString(ret, " = P."); + appendStringInfoString(ret, quote_identifier(column)); + } + + elog(DEBUG2, "%s: %s", __func__, ret->data); + return ret->data; +} + static MaterializationPlan * get_materialization_plan(MaterializationPlanType plan_type) { @@ -453,186 +633,6 @@ free_materialization_plans() } } -static bool -ranges_overlap(InternalTimeRange invalidation_range, InternalTimeRange new_materialization_range) -{ - Assert(invalidation_range.start <= invalidation_range.end); - Assert(new_materialization_range.start <= new_materialization_range.end); - return !(invalidation_range.end < new_materialization_range.start || - new_materialization_range.end < invalidation_range.start); -} - -static int64 -range_length(const InternalTimeRange range) -{ - Assert(range.end >= range.start); - - return int64_saturating_sub(range.end, range.start); -} - -static Datum -time_range_internal_to_min_time_value(Oid type) -{ - switch (type) - { - case TIMESTAMPOID: - return TimestampGetDatum(DT_NOBEGIN); - case TIMESTAMPTZOID: - return TimestampTzGetDatum(DT_NOBEGIN); - case DATEOID: - return DateADTGetDatum(DATEVAL_NOBEGIN); - default: - return ts_internal_to_time_value(PG_INT64_MIN, type); - } -} - -static Datum -time_range_internal_to_max_time_value(Oid type) -{ - switch (type) - { - case TIMESTAMPOID: - return TimestampGetDatum(DT_NOEND); - case TIMESTAMPTZOID: - return TimestampTzGetDatum(DT_NOEND); - case DATEOID: - return DateADTGetDatum(DATEVAL_NOEND); - break; - default: - return ts_internal_to_time_value(PG_INT64_MAX, type); - } -} - -static Datum -internal_to_time_value_or_infinite(int64 internal, Oid time_type, bool *is_infinite_out) -{ - /* MIN and MAX can occur due to NULL thresholds, or due to a lack of invalidations. Since our - * regular conversion function errors in those cases, and we want to use those as markers for an - * open threshold in one direction, we special case this here*/ - if (internal == PG_INT64_MIN) - { - if (is_infinite_out != NULL) - *is_infinite_out = true; - return time_range_internal_to_min_time_value(time_type); - } - else if (internal == PG_INT64_MAX) - { - if (is_infinite_out != NULL) - *is_infinite_out = true; - return time_range_internal_to_max_time_value(time_type); - } - else - { - if (is_infinite_out != NULL) - *is_infinite_out = false; - return ts_internal_to_time_value(internal, time_type); - } -} - -static TimeRange -internal_time_range_to_time_range(InternalTimeRange internal) -{ - TimeRange range; - range.type = internal.type; - - range.start = internal_to_time_value_or_infinite(internal.start, internal.type, NULL); - range.end = internal_to_time_value_or_infinite(internal.end, internal.type, NULL); - - return range; -} - -static List * -cagg_find_aggref_and_var_cols(ContinuousAgg *cagg, Hypertable *mat_ht) -{ - List *retlist = NIL; - ListCell *lc; - Query *cagg_view_query = ts_continuous_agg_get_query(cagg); - - foreach (lc, cagg_view_query->targetList) - { - TargetEntry *tle = castNode(TargetEntry, lfirst(lc)); - - if (!tle->resjunk && (tle->ressortgroupref == 0 || - get_sortgroupref_clause_noerr(tle->ressortgroupref, - cagg_view_query->groupClause) == NULL)) - retlist = lappend(retlist, get_attname(mat_ht->main_table_relid, tle->resno, false)); - } - - return retlist; -} - -static char * -build_merge_insert_columns(List *strings, const char *separator, const char *prefix) -{ - StringInfo ret = makeStringInfo(); - - Assert(strings != NIL); - - ListCell *lc; - foreach (lc, strings) - { - char *grpcol = (char *) lfirst(lc); - if (ret->len > 0) - appendStringInfoString(ret, separator); - - if (prefix) - appendStringInfoString(ret, prefix); - appendStringInfoString(ret, quote_identifier(grpcol)); - } - - elog(DEBUG2, "%s: %s", __func__, ret->data); - return ret->data; -} - -static char * -build_merge_join_clause(List *column_names) -{ - StringInfo ret = makeStringInfo(); - - Assert(column_names != NIL); - - ListCell *lc; - foreach (lc, column_names) - { - char *column = (char *) lfirst(lc); - - if (ret->len > 0) - appendStringInfoString(ret, " AND "); - - appendStringInfoString(ret, "P."); - appendStringInfoString(ret, quote_identifier(column)); - appendStringInfoString(ret, " = M."); - appendStringInfoString(ret, quote_identifier(column)); - } - - elog(DEBUG2, "%s: %s", __func__, ret->data); - return ret->data; -} - -static char * -build_merge_update_clause(List *column_names) -{ - StringInfo ret = makeStringInfo(); - - Assert(column_names != NIL); - - ListCell *lc; - foreach (lc, column_names) - { - char *column = (char *) lfirst(lc); - - if (ret->len > 0) - appendStringInfoString(ret, ", "); - - appendStringInfoString(ret, quote_identifier(column)); - appendStringInfoString(ret, " = P."); - appendStringInfoString(ret, quote_identifier(column)); - } - - elog(DEBUG2, "%s: %s", __func__, ret->data); - return ret->data; -} - static void update_watermark(MaterializationContext *context) {