Skip to content

Commit

Permalink
Refactor continuous aggregate materialization code
Browse files Browse the repository at this point in the history
In addition to the code reorganization and simplification we changed
it by splitting the query execution in multiple steps taking advantage
of `SPI_prepare`, `SPI_execute_plan` and `SPI_freeplan`:
* create_materialization_plan(PlanType)
* execute_materialization_plan(PlanType)
* free_materialization_plan(PlanType)

This PR is in preparation for a following PR to execute the
materialization in small batches to alleviate the I/O spikes when
reading and writing many buckets.
  • Loading branch information
fabriziomello committed Dec 3, 2024
1 parent 0eb1234 commit 4c47fdb
Showing 1 changed file with 181 additions and 181 deletions.
362 changes: 181 additions & 181 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ static void free_materialization_plans();

static void update_watermark(MaterializationContext *context);
static bool exists_materializations(MaterializationContext *context);
static void execute_materializations(MaterializationContext *context);
static uint64 execute_materialization(MaterializationContext *context,
MaterializationPlanType plan_type);
static void execute_materializations(MaterializationContext *context);

/* API to update materializations from refresh code */
void
Expand Down Expand Up @@ -233,6 +233,186 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c
AtEOXact_GUC(false, save_nestlevel);
}

static bool
ranges_overlap(InternalTimeRange invalidation_range, InternalTimeRange new_materialization_range)
{
Assert(invalidation_range.start <= invalidation_range.end);
Assert(new_materialization_range.start <= new_materialization_range.end);
return !(invalidation_range.end < new_materialization_range.start ||
new_materialization_range.end < invalidation_range.start);
}

static int64
range_length(const InternalTimeRange range)
{
Assert(range.end >= range.start);

return int64_saturating_sub(range.end, range.start);
}

static Datum
time_range_internal_to_min_time_value(Oid type)
{
switch (type)
{
case TIMESTAMPOID:
return TimestampGetDatum(DT_NOBEGIN);
case TIMESTAMPTZOID:
return TimestampTzGetDatum(DT_NOBEGIN);
case DATEOID:
return DateADTGetDatum(DATEVAL_NOBEGIN);
default:
return ts_internal_to_time_value(PG_INT64_MIN, type);
}
}

static Datum
time_range_internal_to_max_time_value(Oid type)
{
switch (type)
{
case TIMESTAMPOID:
return TimestampGetDatum(DT_NOEND);
case TIMESTAMPTZOID:
return TimestampTzGetDatum(DT_NOEND);
case DATEOID:
return DateADTGetDatum(DATEVAL_NOEND);
break;
default:
return ts_internal_to_time_value(PG_INT64_MAX, type);
}
}

static Datum
internal_to_time_value_or_infinite(int64 internal, Oid time_type, bool *is_infinite_out)
{
/* MIN and MAX can occur due to NULL thresholds, or due to a lack of invalidations. Since our
* regular conversion function errors in those cases, and we want to use those as markers for an
* open threshold in one direction, we special case this here*/
if (internal == PG_INT64_MIN)
{
if (is_infinite_out != NULL)
*is_infinite_out = true;
return time_range_internal_to_min_time_value(time_type);
}
else if (internal == PG_INT64_MAX)
{
if (is_infinite_out != NULL)
*is_infinite_out = true;
return time_range_internal_to_max_time_value(time_type);
}
else
{
if (is_infinite_out != NULL)
*is_infinite_out = false;
return ts_internal_to_time_value(internal, time_type);
}
}

static TimeRange
internal_time_range_to_time_range(InternalTimeRange internal)
{
TimeRange range;
range.type = internal.type;

range.start = internal_to_time_value_or_infinite(internal.start, internal.type, NULL);
range.end = internal_to_time_value_or_infinite(internal.end, internal.type, NULL);

return range;
}

static List *
cagg_find_aggref_and_var_cols(ContinuousAgg *cagg, Hypertable *mat_ht)
{
List *retlist = NIL;
ListCell *lc;
Query *cagg_view_query = ts_continuous_agg_get_query(cagg);

foreach (lc, cagg_view_query->targetList)
{
TargetEntry *tle = castNode(TargetEntry, lfirst(lc));

if (!tle->resjunk && (tle->ressortgroupref == 0 ||
get_sortgroupref_clause_noerr(tle->ressortgroupref,
cagg_view_query->groupClause) == NULL))
retlist = lappend(retlist, get_attname(mat_ht->main_table_relid, tle->resno, false));
}

return retlist;
}

static char *
build_merge_insert_columns(List *strings, const char *separator, const char *prefix)
{
StringInfo ret = makeStringInfo();

Assert(strings != NIL);

ListCell *lc;
foreach (lc, strings)
{
char *grpcol = (char *) lfirst(lc);
if (ret->len > 0)
appendStringInfoString(ret, separator);

if (prefix)
appendStringInfoString(ret, prefix);
appendStringInfoString(ret, quote_identifier(grpcol));
}

elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
}

static char *
build_merge_join_clause(List *column_names)
{
StringInfo ret = makeStringInfo();

Assert(column_names != NIL);

ListCell *lc;
foreach (lc, column_names)
{
char *column = (char *) lfirst(lc);

if (ret->len > 0)
appendStringInfoString(ret, " AND ");

appendStringInfoString(ret, "P.");
appendStringInfoString(ret, quote_identifier(column));
appendStringInfoString(ret, " = M.");
appendStringInfoString(ret, quote_identifier(column));
}

elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
}

