From 63f7cc88022e6d9b43def591933c81e8977a5c3a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?=
Date: Thu, 28 Nov 2024 15:37:27 -0300
Subject: [PATCH] Refactor continuous aggregate materialization code

In addition to reorganizing and simplifying the code, this change splits
the query execution into multiple steps, taking advantage of
`SPI_prepare`, `SPI_execute_plan` and `SPI_freeplan`:

* create_materialization_plan(PlanType)
* execute_materialization_plan(PlanType)
* free_materialization_plan(PlanType)

This PR prepares for a follow-up PR that will execute the
materialization in small batches, to alleviate the I/O spikes caused by
reading and writing many buckets at once.
---
 tsl/src/continuous_aggs/materialize.c | 743 ++++++++++++++------------
 1 file changed, 398 insertions(+), 345 deletions(-)

diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c
index 5dcb31a1cce..3fe6e0434bd 100644
--- a/tsl/src/continuous_aggs/materialize.c
+++ b/tsl/src/continuous_aggs/materialize.c
@@ -26,9 +26,12 @@
 #include "ts_catalog/continuous_agg.h"
 #include "ts_catalog/continuous_aggs_watermark.h"
 
-#define CHUNKIDFROMRELID "chunk_id_from_relid"
 #define CONTINUOUS_AGG_CHUNK_ID_COL_NAME "chunk_id"
 
+/*********************
+ * utility functions *
+ *********************/
+
 static bool ranges_overlap(InternalTimeRange invalidation_range,
 						   InternalTimeRange new_materialization_range);
 static TimeRange internal_time_range_to_time_range(InternalTimeRange internal);
@@ -43,33 +46,88 @@ static char *build_merge_update_clause(List *column_names);
 /***************************
  * materialization support *
  ***************************/
-static void spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
-								 const NameData *time_column_name, TimeRange materialization_range,
-								 const char *const chunk_condition);
-static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
-										SchemaAndName partial_view,
-										SchemaAndName materialization_table,
-										const NameData *time_column_name,
-										TimeRange invalidation_range, const int32 chunk_id);
-static uint64 spi_delete_materializations(SchemaAndName materialization_table,
-										  const NameData *time_column_name,
-										  TimeRange materialization_range,
-										  const char *const chunk_condition);
-static uint64 spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
-										  SchemaAndName partial_view,
-										  SchemaAndName materialization_table,
-										  const NameData *time_column_name,
-										  TimeRange materialization_range,
-										  const char *const chunk_condition);
-static bool spi_exists_materializations(SchemaAndName materialization_table,
-										const NameData *time_column_name,
-										TimeRange materialization_range);
-static uint64 spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
-										 SchemaAndName partial_view,
-										 SchemaAndName materialization_table,
-										 const NameData *time_column_name,
-										 TimeRange materialization_range);
-
+typedef enum MaterializationPlanType
+{
+	PLAN_TYPE_INSERT,
+	PLAN_TYPE_DELETE,
+	PLAN_TYPE_EXISTS,
+	PLAN_TYPE_MERGE,
+	PLAN_TYPE_MERGE_DELETE,
+	_MAX_MATERIALIZATION_PLAN_TYPES
+} MaterializationPlanType;
+
+typedef struct MaterializationContext
+{
+	Hypertable *mat_ht;
+	const ContinuousAgg *cagg;
+	SchemaAndName partial_view;
+	SchemaAndName materialization_table;
+	NameData *time_column_name;
+	TimeRange materialization_range;
+	char *chunk_condition;
+} MaterializationContext;
+
+typedef char *(*MaterializationCreateStatement)(MaterializationContext *context);
+typedef void (*MaterializationEmitError)(MaterializationContext *context);
+typedef void (*MaterializationEmitProgress)(MaterializationContext *context, uint64 rows_processed);
+
+typedef struct MaterializationPlan
+{
+	SPIPlanPtr plan;
+	bool read_only;
+	MaterializationCreateStatement create_statement;
+	MaterializationEmitError emit_error;
+	MaterializationEmitProgress emit_progress;
+} MaterializationPlan;
+
+static char *create_materialization_insert_statement(MaterializationContext *context);
+static char *create_materialization_delete_statement(MaterializationContext *context);
+static char *create_materialization_exists_statement(MaterializationContext *context);
+static char *create_materialization_merge_statement(MaterializationContext *context);
+static char *create_materialization_merge_delete_statement(MaterializationContext *context);
+
+static void emit_materialization_insert_error(MaterializationContext *context);
+static void emit_materialization_delete_error(MaterializationContext *context);
+static void emit_materialization_exists_error(MaterializationContext *context);
+static void emit_materialization_merge_error(MaterializationContext *context);
+
+static void emit_materialization_insert_progress(MaterializationContext *context,
+												 uint64 rows_processed);
+static void emit_materialization_delete_progress(MaterializationContext *context,
+												 uint64 rows_processed);
+static void emit_materialization_merge_progress(MaterializationContext *context,
+												uint64 rows_processed);
+
+static MaterializationPlan materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPES + 1] = {
+	[PLAN_TYPE_INSERT] = { .create_statement = create_materialization_insert_statement,
+						   .emit_error = emit_materialization_insert_error,
+						   .emit_progress = emit_materialization_insert_progress },
+	[PLAN_TYPE_DELETE] = { .create_statement = create_materialization_delete_statement,
+						   .emit_error = emit_materialization_delete_error,
+						   .emit_progress = emit_materialization_delete_progress },
+	[PLAN_TYPE_EXISTS] = { .read_only = true,
+						   .create_statement = create_materialization_exists_statement,
+						   .emit_error = emit_materialization_exists_error },
+	[PLAN_TYPE_MERGE] = { .create_statement = create_materialization_merge_statement,
+						  .emit_error = emit_materialization_merge_error,
+						  .emit_progress = emit_materialization_merge_progress },
+	[PLAN_TYPE_MERGE_DELETE] = { .create_statement = create_materialization_merge_delete_statement,
+								 .emit_error = emit_materialization_delete_error,
+								 .emit_progress = emit_materialization_delete_progress },
+};
+
+static MaterializationPlan *create_materialization_plan(MaterializationContext *context,
+														MaterializationPlanType plan_type);
+static uint64 execute_materialization_plan(MaterializationContext *context,
+										   MaterializationPlanType plan_type);
+static void free_materialization_plan(MaterializationContext *context,
+									  MaterializationPlanType plan_type);
+static void free_materialization_plans(MaterializationContext *context);
+
+static void update_watermark(MaterializationContext *context);
+static void execute_materializations(MaterializationContext *context);
+
+/* API to update materializations from refresh code */
 void
 continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg,
 									  SchemaAndName partial_view,
@@ -81,6 +139,25 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c
 	InternalTimeRange combined_materialization_range = new_materialization_range;
 	bool materialize_invalidations_separately = range_length(invalidation_range) > 0;
+	MaterializationContext context = {
+		.mat_ht = mat_ht,
+		.cagg = cagg,
+		.partial_view = partial_view,
+		.materialization_table = materialization_table,
+		.time_column_name = (NameData *) time_column_name,
+		.materialization_range = internal_time_range_to_time_range(new_materialization_range),
+		/*
+		 * chunk_id is valid if the materialization update should be done only on the given
+		 * chunk. This is currently used only for refresh on chunk drop. In other cases (a manual
+		 * call to refresh_continuous_aggregate or a call from a refresh policy), chunk_id is not
+		 * provided, i.e., invalid. Also, chunk_id is only used in the old format.
+		 */
+		.chunk_condition =
+			chunk_id != INVALID_CHUNK_ID && !ContinuousAggIsFinalized(cagg) ?
+				psprintf(" AND %s = %d", CONTINUOUS_AGG_CHUNK_ID_COL_NAME, chunk_id) :
+				"",
+	};
+
 	/* Lock down search_path */
 	int save_nestlevel = NewGUCNestLevel();
 	RestrictSearchPath();
@@ -118,32 +195,18 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c
 	 */
 	if (range_length(invalidation_range) == 0 || !materialize_invalidations_separately)
 	{
-		spi_update_materializations(mat_ht,
-									cagg,
-									partial_view,
-									materialization_table,
-									time_column_name,
-									internal_time_range_to_time_range(
-										combined_materialization_range),
-									chunk_id);
+		context.materialization_range =
+			internal_time_range_to_time_range(combined_materialization_range);
+		execute_materializations(&context);
 	}
 	else
 	{
-		spi_update_materializations(mat_ht,
-									cagg,
-									partial_view,
-									materialization_table,
-									time_column_name,
-									internal_time_range_to_time_range(invalidation_range),
-									chunk_id);
-
-		spi_update_materializations(mat_ht,
-									cagg,
-									partial_view,
-									materialization_table,
-									time_column_name,
-									internal_time_range_to_time_range(new_materialization_range),
-									chunk_id);
+		context.materialization_range = internal_time_range_to_time_range(invalidation_range);
+		execute_materializations(&context);
+
+		context.materialization_range =
+			internal_time_range_to_time_range(new_materialization_range);
+		execute_materializations(&context);
 	}
 
 	/* Restore search_path */
@@ -330,203 +393,91 @@ build_merge_update_clause(List *column_names)
 	return ret->data;
 }
 
-static void
-spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
-					 const NameData *time_column_name, TimeRange materialization_range,
-					 const char *const chunk_condition)
+/* Create INSERT statement */
+static char *
+create_materialization_insert_statement(MaterializationContext *context)
 {
-	int res;
-	StringInfo command = makeStringInfo();
-	Oid types[] = { materialization_range.type };
-	Datum values[] = { materialization_range.start };
-	char nulls[] = { false };
-
-	appendStringInfo(command,
-					 "SELECT %s FROM %s.%s AS I "
-					 "WHERE I.%s >= $1 %s "
-					 "ORDER BY 1 DESC LIMIT 1;",
-					 quote_identifier(NameStr(*time_column_name)),
-					 quote_identifier(NameStr(*materialization_table.schema)),
-					 quote_identifier(NameStr(*materialization_table.name)),
-					 quote_identifier(NameStr(*time_column_name)),
-					 chunk_condition);
-
-	elog(DEBUG2, "%s: %s", __func__, command->data);
-	res = SPI_execute_with_args(command->data,
-								1,
-								types,
-								values,
-								nulls,
-								false /* read_only */,
-								0 /* count */);
-
-	if (res < 0)
-		elog(ERROR, "could not get the last bucket of the materialized data");
-
-	Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_range.type,
-		   "partition types for result (%d) and dimension (%d) do not match",
-		   SPI_gettypeid(SPI_tuptable->tupdesc, 1),
-		   materialization_range.type);
-
-	if (SPI_processed > 0)
-	{
-		bool isnull;
-		Datum maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);
-
-		if (!isnull)
-		{
-			int64 watermark = ts_time_value_to_internal(maxdat, materialization_range.type);
-			ts_cagg_watermark_update(mat_ht, watermark, isnull, false);
-		}
-	}
+	StringInfoData query;
+	initStringInfo(&query);
+	appendStringInfo(&query,
+					 "INSERT INTO %s.%s SELECT * FROM %s.%s AS I "
+					 "WHERE I.%s >= $1 AND I.%s < $2 %s;",
+					 quote_identifier(NameStr(*context->materialization_table.schema)),
+					 quote_identifier(NameStr(*context->materialization_table.name)),
+					 quote_identifier(NameStr(*context->partial_view.schema)),
+					 quote_identifier(NameStr(*context->partial_view.name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
+					 context->chunk_condition);
+	return query.data;
 }
 
-static void
-spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
-							SchemaAndName partial_view, SchemaAndName materialization_table,
-							const NameData *time_column_name, TimeRange invalidation_range,
-							const int32 chunk_id)
-{
-	StringInfo chunk_condition = makeStringInfo();
-	uint64 rows_processed = 0;
-
-	/* MERGE statement is available starting on PG15 and we'll support it only in the new format of
-	 * CAggs and for non-compressed hypertables */
-	if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
-		ContinuousAggIsFinalized(cagg) && !TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(mat_ht))
-	{
-		rows_processed = spi_merge_materializations(mat_ht,
-													cagg,
-													partial_view,
-													materialization_table,
-													time_column_name,
-													invalidation_range);
-	}
-	else
-	{
-		/*
-		 * chunk_id is valid if the materializaion update should be done only on the given chunk.
-		 * This is used currently for refresh on chunk drop only. In other cases, manual
-		 * call to refresh_continuous_aggregate or call from a refresh policy, chunk_id is
-		 * not provided, i.e., invalid.
-		 */
-		if (chunk_id != INVALID_CHUNK_ID)
-			appendStringInfo(chunk_condition, "AND chunk_id = %d", chunk_id);
-
-		rows_processed += spi_delete_materializations(materialization_table,
-													  time_column_name,
-													  invalidation_range,
-													  chunk_condition->data);
-		rows_processed += spi_insert_materializations(mat_ht,
-													  cagg,
-													  partial_view,
-													  materialization_table,
-													  time_column_name,
-													  invalidation_range,
-													  chunk_condition->data);
-	}
-
-	/* Get the max(time_dimension) of the materialized data */
-	if (rows_processed > 0)
-	{
-		spi_update_watermark(mat_ht,
-							 materialization_table,
-							 time_column_name,
-							 invalidation_range,
-							 chunk_condition->data);
-	}
+/* Create DELETE statement */
+static char *
+create_materialization_delete_statement(MaterializationContext *context)
+{
+	StringInfoData query;
+	initStringInfo(&query);
+	appendStringInfo(&query,
+					 "DELETE FROM %s.%s AS D "
+					 "WHERE D.%s >= $1 AND D.%s < $2 %s;",
+					 quote_identifier(NameStr(*context->materialization_table.schema)),
+					 quote_identifier(NameStr(*context->materialization_table.name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
+					 context->chunk_condition);
+	return query.data;
 }
 
-static bool
-spi_exists_materializations(SchemaAndName materialization_table, const NameData *time_column_name,
-							TimeRange materialization_range)
+/* Create SELECT EXISTS statement */
+static char *
+create_materialization_exists_statement(MaterializationContext *context)
 {
-	int res;
-	StringInfo command = makeStringInfo();
-	Oid types[] = { materialization_range.type, materialization_range.type };
-	Datum values[] = { materialization_range.start, materialization_range.end };
-	char nulls[] = { false, false };
-
-	appendStringInfo(command,
+	StringInfoData query;
+	initStringInfo(&query);
+	appendStringInfo(&query,
 					 "SELECT 1 FROM %s.%s AS M "
 					 "WHERE M.%s >= $1 AND M.%s < $2 "
 					 "LIMIT 1;",
-					 quote_identifier(NameStr(*materialization_table.schema)),
-					 quote_identifier(NameStr(*materialization_table.name)),
-					 quote_identifier(NameStr(*time_column_name)),
-					 quote_identifier(NameStr(*time_column_name)));
-
-	elog(DEBUG2, "%s", command->data);
-	res = SPI_execute_with_args(command->data,
-								2,
-								types,
-								values,
-								nulls,
-								true /* read_only */,
-								0 /* count */);
-
-	if (res < 0)
-		elog(ERROR,
-			 "could not check the materialization table \"%s.%s\"",
-			 NameStr(*materialization_table.schema),
-			 NameStr(*materialization_table.name));
-
-	return (SPI_processed > 0);
+					 quote_identifier(NameStr(*context->materialization_table.schema)),
+					 quote_identifier(NameStr(*context->materialization_table.name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)));
+	return query.data;
 }
 
-static uint64
-spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
-						   SchemaAndName partial_view, SchemaAndName materialization_table,
-						   const NameData *time_column_name, TimeRange materialization_range)
+/* Create MERGE statement */
+static char *
+create_materialization_merge_statement(MaterializationContext *context)
 {
-	int res;
-	StringInfo command = makeStringInfo();
-	Oid types[] = { materialization_range.type, materialization_range.type };
-	Datum values[] = { materialization_range.start, materialization_range.end };
-	char nulls[] = { false, false };
-
-	/* Fallback to INSERT materializations if there's no rows to change on it */
-	if (!spi_exists_materializations(materialization_table,
-									 time_column_name,
-									 materialization_range))
-	{
-		elog(DEBUG2,
-			 "no rows to update on materialization table \"%s.%s\", falling back to INSERT",
-			 NameStr(*materialization_table.schema),
-			 NameStr(*materialization_table.name));
-		return spi_insert_materializations(mat_ht,
-										   cagg,
-										   partial_view,
-										   materialization_table,
-										   time_column_name,
-										   materialization_range,
-										   "" /* empty chunk condition for Finalized CAggs */);
-	}
-
-	List *grp_colnames = cagg_find_groupingcols((ContinuousAgg *) cagg, mat_ht);
-	List *agg_colnames = cagg_find_aggref_and_var_cols((ContinuousAgg *) cagg, mat_ht);
+	List *grp_colnames = cagg_find_groupingcols((ContinuousAgg *) context->cagg, context->mat_ht);
+	List *agg_colnames =
+		cagg_find_aggref_and_var_cols((ContinuousAgg *) context->cagg, context->mat_ht);
 	List *all_columns = NIL;
-	uint64 rows_processed = 0;
 
 	/* Concat both lists into a single one*/
 	all_columns = list_concat(all_columns, grp_colnames);
 	all_columns = list_concat(all_columns, agg_colnames);
 
-	StringInfo merge_update = makeStringInfo();
+	StringInfoData merge_update;
+	initStringInfo(&merge_update);
 	char *merge_update_clause = build_merge_update_clause(all_columns);
 
-	/* It make no sense but is possible to create a cagg only with time bucket (without aggregate
-	 * functions) */
+	/* It makes no sense, but it is possible, to create a cagg with only a time bucket (without
+	 * aggregate functions) */
 	if (merge_update_clause != NULL)
 	{
-		appendStringInfo(merge_update,
+		appendStringInfo(&merge_update,
 						 " WHEN MATCHED AND ROW(M.*) IS DISTINCT FROM ROW(P.*) THEN "
 						 "  UPDATE SET %s ",
 						 merge_update_clause);
 	}
 
+	StringInfoData query;
+	initStringInfo(&query);
+
 	/* MERGE statement to UPDATE affected buckets and INSERT new ones */
-	appendStringInfo(command,
+	appendStringInfo(&query,
 					 "WITH partial AS ( "
 					 "  SELECT * "
 					 "  FROM %s.%s "
@@ -539,57 +490,42 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
 					 "  INSERT (%s) VALUES (%s) ",
 
 					 /* partial VIEW */
-					 quote_identifier(NameStr(*partial_view.schema)),
-					 quote_identifier(NameStr(*partial_view.name)),
+					 quote_identifier(NameStr(*context->partial_view.schema)),
+					 quote_identifier(NameStr(*context->partial_view.name)),
 
 					 /* partial WHERE */
-					 quote_identifier(NameStr(*time_column_name)),
-					 quote_identifier(NameStr(*time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
 
 					 /* materialization hypertable */
-					 quote_identifier(NameStr(*materialization_table.schema)),
-					 quote_identifier(NameStr(*materialization_table.name)),
+					 quote_identifier(NameStr(*context->materialization_table.schema)),
+					 quote_identifier(NameStr(*context->materialization_table.name)),
 
 					 /* MERGE JOIN condition */
 					 build_merge_join_clause(grp_colnames),
 
 					 /* extra MERGE JOIN condition with primary dimension */
-					 quote_identifier(NameStr(*time_column_name)),
-					 quote_identifier(NameStr(*time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
 
 					 /* UPDATE */
-					 merge_update->data,
+					 merge_update.data,
 
 					 /* INSERT */
 					 build_merge_insert_columns(all_columns, ", ", NULL),
 					 build_merge_insert_columns(all_columns, ", ", "P."));
+
+	return query.data;
+}
 
-	elog(DEBUG2, "%s: %s", __func__, command->data);
-	res = SPI_execute_with_args(command->data,
-								2,
-								types,
-								values,
-								nulls,
-								false /* read_only */,
-								0 /* count */);
-
-	if (res < 0)
-		elog(ERROR,
-			 "could not materialize values into the materialization table \"%s.%s\"",
-			 NameStr(*materialization_table.schema),
-			 NameStr(*materialization_table.name));
-	else
-		elog(LOG,
-			 "merged " UINT64_FORMAT " row(s) into materialization table \"%s.%s\"",
-			 SPI_processed,
-			 NameStr(*materialization_table.schema),
-			 NameStr(*materialization_table.name));
-
-	rows_processed += SPI_processed;
+/* Create the DELETE statement that runs after MERGE */
+static char *
+create_materialization_merge_delete_statement(MaterializationContext *context)
+{
+	StringInfoData query;
+	initStringInfo(&query);
+	List *grp_colnames = cagg_find_groupingcols((ContinuousAgg *) context->cagg, context->mat_ht);
 
-	/* DELETE rows from the materialization hypertable when necessary */
-	resetStringInfo(command);
-	appendStringInfo(command,
+	appendStringInfo(&query,
 					 "DELETE "
 					 "FROM %s.%s M "
 					 "WHERE M.%s >= $1 AND M.%s < $2 "
@@ -598,119 +534,181 @@
 					 "  AND NOT EXISTS ("
 					 "    SELECT FROM %s.%s P "
 					 "    WHERE %s AND P.%s >= $1 AND P.%s < $2) ",
 
 					 /* materialization hypertable */
-					 quote_identifier(NameStr(*materialization_table.schema)),
-					 quote_identifier(NameStr(*materialization_table.name)),
+					 quote_identifier(NameStr(*context->materialization_table.schema)),
+					 quote_identifier(NameStr(*context->materialization_table.name)),
 
 					 /* materialization hypertable WHERE */
-					 quote_identifier(NameStr(*time_column_name)),
-					 quote_identifier(NameStr(*time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)),
 
 					 /* partial VIEW */
-					 quote_identifier(NameStr(*partial_view.schema)),
-					 quote_identifier(NameStr(*partial_view.name)),
+					 quote_identifier(NameStr(*context->partial_view.schema)),
+					 quote_identifier(NameStr(*context->partial_view.name)),
 
 					 /* MERGE JOIN condition */
 					 build_merge_join_clause(grp_colnames),
 
 					 /* partial WHERE */
-					 quote_identifier(NameStr(*time_column_name)),
-					 quote_identifier(NameStr(*time_column_name)));
+					 quote_identifier(NameStr(*context->time_column_name)),
+					 quote_identifier(NameStr(*context->time_column_name)));
+	return query.data;
+}
 
-	elog(DEBUG2, "%s: %s", __func__, command->data);
-	res = SPI_execute_with_args(command->data,
-								2,
-								types,
-								values,
-								nulls,
-								false /* read_only */,
-								0 /* count */);
+static void
+emit_materialization_insert_error(MaterializationContext *context)
+{
+	elog(ERROR,
+		 "could not insert old values into materialization table \"%s.%s\"",
+		 NameStr(*context->materialization_table.schema),
+		 NameStr(*context->materialization_table.name));
+}
 
-	if (res < 0)
-		elog(ERROR,
-			 "could not delete values from the materialization table \"%s.%s\"",
-			 NameStr(*materialization_table.schema),
-			 NameStr(*materialization_table.name));
-	else
-		elog(LOG,
-			 "deleted " UINT64_FORMAT " row(s) from materialization table \"%s.%s\"",
-			 SPI_processed,
-			 NameStr(*materialization_table.schema),
-			 NameStr(*materialization_table.name));
+static void
+emit_materialization_delete_error(MaterializationContext *context)
+{
+	elog(ERROR,
+		 "could not delete old values from materialization table \"%s.%s\"",
+		 NameStr(*context->materialization_table.schema),
+		 NameStr(*context->materialization_table.name));
+}
+
+static void
+emit_materialization_exists_error(MaterializationContext *context)
+{
+	elog(ERROR,
+		 "could not check the materialization table \"%s.%s\"",
+		 NameStr(*context->materialization_table.schema),
+		 NameStr(*context->materialization_table.name));
+}
+
+static void
+emit_materialization_merge_error(MaterializationContext *context)
+{
+	elog(ERROR,
+		 "could not merge old values into materialization table \"%s.%s\"",
+		 NameStr(*context->materialization_table.schema),
+		 NameStr(*context->materialization_table.name));
+}
+
+static void
+emit_materialization_insert_progress(MaterializationContext *context, uint64 rows_processed)
+{
+	elog(LOG,
+		 "inserted " UINT64_FORMAT " row(s) into materialization table \"%s.%s\"",
+		 rows_processed,
+		 NameStr(*context->materialization_table.schema),
+		 NameStr(*context->materialization_table.name));
+}
+
+static void
+emit_materialization_delete_progress(MaterializationContext *context, uint64 rows_processed)
+{
+	elog(LOG,
+		 "deleted " UINT64_FORMAT " row(s) from materialization table \"%s.%s\"",
+		 rows_processed,
+		 NameStr(*context->materialization_table.schema),
+		 NameStr(*context->materialization_table.name));
+}
+
+static void
+emit_materialization_merge_progress(MaterializationContext *context, uint64 rows_processed)
+{
+	elog(LOG,
+		 "merged " UINT64_FORMAT " row(s) into materialization table \"%s.%s\"",
+		 rows_processed,
+		 NameStr(*context->materialization_table.schema),
+		 NameStr(*context->materialization_table.name));
+}
+
+static MaterializationPlan *
+create_materialization_plan(MaterializationContext *context, MaterializationPlanType plan_type)
+{
+	Assert(plan_type >= PLAN_TYPE_INSERT);
+	Assert(plan_type < _MAX_MATERIALIZATION_PLAN_TYPES);
 
-	rows_processed += SPI_processed;
+	MaterializationPlan *materialization = &materialization_plans[plan_type];
 
-	return rows_processed;
+	if (materialization->plan == NULL)
+	{
+		char *query = materialization->create_statement(context);
+		Oid types[] = { context->materialization_range.type, context->materialization_range.type };
+
+		materialization->plan = SPI_prepare(query, 2, types);
+		if (materialization->plan == NULL)
+			elog(ERROR, "%s: SPI_prepare failed: %s", __func__, query);
+
+		SPI_keepplan(materialization->plan);
+		pfree(query);
+	}
+
+	return materialization;
 }
 
 static uint64
-spi_delete_materializations(SchemaAndName materialization_table, const NameData *time_column_name,
-							TimeRange materialization_range, const char *const chunk_condition)
+execute_materialization_plan(MaterializationContext *context, MaterializationPlanType plan_type)
 {
-	int res;
-	StringInfo command = makeStringInfo();
-	Oid types[] = { materialization_range.type, materialization_range.type };
-	Datum values[] = { materialization_range.start, materialization_range.end };
+	MaterializationPlan *materialization = create_materialization_plan(context, plan_type);
+	Datum values[] = { context->materialization_range.start, context->materialization_range.end };
 	char nulls[] = { false, false };
 
-	appendStringInfo(command,
-					 "DELETE FROM %s.%s AS D WHERE "
-					 "D.%s >= $1 AND D.%s < $2 %s;",
-					 quote_identifier(NameStr(*materialization_table.schema)),
-					 quote_identifier(NameStr(*materialization_table.name)),
-					 quote_identifier(NameStr(*time_column_name)),
-					 quote_identifier(NameStr(*time_column_name)),
-					 chunk_condition);
-
-	elog(DEBUG2, "%s: %s", __func__, command->data);
-	res = SPI_execute_with_args(command->data,
-								2,
-								types,
-								values,
-								nulls,
-								false /* read_only */,
-								0 /* count */);
+	int res = SPI_execute_plan(materialization->plan, values, nulls, materialization->read_only, 0);
 
-	if (res < 0)
-		elog(ERROR,
-			 "could not delete old values from materialization table \"%s.%s\"",
-			 NameStr(*materialization_table.schema),
-			 NameStr(*materialization_table.name));
+	if (res < 0 && materialization->emit_error != NULL)
+	{
+		materialization->emit_error(context);
+	}
 	else
-		elog(LOG,
\"%s.%s\"", - SPI_processed, - NameStr(*materialization_table.schema), - NameStr(*materialization_table.name)); + { + if (materialization->emit_progress != NULL) + materialization->emit_progress(context, SPI_processed); + } return SPI_processed; } -static uint64 -spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, - SchemaAndName partial_view, SchemaAndName materialization_table, - const NameData *time_column_name, TimeRange materialization_range, - const char *const chunk_condition) +static void +free_materialization_plan(MaterializationContext *context, MaterializationPlanType plan_type) +{ + MaterializationPlan *materialization = &materialization_plans[plan_type]; + + if (materialization->plan != NULL) + { + SPI_freeplan(materialization->plan); + materialization->plan = NULL; + } +} + +static void +free_materialization_plans(MaterializationContext *context) +{ + for (int plan_type = PLAN_TYPE_INSERT; plan_type < _MAX_MATERIALIZATION_PLAN_TYPES; plan_type++) + { + free_materialization_plan(context, plan_type); + } +} + +static void +update_watermark(MaterializationContext *context) { int res; StringInfo command = makeStringInfo(); - Oid types[] = { materialization_range.type, materialization_range.type }; - Datum values[] = { materialization_range.start, materialization_range.end }; - char nulls[] = { false, false }; + Oid types[] = { context->materialization_range.type }; + Datum values[] = { context->materialization_range.start }; + char nulls[] = { false }; appendStringInfo(command, - "INSERT INTO %s.%s SELECT * FROM %s.%s AS I " - "WHERE I.%s >= $1 AND I.%s < $2 %s;", - quote_identifier(NameStr(*materialization_table.schema)), - quote_identifier(NameStr(*materialization_table.name)), - quote_identifier(NameStr(*partial_view.schema)), - quote_identifier(NameStr(*partial_view.name)), - quote_identifier(NameStr(*time_column_name)), - quote_identifier(NameStr(*time_column_name)), - chunk_condition); + "SELECT %s FROM %s.%s AS I " + "WHERE I.%s >= $1 %s " + "ORDER BY 1 DESC LIMIT 1;", + quote_identifier(NameStr(*context->time_column_name)), + quote_identifier(NameStr(*context->materialization_table.schema)), + quote_identifier(NameStr(*context->materialization_table.name)), + quote_identifier(NameStr(*context->time_column_name)), + context->chunk_condition); elog(DEBUG2, "%s: %s", __func__, command->data); res = SPI_execute_with_args(command->data, - 2, + 1, types, values, nulls, @@ -718,16 +716,71 @@ spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, 0 /* count */); if (res < 0) - elog(ERROR, - "could not materialize values into the materialization table \"%s.%s\"", - NameStr(*materialization_table.schema), - NameStr(*materialization_table.name)); - else - elog(LOG, - "inserted " UINT64_FORMAT " row(s) into materialization table \"%s.%s\"", - SPI_processed, - NameStr(*materialization_table.schema), - NameStr(*materialization_table.name)); + elog(ERROR, "%s: could not get the last bucket of the materialized data", __func__); - return SPI_processed; + Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == context->materialization_range.type, + "partition types for result (%d) and dimension (%d) do not match", + SPI_gettypeid(SPI_tuptable->tupdesc, 1), + context->materialization_range.type); + + if (SPI_processed > 0) + { + bool isnull; + Datum maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull); + + if (!isnull) + { + int64 watermark = + ts_time_value_to_internal(maxdat, context->materialization_range.type); + 
+			ts_cagg_watermark_update(context->mat_ht, watermark, isnull, false);
+		}
+	}
+}
+
+static void
+execute_materializations(MaterializationContext *context)
+{
+	volatile uint64 rows_processed = 0;
+
+	PG_TRY();
+	{
+		/* The MERGE statement is available starting with PG15, and we support it only for the
+		 * new format of CAggs and for non-compressed hypertables */
+		if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
+			ContinuousAggIsFinalized(context->cagg) &&
+			!TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht))
+		{
+			/* Fall back to an INSERT materialization if there are no existing rows to change */
+			if (execute_materialization_plan(context, PLAN_TYPE_EXISTS) == 0)
+			{
+				elog(DEBUG2,
+					 "no rows to merge on materialization table \"%s.%s\", falling back to INSERT",
+					 NameStr(*context->materialization_table.schema),
+					 NameStr(*context->materialization_table.name));
+				rows_processed = execute_materialization_plan(context, PLAN_TYPE_INSERT);
+			}
+			else
+			{
+				rows_processed += execute_materialization_plan(context, PLAN_TYPE_MERGE);
+				rows_processed += execute_materialization_plan(context, PLAN_TYPE_MERGE_DELETE);
+			}
+		}
+		else
+		{
+			rows_processed += execute_materialization_plan(context, PLAN_TYPE_DELETE);
+			rows_processed += execute_materialization_plan(context, PLAN_TYPE_INSERT);
+		}
+	}
+	PG_FINALLY();
+	{
+		/* Make sure all cached plans in the session are released before rethrowing the error */
+		free_materialization_plans(context);
+	}
+	PG_END_TRY();
+
+	/* Get the max(time_dimension) of the materialized data */
+	if (rows_processed > 0)
+	{
+		update_watermark(context);
+	}
+}
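
For reference, the prepare/execute/free lifecycle adopted above reduces to the
standard PostgreSQL SPI pattern sketched below. This sketch is illustrative
only and is not code from this patch: the table "example_tbl", its "ts"
column, and the function names are hypothetical stand-ins for the
materialization hypertable, its time dimension, and the
create/execute/free_materialization_plan helpers.

/*
 * Minimal sketch of the SPI plan lifecycle (assumptions: a table
 * "example_tbl" with a timestamptz column "ts"; the caller is inside an
 * SPI_connect()/SPI_finish() pair).
 */
#include "postgres.h"

#include "catalog/pg_type.h"
#include "executor/spi.h"
#include "utils/timestamp.h"

static SPIPlanPtr saved_plan = NULL;

static uint64
delete_range(TimestampTz range_start, TimestampTz range_end)
{
	/* Prepare the parameterized statement once and cache the plan, as
	 * create_materialization_plan() does for each MaterializationPlanType. */
	if (saved_plan == NULL)
	{
		Oid argtypes[] = { TIMESTAMPTZOID, TIMESTAMPTZOID };
		SPIPlanPtr plan =
			SPI_prepare("DELETE FROM example_tbl WHERE ts >= $1 AND ts < $2", 2, argtypes);

		if (plan == NULL)
			elog(ERROR, "SPI_prepare failed: %s", SPI_result_code_string(SPI_result));

		/* Move the plan out of the SPI procedure memory context so it
		 * survives across calls until SPI_freeplan() releases it. */
		SPI_keepplan(plan);
		saved_plan = plan;
	}

	Datum values[] = { TimestampTzGetDatum(range_start), TimestampTzGetDatum(range_end) };
	char nulls[] = { ' ', ' ' }; /* ' ' marks a non-NULL parameter */

	/* Re-execute the cached plan with fresh parameters, as
	 * execute_materialization_plan() does for every refresh window. */
	int res = SPI_execute_plan(saved_plan, values, nulls, false /* read_only */, 0 /* count */);

	if (res < 0)
		elog(ERROR, "could not delete rows from example_tbl");

	return SPI_processed;
}

static void
release_plan(void)
{
	/* Mirrors free_materialization_plan(): drop the cached plan so a later
	 * call re-prepares it. */
	if (saved_plan != NULL)
	{
		SPI_freeplan(saved_plan);
		saved_plan = NULL;
	}
}

Keeping the plan across executions is what should make the planned batched
refresh cheap: each batch only rebinds $1/$2 on the cached plan instead of
re-parsing and re-planning the statement for every window.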