Commit

Version 10
fabriziomello committed Dec 2, 2024
1 parent e4e28e2 commit d6657d5
Showing 1 changed file with 74 additions and 119 deletions.
193 changes: 74 additions & 119 deletions tsl/src/continuous_aggs/materialize.c
@@ -65,17 +65,24 @@ typedef enum MaterializationPlanType
typedef struct MaterializationPlan
{
const char *query;
const char *strings[3];
SPIPlanPtr plan;
bool read_only;
} MaterializationPlan;

static MaterializationPlan _materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPES + 1] = {
[PLAN_TYPE_INSERT] = { .query = "INSERT INTO %s.%s SELECT * FROM %s.%s AS I "
"WHERE I.%s >= $1 AND I.%s < $2 %s;",
.strings[0] = "insert",
.strings[1] = "inserted",
.strings[2] = "into",
.plan = NULL,
.read_only = false },
[PLAN_TYPE_DELETE] = { .query = "DELETE FROM %s.%s AS D "
"WHERE D.%s >= $1 AND D.%s < $2 %s;",
.strings[0] = "delete",
.strings[1] = "deleted",
.strings[2] = "from",
.plan = NULL,
.read_only = false },
[PLAN_TYPE_EXISTS] = { .query = "SELECT 1 FROM %s.%s AS M "
@@ -93,6 +100,9 @@ static MaterializationPlan _materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPE
" %s " /* UPDATE */
" WHEN NOT MATCHED THEN "
" INSERT (%s) VALUES (%s) ",
.strings[0] = "merge",
.strings[1] = "merged",
.strings[2] = "into",
.plan = NULL,
.read_only = false },
[PLAN_TYPE_MERGE_DELETE] = { .query = "DELETE "
@@ -101,6 +111,9 @@ static MaterializationPlan _materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPE
"AND NOT EXISTS ("
" SELECT FROM %s.%s P "
" WHERE %s AND P.%s >= $1 AND P.%s < $2) ",
.strings[0] = "delete",
.strings[1] = "deleted",
.strings[2] = "from",
.plan = NULL,
.read_only = false },
};
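
The strings[3] fields introduced above carry the per-operation message fragments: strings[0] is the verb used in error messages, strings[1] the past participle used in success logs, and strings[2] the preposition ("into" or "from"). They are what let the consolidated execute_materialization() further down emit every message from one pair of format strings. A minimal standalone sketch of this table-driven message pattern (hypothetical names, not code from this commit):

#include <stdio.h>

/* Illustrative mirror of MaterializationPlan.strings; not TimescaleDB source */
typedef struct PlanStrings
{
	const char *verb;        /* error messages, e.g. "could not insert ..." */
	const char *participle;  /* success logs, e.g. "inserted 42 row(s) ..." */
	const char *preposition; /* "into" or "from" */
} PlanStrings;

static const PlanStrings plan_strings[] = {
	{ "insert", "inserted", "into" },
	{ "delete", "deleted", "from" },
	{ "merge", "merged", "into" },
};

/* One reporting helper serves every operation type instead of three copies */
static void
report_rows(const PlanStrings *s, unsigned long rows, const char *table)
{
	printf("%s %lu row(s) %s materialization table \"%s\"\n",
	       s->participle, rows, s->preposition, table);
}

int
main(void)
{
	report_rows(&plan_strings[0], 42, "public.cagg_mat");
	report_rows(&plan_strings[1], 7, "public.cagg_mat");
	return 0;
}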
@@ -129,11 +142,10 @@ static void free_materialization_plan(MaterializationPlanType plan_type);
static void free_materialization_plans();

static void update_watermark(MaterializationContext *context);
static void update_materializations(MaterializationContext *context);
static uint64 delete_materializations(MaterializationContext *context);
static uint64 insert_materializations(MaterializationContext *context);
static bool exists_materializations(MaterializationContext *context);
static uint64 merge_materializations(MaterializationContext *context);
static void execute_materializations(MaterializationContext *context);
static uint64 execute_materialization(MaterializationContext *context,
MaterializationPlanType plan_type);

/* API to update materializations from refresh code */
void
Expand Down Expand Up @@ -205,16 +217,16 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c
{
context.materialization_range =
internal_time_range_to_time_range(combined_materialization_range);
update_materializations(&context);
execute_materializations(&context);
}
else
{
context.materialization_range = internal_time_range_to_time_range(invalidation_range);
update_materializations(&context);
execute_materializations(&context);

context.materialization_range =
internal_time_range_to_time_range(new_materialization_range);
update_materializations(&context);
execute_materializations(&context);
}

/* Restore search_path */
@@ -671,43 +683,6 @@ update_watermark(MaterializationContext *context)
}
}

static void
update_materializations(MaterializationContext *context)
{
uint64 rows_processed = 0;

PG_TRY();
{
/* MERGE statement is available starting with PG15, and we support it only in the new format
* of CAggs and for non-compressed hypertables */
if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
ContinuousAggIsFinalized(context->cagg) &&
!TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht))
{
create_materialization_plans(context, OPERATION_MERGE_DELETE);
rows_processed = merge_materializations(context);
}
else
{
create_materialization_plans(context, OPERATION_INSERT_DELETE);
rows_processed += delete_materializations(context);
rows_processed += insert_materializations(context);
}
}
PG_FINALLY();
{
/* Make sure all cached plans in the session are released before rethrowing the error */
free_materialization_plans();
}
PG_END_TRY();

/* Get the max(time_dimension) of the materialized data */
if (rows_processed > 0)
{
update_watermark(context);
}
}

static bool
exists_materializations(MaterializationContext *context)
{
@@ -726,100 +701,80 @@ exists_materializations(MaterializationContext *context)
}

static uint64
merge_materializations(MaterializationContext *context)
{
/* Fall back to INSERT materialization if there are no rows to change */
if (!exists_materializations(context))
{
elog(DEBUG2,
"no rows to update on materialization table \"%s.%s\", falling back to INSERT",
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));
return insert_materializations(context);
}

int res;
uint64 rows_processed = 0, all_processed = 0;

res = execute_materialization_plan(context, PLAN_TYPE_MERGE, &rows_processed);

if (res < 0)
elog(ERROR,
"could not materialize values into the materialization table \"%s.%s\"",
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));
else
elog(LOG,
"merged " UINT64_FORMAT " row(s) into materialization table \"%s.%s\"",
SPI_processed,
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));

all_processed += rows_processed;

/* DELETE buckets from the target materialization hypertable when they do not exist in the
* source hypertable or continuous aggregate (in the hierarchical case) */
res = execute_materialization_plan(context, PLAN_TYPE_MERGE_DELETE, &rows_processed);

if (res < 0)
elog(ERROR,
"could not delete values from the materialization table \"%s.%s\"",
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));
else
elog(LOG,
"deleted " UINT64_FORMAT " row(s) from materialization table \"%s.%s\"",
SPI_processed,
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));

all_processed += rows_processed;

return all_processed;
}

static uint64
delete_materializations(MaterializationContext *context)
execute_materialization(MaterializationContext *context, MaterializationPlanType plan_type)
{
int res;
uint64 rows_processed;
MaterializationPlan *materialization = get_materialization_plan(plan_type);

res = execute_materialization_plan(context, PLAN_TYPE_DELETE, &rows_processed);
res = execute_materialization_plan(context, plan_type, &rows_processed);

if (res < 0)
elog(ERROR,
"could not delete old values from materialization table \"%s.%s\"",
"could not %s old values %s materialization table \"%s.%s\"",
materialization->strings[0],
materialization->strings[2],
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));
else
elog(LOG,
"deleted " UINT64_FORMAT " row(s) from materialization table \"%s.%s\"",
SPI_processed,
"%s " UINT64_FORMAT " row(s) %s materialization table \"%s.%s\"",
materialization->strings[1],
rows_processed,
materialization->strings[2],
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));

return rows_processed;
}

static uint64
insert_materializations(MaterializationContext *context)
static void
execute_materializations(MaterializationContext *context)
{
int res;
uint64 rows_processed;
uint64 rows_processed = 0;

res = execute_materialization_plan(context, PLAN_TYPE_INSERT, &rows_processed);
PG_TRY();
{
/* MERGE statement is available starting with PG15, and we support it only in the new format
* of CAggs and for non-compressed hypertables */
if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
ContinuousAggIsFinalized(context->cagg) &&
!TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht))
{
create_materialization_plans(context, OPERATION_MERGE_DELETE);

if (res < 0)
elog(ERROR,
"could not materialize values into the materialization table \"%s.%s\"",
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));
else
elog(LOG,
"inserted " UINT64_FORMAT " row(s) into materialization table \"%s.%s\"",
SPI_processed,
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));
/* Fall back to INSERT materialization if there are no rows to change */
if (!exists_materializations(context))
{
elog(DEBUG2,
"no rows to merge on materialization table \"%s.%s\", falling back to INSERT",
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));
rows_processed = execute_materialization(context, PLAN_TYPE_INSERT);
}
else
{
rows_processed += execute_materialization(context, PLAN_TYPE_MERGE);
rows_processed += execute_materialization(context, PLAN_TYPE_MERGE_DELETE);
}
}
else
{
create_materialization_plans(context, OPERATION_INSERT_DELETE);
rows_processed += execute_materialization(context, PLAN_TYPE_DELETE);
rows_processed += execute_materialization(context, PLAN_TYPE_INSERT);
}
}
PG_FINALLY();
{
/* Make sure all cached plans in the session are released before rethrowing the error */
free_materialization_plans();
}
PG_END_TRY();

return rows_processed;
/* Get the max(time_dimension) of the materialized data */
if (rows_processed > 0)
{
update_watermark(context);
}
}
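
The PG_TRY()/PG_FINALLY()/PG_END_TRY() shape above guarantees that cached SPI plans are released whether the refresh succeeds or throws. A hedged sketch of that cleanup pattern in isolation — do_work() and release_plans() are hypothetical stand-ins for execute_materialization() and free_materialization_plans(), not functions from this commit:

#include "postgres.h"

static void
do_work(void)
{
	/* Hypothetical stand-in for the execute_materialization() calls */
	elog(LOG, "materializing");
}

static void
release_plans(void)
{
	/* Hypothetical stand-in for free_materialization_plans() */
	elog(LOG, "releasing cached SPI plans");
}

static void
run_with_cleanup(void)
{
	PG_TRY();
	{
		do_work();
	}
	PG_FINALLY();
	{
		/* PG_FINALLY (available since PG13) runs on both the normal and the
		 * error path, so plans cannot leak when do_work() throws */
		release_plans();
	}
	PG_END_TRY();
}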