static char *
build_merge_update_clause(List *column_names)
{
StringInfo ret = makeStringInfo();

Assert(column_names != NIL);

ListCell *lc;
foreach (lc, column_names)
{
char *column = (char *) lfirst(lc);

if (ret->len > 0)
appendStringInfoString(ret, ", ");

appendStringInfoString(ret, quote_identifier(column));
appendStringInfoString(ret, " = P.");
appendStringInfoString(ret, quote_identifier(column));
}

elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
}

static MaterializationPlan *
get_materialization_plan(MaterializationPlanType plan_type)
{
Expand Down Expand Up @@ -453,186 +633,6 @@ free_materialization_plans()
}
}

static bool
ranges_overlap(InternalTimeRange invalidation_range, InternalTimeRange new_materialization_range)
{
Assert(invalidation_range.start <= invalidation_range.end);
Assert(new_materialization_range.start <= new_materialization_range.end);
return !(invalidation_range.end < new_materialization_range.start ||
new_materialization_range.end < invalidation_range.start);
}

static int64
range_length(const InternalTimeRange range)
{
Assert(range.end >= range.start);

return int64_saturating_sub(range.end, range.start);
}

static Datum
time_range_internal_to_min_time_value(Oid type)
{
switch (type)
{
case TIMESTAMPOID:
return TimestampGetDatum(DT_NOBEGIN);
case TIMESTAMPTZOID:
return TimestampTzGetDatum(DT_NOBEGIN);
case DATEOID:
return DateADTGetDatum(DATEVAL_NOBEGIN);
default:
return ts_internal_to_time_value(PG_INT64_MIN, type);
}
}

static Datum
time_range_internal_to_max_time_value(Oid type)
{
switch (type)
{
case TIMESTAMPOID:
return TimestampGetDatum(DT_NOEND);
case TIMESTAMPTZOID:
return TimestampTzGetDatum(DT_NOEND);
case DATEOID:
return DateADTGetDatum(DATEVAL_NOEND);
break;
default:
return ts_internal_to_time_value(PG_INT64_MAX, type);
}
}

static Datum
internal_to_time_value_or_infinite(int64 internal, Oid time_type, bool *is_infinite_out)
{
/* MIN and MAX can occur due to NULL thresholds, or due to a lack of invalidations. Since our
* regular conversion function errors in those cases, and we want to use those as markers for an
* open threshold in one direction, we special case this here*/
if (internal == PG_INT64_MIN)
{
if (is_infinite_out != NULL)
*is_infinite_out = true;
return time_range_internal_to_min_time_value(time_type);
}
else if (internal == PG_INT64_MAX)
{
if (is_infinite_out != NULL)
*is_infinite_out = true;
return time_range_internal_to_max_time_value(time_type);
}
else
{
if (is_infinite_out != NULL)
*is_infinite_out = false;
return ts_internal_to_time_value(internal, time_type);
}
}

static TimeRange
internal_time_range_to_time_range(InternalTimeRange internal)
{
TimeRange range;
range.type = internal.type;

range.start = internal_to_time_value_or_infinite(internal.start, internal.type, NULL);
range.end = internal_to_time_value_or_infinite(internal.end, internal.type, NULL);

return range;
}

static List *
cagg_find_aggref_and_var_cols(ContinuousAgg *cagg, Hypertable *mat_ht)
{
List *retlist = NIL;
ListCell *lc;
Query *cagg_view_query = ts_continuous_agg_get_query(cagg);

foreach (lc, cagg_view_query->targetList)
{
TargetEntry *tle = castNode(TargetEntry, lfirst(lc));

if (!tle->resjunk && (tle->ressortgroupref == 0 ||
get_sortgroupref_clause_noerr(tle->ressortgroupref,
cagg_view_query->groupClause) == NULL))
retlist = lappend(retlist, get_attname(mat_ht->main_table_relid, tle->resno, false));
}

return retlist;
}

static char *
build_merge_insert_columns(List *strings, const char *separator, const char *prefix)
{
StringInfo ret = makeStringInfo();

Assert(strings != NIL);

ListCell *lc;
foreach (lc, strings)
{
char *grpcol = (char *) lfirst(lc);
if (ret->len > 0)
appendStringInfoString(ret, separator);

if (prefix)
appendStringInfoString(ret, prefix);
appendStringInfoString(ret, quote_identifier(grpcol));
}

elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
}

static char *
build_merge_join_clause(List *column_names)
{
StringInfo ret = makeStringInfo();

Assert(column_names != NIL);

ListCell *lc;
foreach (lc, column_names)
{
char *column = (char *) lfirst(lc);

if (ret->len > 0)
appendStringInfoString(ret, " AND ");

appendStringInfoString(ret, "P.");
appendStringInfoString(ret, quote_identifier(column));
appendStringInfoString(ret, " = M.");
appendStringInfoString(ret, quote_identifier(column));
}

elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
}

static char *
build_merge_update_clause(List *column_names)
{
StringInfo ret = makeStringInfo();

Assert(column_names != NIL);

ListCell *lc;
foreach (lc, column_names)
{
char *column = (char *) lfirst(lc);

if (ret->len > 0)
appendStringInfoString(ret, ", ");

appendStringInfoString(ret, quote_identifier(column));
appendStringInfoString(ret, " = P.");
appendStringInfoString(ret, quote_identifier(column));
}

elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
}

static void
update_watermark(MaterializationContext *context)
{
Expand Down

0 comments on commit 4c47fdb

Please sign in to comment.