From 719a9493320e27061b8313896988da57db9b7ec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= Date: Mon, 2 Dec 2024 13:57:46 -0300 Subject: [PATCH] Version 9 --- tsl/src/continuous_aggs/materialize.c | 86 ++++++++++++++++++--------- 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index 01d2256864c..22dc6150b0a 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -26,7 +26,6 @@ #include "ts_catalog/continuous_agg.h" #include "ts_catalog/continuous_aggs_watermark.h" -#define CHUNKIDFROMRELID "chunk_id_from_relid" #define CONTINUOUS_AGG_CHUNK_ID_COL_NAME "chunk_id" /********************* @@ -47,6 +46,11 @@ static char *build_merge_update_clause(List *column_names); /*************************** * materialization support * ***************************/ +typedef enum MaterializationOperation +{ + OPERATION_INSERT_DELETE, + OPERATION_MERGE_DELETE +} MaterializationOperation; typedef enum MaterializationPlanType { @@ -115,12 +119,14 @@ typedef struct MaterializationContext static MaterializationPlan *get_materialization_plan(MaterializationPlanType plan_type); static char *create_materialization_plan_query(MaterializationContext *context, MaterializationPlanType plan_type); -static MaterializationPlan *create_materialization_plan(MaterializationContext *context, - MaterializationPlanType plan_type); +static void create_materialization_plan(MaterializationContext *context, + MaterializationPlanType plan_type); +static void create_materialization_plans(MaterializationContext *context, + MaterializationOperation type); static int execute_materialization_plan(MaterializationContext *context, MaterializationPlanType plan_type, uint64 *rows_processed); static void free_materialization_plan(MaterializationPlanType plan_type); -static void free_all_materialization_plans(); +static void free_materialization_plans(); static void update_watermark(MaterializationContext *context); static void update_materializations(MaterializationContext *context); @@ -321,6 +327,7 @@ create_materialization_plan_query(MaterializationContext *context, build_merge_insert_columns(all_columns, ", ", "P.")); break; } + /* Create DELETE after MERGE query statement */ case PLAN_TYPE_MERGE_DELETE: { @@ -359,30 +366,50 @@ create_materialization_plan_query(MaterializationContext *context, return query->data; } -static MaterializationPlan * +static void create_materialization_plan(MaterializationContext *context, MaterializationPlanType plan_type) { MaterializationPlan *materialization = get_materialization_plan(plan_type); - char *query = create_materialization_plan_query(context, plan_type); - Oid types[] = { context->materialization_range.type, context->materialization_range.type }; if (materialization->plan == NULL) { + char *query = create_materialization_plan_query(context, plan_type); + Oid types[] = { context->materialization_range.type, context->materialization_range.type }; + materialization->plan = SPI_prepare(query, 2, types); if (materialization->plan == NULL) elog(ERROR, "%s: SPI_prepare failed: %s", __func__, query); SPI_keepplan(materialization->plan); } +} - return materialization; +static void +create_materialization_plans(MaterializationContext *context, MaterializationOperation type) +{ + switch (type) + { + case OPERATION_INSERT_DELETE: + create_materialization_plan(context, PLAN_TYPE_INSERT); + create_materialization_plan(context, PLAN_TYPE_DELETE); + break; + case OPERATION_MERGE_DELETE: + create_materialization_plan(context, PLAN_TYPE_INSERT); + create_materialization_plan(context, PLAN_TYPE_EXISTS); + create_materialization_plan(context, PLAN_TYPE_MERGE); + create_materialization_plan(context, PLAN_TYPE_MERGE_DELETE); + break; + default: + pg_unreachable(); + break; + } } static int execute_materialization_plan(MaterializationContext *context, MaterializationPlanType plan_type, uint64 *rows_processed) { - MaterializationPlan *materialization = create_materialization_plan(context, plan_type); + MaterializationPlan *materialization = get_materialization_plan(plan_type); Datum values[] = { context->materialization_range.start, context->materialization_range.end }; char nulls[] = { false, false }; @@ -390,8 +417,6 @@ execute_materialization_plan(MaterializationContext *context, MaterializationPla *rows_processed = SPI_processed; - free_materialization_plan(plan_type); - return res; } @@ -408,11 +433,11 @@ free_materialization_plan(MaterializationPlanType plan_type) } static void -free_all_materialization_plans() +free_materialization_plans() { - for (int i = 0; i < _MAX_MATERIALIZATION_PLAN_TYPES; i++) + for (int plan_type = 0; plan_type < _MAX_MATERIALIZATION_PLAN_TYPES; plan_type++) { - free_materialization_plan(i); + free_materialization_plan(plan_type); } } @@ -651,23 +676,30 @@ update_materializations(MaterializationContext *context) { uint64 rows_processed = 0; - /* Make sure all cached plans in the session be released before starting the materialization - * process */ - free_all_materialization_plans(); - - /* MERGE statement is available starting on PG15 and we'll support it only in the new format of - * CAggs and for non-compressed hypertables */ - if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 && - ContinuousAggIsFinalized(context->cagg) && - !TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht)) + PG_TRY(); { - rows_processed = merge_materializations(context); + /* MERGE statement is available starting on PG15 and we'll support it only in the new format + * of CAggs and for non-compressed hypertables */ + if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 && + ContinuousAggIsFinalized(context->cagg) && + !TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht)) + { + create_materialization_plans(context, OPERATION_MERGE_DELETE); + rows_processed = merge_materializations(context); + } + else + { + create_materialization_plans(context, OPERATION_INSERT_DELETE); + rows_processed += delete_materializations(context); + rows_processed += insert_materializations(context); + } } - else + PG_FINALLY(); { - rows_processed += delete_materializations(context); - rows_processed += insert_materializations(context); + /* Make sure all cached plans in the session be released before rethrowing the error */ + free_materialization_plans(); } + PG_END_TRY(); /* Get the max(time_dimension) of the materialized data */ if (rows_processed > 0)