diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index 22dc6150b0a..43e9fa54bb7 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -65,6 +65,7 @@ typedef enum MaterializationPlanType typedef struct MaterializationPlan { const char *query; + const char *strings[3]; SPIPlanPtr plan; bool read_only; } MaterializationPlan; @@ -72,10 +73,16 @@ typedef struct MaterializationPlan static MaterializationPlan _materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPES + 1] = { [PLAN_TYPE_INSERT] = { .query = "INSERT INTO %s.%s SELECT * FROM %s.%s AS I " "WHERE I.%s >= $1 AND I.%s < $2 %s;", + .strings[0] = "insert", + .strings[1] = "inserted", + .strings[2] = "into", .plan = NULL, .read_only = false }, [PLAN_TYPE_DELETE] = { .query = "DELETE FROM %s.%s AS D " "WHERE D.%s >= $1 AND D.%s < $2 %s;", + .strings[0] = "delete", + .strings[1] = "deleted", + .strings[2] = "from", .plan = NULL, .read_only = false }, [PLAN_TYPE_EXISTS] = { .query = "SELECT 1 FROM %s.%s AS M " @@ -93,6 +100,9 @@ static MaterializationPlan _materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPE " %s " /* UPDATE */ " WHEN NOT MATCHED THEN " " INSERT (%s) VALUES (%s) ", + .strings[0] = "merge", + .strings[1] = "merged", + .strings[2] = "into", .plan = NULL, .read_only = false }, [PLAN_TYPE_MERGE_DELETE] = { .query = "DELETE " @@ -101,6 +111,9 @@ static MaterializationPlan _materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPE "AND NOT EXISTS (" " SELECT FROM %s.%s P " " WHERE %s AND P.%s >= $1 AND P.%s < $2) ", + .strings[0] = "delete", + .strings[1] = "deleted", + .strings[2] = "from", .plan = NULL, .read_only = false }, }; @@ -129,11 +142,10 @@ static void free_materialization_plan(MaterializationPlanType plan_type); static void free_materialization_plans(); static void update_watermark(MaterializationContext *context); -static void update_materializations(MaterializationContext *context); -static uint64 delete_materializations(MaterializationContext *context); -static uint64 insert_materializations(MaterializationContext *context); static bool exists_materializations(MaterializationContext *context); -static uint64 merge_materializations(MaterializationContext *context); +static void execute_materializations(MaterializationContext *context); +static uint64 execute_materialization(MaterializationContext *context, + MaterializationPlanType plan_type); /* API to update materializations from refresh code */ void @@ -205,16 +217,16 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c { context.materialization_range = internal_time_range_to_time_range(combined_materialization_range); - update_materializations(&context); + execute_materializations(&context); } else { context.materialization_range = internal_time_range_to_time_range(invalidation_range); - update_materializations(&context); + execute_materializations(&context); context.materialization_range = internal_time_range_to_time_range(new_materialization_range); - update_materializations(&context); + execute_materializations(&context); } /* Restore search_path */ @@ -671,43 +683,6 @@ update_watermark(MaterializationContext *context) } } -static void -update_materializations(MaterializationContext *context) -{ - uint64 rows_processed = 0; - - PG_TRY(); - { - /* MERGE statement is available starting on PG15 and we'll support it only in the new format - * of CAggs and for non-compressed hypertables */ - if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 && - ContinuousAggIsFinalized(context->cagg) && - !TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht)) - { - create_materialization_plans(context, OPERATION_MERGE_DELETE); - rows_processed = merge_materializations(context); - } - else - { - create_materialization_plans(context, OPERATION_INSERT_DELETE); - rows_processed += delete_materializations(context); - rows_processed += insert_materializations(context); - } - } - PG_FINALLY(); - { - /* Make sure all cached plans in the session be released before rethrowing the error */ - free_materialization_plans(); - } - PG_END_TRY(); - - /* Get the max(time_dimension) of the materialized data */ - if (rows_processed > 0) - { - update_watermark(context); - } -} - static bool exists_materializations(MaterializationContext *context) { @@ -726,100 +701,80 @@ exists_materializations(MaterializationContext *context) } static uint64 -merge_materializations(MaterializationContext *context) -{ - /* Fallback to INSERT materializations if there's no rows to change on it */ - if (!exists_materializations(context)) - { - elog(DEBUG2, - "no rows to update on materialization table \"%s.%s\", falling back to INSERT", - NameStr(*context->materialization_table.schema), - NameStr(*context->materialization_table.name)); - return insert_materializations(context); - } - - int res; - uint64 rows_processed = 0, all_processed = 0; - - res = execute_materialization_plan(context, PLAN_TYPE_MERGE, &rows_processed); - - if (res < 0) - elog(ERROR, - "could not materialize values into the materialization table \"%s.%s\"", - NameStr(*context->materialization_table.schema), - NameStr(*context->materialization_table.name)); - else - elog(LOG, - "merged " UINT64_FORMAT " row(s) into materialization table \"%s.%s\"", - SPI_processed, - NameStr(*context->materialization_table.schema), - NameStr(*context->materialization_table.name)); - - all_processed += rows_processed; - - /* DELETE buckets from the target materialization hypertable when not exists in the source - * hypertable or continuous aggregate (in case of hierarchical) */ - res = execute_materialization_plan(context, PLAN_TYPE_MERGE_DELETE, &rows_processed); - - if (res < 0) - elog(ERROR, - "could not delete values from the materialization table \"%s.%s\"", - NameStr(*context->materialization_table.schema), - NameStr(*context->materialization_table.name)); - else - elog(LOG, - "deleted " UINT64_FORMAT " row(s) from materialization table \"%s.%s\"", - SPI_processed, - NameStr(*context->materialization_table.schema), - NameStr(*context->materialization_table.name)); - - all_processed += rows_processed; - - return all_processed; -} - -static uint64 -delete_materializations(MaterializationContext *context) +execute_materialization(MaterializationContext *context, MaterializationPlanType plan_type) { int res; uint64 rows_processed; + MaterializationPlan *materialization = get_materialization_plan(plan_type); - res = execute_materialization_plan(context, PLAN_TYPE_DELETE, &rows_processed); + res = execute_materialization_plan(context, plan_type, &rows_processed); if (res < 0) elog(ERROR, - "could not delete old values from materialization table \"%s.%s\"", + "could not %s old values %s materialization table \"%s.%s\"", + materialization->strings[0], + materialization->strings[2], NameStr(*context->materialization_table.schema), NameStr(*context->materialization_table.name)); else elog(LOG, - "deleted " UINT64_FORMAT " row(s) from materialization table \"%s.%s\"", - SPI_processed, + "%s " UINT64_FORMAT " row(s) %s materialization table \"%s.%s\"", + materialization->strings[1], + rows_processed, + materialization->strings[2], NameStr(*context->materialization_table.schema), NameStr(*context->materialization_table.name)); return rows_processed; } -static uint64 -insert_materializations(MaterializationContext *context) +static void +execute_materializations(MaterializationContext *context) { - int res; - uint64 rows_processed; + uint64 rows_processed = 0; - res = execute_materialization_plan(context, PLAN_TYPE_INSERT, &rows_processed); + PG_TRY(); + { + /* MERGE statement is available starting on PG15 and we'll support it only in the new format + * of CAggs and for non-compressed hypertables */ + if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 && + ContinuousAggIsFinalized(context->cagg) && + !TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht)) + { + create_materialization_plans(context, OPERATION_MERGE_DELETE); - if (res < 0) - elog(ERROR, - "could not materialize values into the materialization table \"%s.%s\"", - NameStr(*context->materialization_table.schema), - NameStr(*context->materialization_table.name)); - else - elog(LOG, - "inserted " UINT64_FORMAT " row(s) into materialization table \"%s.%s\"", - SPI_processed, - NameStr(*context->materialization_table.schema), - NameStr(*context->materialization_table.name)); + /* Fallback to INSERT materializations if there's no rows to change on it */ + if (!exists_materializations(context)) + { + elog(DEBUG2, + "no rows to merge on materialization table \"%s.%s\", falling back to INSERT", + NameStr(*context->materialization_table.schema), + NameStr(*context->materialization_table.name)); + rows_processed = execute_materialization(context, PLAN_TYPE_INSERT); + } + else + { + rows_processed += execute_materialization(context, PLAN_TYPE_MERGE); + rows_processed += execute_materialization(context, PLAN_TYPE_MERGE_DELETE); + } + } + else + { + create_materialization_plans(context, OPERATION_INSERT_DELETE); + rows_processed += execute_materialization(context, PLAN_TYPE_DELETE); + rows_processed += execute_materialization(context, PLAN_TYPE_INSERT); + } + } + PG_FINALLY(); + { + /* Make sure all cached plans in the session be released before rethrowing the error */ + free_materialization_plans(); + } + PG_END_TRY(); - return rows_processed; + /* Get the max(time_dimension) of the materialized data */ + if (rows_processed > 0) + { + update_watermark(context); + } }