Fix CAgg migration performance regression
When creating the migration plan we need to figure out the boundaries
(min and max) of the primary dimension in order to split the COPY data
into multiple transactions, so that we don't exhaust the instance's
resources.

The problem is that we were reading the boundaries directly from the
CAgg view, and when real-time aggregation is enabled this leads to a
big performance penalty (this was one of the reasons for the new
format).
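
For context, a real-time CAgg view is roughly a `UNION ALL` of the
already-materialized rows and an on-the-fly aggregation of raw data above
the watermark, so even a simple min/max over the view can scan the raw
hypertable. A minimal sketch of that shape, with hypothetical names
(`conditions`, `conditions_summary`, `_materialized_hypertable_2`):

```sql
-- Sketch only: approximate shape of a real-time CAgg view (names made up).
CREATE VIEW conditions_summary AS
SELECT bucket, avg_temp
FROM _timescaledb_internal._materialized_hypertable_2       -- materialized rows
WHERE bucket < _timescaledb_functions.to_timestamp(
                   _timescaledb_functions.cagg_watermark(2))
UNION ALL
SELECT time_bucket('1 hour', "time") AS bucket,
       avg(temp) AS avg_temp                                -- computed at query time
FROM conditions
WHERE "time" >= _timescaledb_functions.to_timestamp(
                    _timescaledb_functions.cagg_watermark(2))
GROUP BY 1;
-- A min()/max() over this view also hits the raw-data branch, which is
-- why reading the COPY boundaries through the view was slow.
```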

Also, in the `COPY DATA` migration plan step we changed the batch size:
previously it was based on the materialization hypertable partition
range, and now it is the Continuous Aggregate bucket size multiplied by
10 (ten).
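
Concretely, the plan generation now steps through the time range in
windows of `bucket_width * 10`. A hedged sketch of that batching,
assuming a hypothetical 1-hour bucket and made-up table names:

```sql
-- Sketch: one COPY DATA step per window of (bucket width * 10).
SELECT start                             AS start_ts,
       start + ('1 hour'::interval * 10) AS end_ts
FROM (
    SELECT min(bucket) AS min, max(bucket) AS max
    FROM _timescaledb_internal._materialized_hypertable_2  -- hypothetical name
) AS boundaries,
LATERAL generate_series(min, max, '1 hour'::interval * 10) AS start;
```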

Fixed it by querying the materialization hypertable directly instead.
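
The boundary lookup before and after the fix, sketched with the same
hypothetical names:

```sql
-- Before (slow when real-time aggregation is enabled): the min/max go
-- through the user view, so the UNION ALL branch scans the raw hypertable.
SELECT min(bucket), max(bucket) FROM public.conditions_summary;

-- After: the min/max are read directly from the materialization hypertable.
SELECT min(bucket), max(bucket)
FROM _timescaledb_internal._materialized_hypertable_2;
```
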
fabriziomello committed Dec 6, 2024
1 parent 1a291aa commit b62c923
Showing 2 changed files with 359 additions and 144 deletions.
86 changes: 69 additions & 17 deletions sql/cagg_migrate.sql
@@ -83,6 +83,7 @@ DECLARE
_bucket_column_type TEXT;
_interval_type TEXT;
_interval_value TEXT;
_nbuckets INTEGER := 10; -- number of buckets per transaction
BEGIN
IF _timescaledb_functions.cagg_migrate_plan_exists(_cagg_data.mat_hypertable_id) IS TRUE THEN
RAISE EXCEPTION 'plan already exists for materialized hypertable %', _cagg_data.mat_hypertable_id;
@@ -118,8 +119,12 @@ BEGIN
AND hypertable_name = _matht.table_name
AND dimension_type = 'Time';

-- Get the current cagg bucket width
SELECT bucket_width
INTO _interval_value
FROM _timescaledb_functions.cagg_get_bucket_function_info(_cagg_data.mat_hypertable_id);

IF _integer_interval IS NOT NULL THEN
_interval_value := _integer_interval::TEXT;
_interval_type := _bucket_column_type;
IF _bucket_column_type = 'bigint' THEN
_watermark := COALESCE(_timescaledb_functions.cagg_watermark(_cagg_data.mat_hypertable_id)::bigint, '-9223372036854775808'::bigint)::TEXT;
@@ -129,7 +134,6 @@ BEGIN
_watermark := COALESCE(_timescaledb_functions.cagg_watermark(_cagg_data.mat_hypertable_id)::smallint, '-32768'::smallint)::TEXT;
END IF;
ELSE
_interval_value := _time_interval::TEXT;
_interval_type := 'interval';

-- We expect an ISO date later in parsing (i.e., min value has to be '4714-11-24 00:53:28+00:53:28 BC')
@@ -177,16 +181,17 @@ BEGIN
'COPY DATA',
jsonb_build_object (
'start_ts', start::text,
'end_ts', (start + CAST(%8$L AS %9$s))::text,
'end_ts', (start + (CAST(%8$L AS %9$s) * %10$s) )::text,
'bucket_column_name', bucket_column_name,
'bucket_column_type', bucket_column_type,
'cagg_name_new', cagg_name_new
)
FROM boundaries,
LATERAL generate_series(min, max, CAST(%8$L AS %9$s)) AS start;
LATERAL generate_series(min, max, (CAST(%8$L AS %9$s) * %10$s)) AS start;
$$,
_bucket_column_name, _bucket_column_type, _cagg_name_new, _cagg_data.user_view_schema,
_cagg_data.user_view_name, _watermark, _cagg_data.mat_hypertable_id, _interval_value, _interval_type
_bucket_column_name, _bucket_column_type, _cagg_name_new, _matht.schema_name,
_matht.table_name, _watermark, _cagg_data.mat_hypertable_id, _interval_value,
_interval_type, _nbuckets
);

EXECUTE _sql;
@@ -407,6 +412,11 @@ DECLARE
_stmt TEXT;
_mat_schema_name TEXT;
_mat_table_name TEXT;
_mat_schema_name_old TEXT;
_mat_table_name_old TEXT;
_query TEXT;
_select_columns TEXT;
_groupby_columns TEXT;
BEGIN
SELECT h.schema_name, h.table_name
INTO _mat_schema_name, _mat_table_name
@@ -415,17 +425,59 @@ BEGIN
WHERE user_view_schema = _cagg_data.user_view_schema
AND user_view_name = _plan_step.config->>'cagg_name_new';

_stmt := format(
'INSERT INTO %I.%I SELECT * FROM %I.%I WHERE %I >= %L AND %I < %L',
_mat_schema_name,
_mat_table_name,
_cagg_data.user_view_schema,
_cagg_data.user_view_name,
_plan_step.config->>'bucket_column_name',
_plan_step.config->>'start_ts',
_plan_step.config->>'bucket_column_name',
_plan_step.config->>'end_ts'
);
-- For realtime CAggs we need to read directly from the materialization hypertable
IF _cagg_data.materialized_only IS FALSE THEN
SELECT h.schema_name, h.table_name
INTO _mat_schema_name_old, _mat_table_name_old
FROM _timescaledb_catalog.continuous_agg ca
JOIN _timescaledb_catalog.hypertable h ON (h.id = ca.mat_hypertable_id)
WHERE user_view_schema = _cagg_data.user_view_schema
AND user_view_name = _cagg_data.user_view_name;

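-- Keep only the part of the view definition before the UNION ALL,
-- i.e. the query over the old materialization hypertable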
_query :=
split_part(
pg_get_viewdef(format('%I.%I', _cagg_data.user_view_schema, _cagg_data.user_view_name)),
'UNION ALL',
1);

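-- Everything after 'GROUP BY ' in that query is the grouping column list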
_groupby_columns :=
split_part(
_query,
'GROUP BY ',
2);

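-- Everything before the FROM clause is the SELECT (target) list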
_select_columns :=
split_part(
_query,
format('FROM %I.%I', _mat_schema_name_old, _mat_table_name_old),
1);

_stmt := format(
'INSERT INTO %I.%I %s FROM %I.%I WHERE %I >= %L AND %I < %L GROUP BY %s',
_mat_schema_name,
_mat_table_name,
_select_columns,
_mat_schema_name_old,
_mat_table_name_old,
_plan_step.config->>'bucket_column_name',
_plan_step.config->>'start_ts',
_plan_step.config->>'bucket_column_name',
_plan_step.config->>'end_ts',
_groupby_columns
);
ELSE
_stmt := format(
'INSERT INTO %I.%I SELECT * FROM %I.%I WHERE %I >= %L AND %I < %L',
_mat_schema_name,
_mat_table_name,
_mat_schema_name_old,
_mat_table_name_old,
_plan_step.config->>'bucket_column_name',
_plan_step.config->>'start_ts',
_plan_step.config->>'bucket_column_name',
_plan_step.config->>'end_ts'
);
END IF;

EXECUTE _stmt;
END;
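
For reference, these internal routines are driven by the user-facing
`cagg_migrate` procedure; a typical invocation against a hypothetical
CAgg would be:

```sql
-- Hypothetical CAgg name; override/drop_old flags left at their defaults.
CALL cagg_migrate('public.conditions_summary');
```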
