Skip to content

Commit

Permalink
Version 9
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziomello committed Dec 3, 2024
1 parent 3bc9640 commit 719a949
Showing 1 changed file with 59 additions and 27 deletions.
86 changes: 59 additions & 27 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/continuous_aggs_watermark.h"

#define CHUNKIDFROMRELID "chunk_id_from_relid"
#define CONTINUOUS_AGG_CHUNK_ID_COL_NAME "chunk_id"

/*********************
Expand All @@ -47,6 +46,11 @@ static char *build_merge_update_clause(List *column_names);
/***************************
* materialization support *
***************************/
typedef enum MaterializationOperation
{
OPERATION_INSERT_DELETE,
OPERATION_MERGE_DELETE
} MaterializationOperation;

typedef enum MaterializationPlanType
{
Expand Down Expand Up @@ -115,12 +119,14 @@ typedef struct MaterializationContext
static MaterializationPlan *get_materialization_plan(MaterializationPlanType plan_type);
static char *create_materialization_plan_query(MaterializationContext *context,
MaterializationPlanType plan_type);
static MaterializationPlan *create_materialization_plan(MaterializationContext *context,
MaterializationPlanType plan_type);
static void create_materialization_plan(MaterializationContext *context,
MaterializationPlanType plan_type);
static void create_materialization_plans(MaterializationContext *context,
MaterializationOperation type);
static int execute_materialization_plan(MaterializationContext *context,
MaterializationPlanType plan_type, uint64 *rows_processed);
static void free_materialization_plan(MaterializationPlanType plan_type);
static void free_all_materialization_plans();
static void free_materialization_plans();

static void update_watermark(MaterializationContext *context);
static void update_materializations(MaterializationContext *context);
Expand Down Expand Up @@ -321,6 +327,7 @@ create_materialization_plan_query(MaterializationContext *context,
build_merge_insert_columns(all_columns, ", ", "P."));
break;
}

/* Create DELETE after MERGE query statement */
case PLAN_TYPE_MERGE_DELETE:
{
Expand Down Expand Up @@ -359,39 +366,57 @@ create_materialization_plan_query(MaterializationContext *context,
return query->data;
}

static MaterializationPlan *
static void
create_materialization_plan(MaterializationContext *context, MaterializationPlanType plan_type)
{
MaterializationPlan *materialization = get_materialization_plan(plan_type);
char *query = create_materialization_plan_query(context, plan_type);
Oid types[] = { context->materialization_range.type, context->materialization_range.type };

if (materialization->plan == NULL)
{
char *query = create_materialization_plan_query(context, plan_type);
Oid types[] = { context->materialization_range.type, context->materialization_range.type };

materialization->plan = SPI_prepare(query, 2, types);
if (materialization->plan == NULL)
elog(ERROR, "%s: SPI_prepare failed: %s", __func__, query);

SPI_keepplan(materialization->plan);
}
}

return materialization;
static void
create_materialization_plans(MaterializationContext *context, MaterializationOperation type)
{
switch (type)
{
case OPERATION_INSERT_DELETE:
create_materialization_plan(context, PLAN_TYPE_INSERT);
create_materialization_plan(context, PLAN_TYPE_DELETE);
break;
case OPERATION_MERGE_DELETE:
create_materialization_plan(context, PLAN_TYPE_INSERT);
create_materialization_plan(context, PLAN_TYPE_EXISTS);
create_materialization_plan(context, PLAN_TYPE_MERGE);
create_materialization_plan(context, PLAN_TYPE_MERGE_DELETE);
break;
default:
pg_unreachable();
break;
}
}

static int
execute_materialization_plan(MaterializationContext *context, MaterializationPlanType plan_type,
uint64 *rows_processed)
{
MaterializationPlan *materialization = create_materialization_plan(context, plan_type);
MaterializationPlan *materialization = get_materialization_plan(plan_type);
Datum values[] = { context->materialization_range.start, context->materialization_range.end };
char nulls[] = { false, false };

int res = SPI_execute_plan(materialization->plan, values, nulls, materialization->read_only, 0);

*rows_processed = SPI_processed;

free_materialization_plan(plan_type);

return res;
}

Expand All @@ -408,11 +433,11 @@ free_materialization_plan(MaterializationPlanType plan_type)
}

static void
free_all_materialization_plans()
free_materialization_plans()
{
for (int i = 0; i < _MAX_MATERIALIZATION_PLAN_TYPES; i++)
for (int plan_type = 0; plan_type < _MAX_MATERIALIZATION_PLAN_TYPES; plan_type++)
{
free_materialization_plan(i);
free_materialization_plan(plan_type);
}
}

Expand Down Expand Up @@ -651,23 +676,30 @@ update_materializations(MaterializationContext *context)
{
uint64 rows_processed = 0;

/* Make sure all cached plans in the session be released before starting the materialization
* process */
free_all_materialization_plans();

/* MERGE statement is available starting on PG15 and we'll support it only in the new format of
* CAggs and for non-compressed hypertables */
if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
ContinuousAggIsFinalized(context->cagg) &&
!TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht))
PG_TRY();
{
rows_processed = merge_materializations(context);
/* MERGE statement is available starting on PG15 and we'll support it only in the new format
* of CAggs and for non-compressed hypertables */
if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
ContinuousAggIsFinalized(context->cagg) &&
!TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht))
{
create_materialization_plans(context, OPERATION_MERGE_DELETE);
rows_processed = merge_materializations(context);
}
else
{
create_materialization_plans(context, OPERATION_INSERT_DELETE);
rows_processed += delete_materializations(context);
rows_processed += insert_materializations(context);
}
}
else
PG_FINALLY();
{
rows_processed += delete_materializations(context);
rows_processed += insert_materializations(context);
/* Make sure all cached plans in the session be released before rethrowing the error */
free_materialization_plans();
}
PG_END_TRY();

/* Get the max(time_dimension) of the materialized data */
if (rows_processed > 0)
Expand Down

0 comments on commit 719a949

Please sign in to comment.