Fix CAgg migration performance regression
When creating the migration plan we need to figure out the boundaries
(min and max) of the primary dimension so that the `COPY DATA` step can be
split into multiple transactions without exhausting the instance's resources.

The problem is that we were reading the boundaries directly from the CAgg
view, and when realtime aggregation is enabled this leads to a big
performance penalty (this was one of the reasons for the new format).
Fixed it by querying the materialization hypertable directly instead.
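
As a sketch of the difference (with hypothetical names: a CAgg `conditions_summary` backed by the materialization hypertable `_timescaledb_internal._materialized_hypertable_2`):

    -- Before: the boundary query went through the CAgg view; with realtime
    -- enabled it also aggregates not-yet-materialized raw data on the fly.
    SELECT min(bucket), max(bucket) FROM public.conditions_summary;

    -- After: it reads only already-materialized data.
    SELECT min(bucket), max(bucket)
    FROM _timescaledb_internal._materialized_hypertable_2;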

Also, in the `COPY DATA` migration plan step we changed the batch size:
previously it was based on the materialization hypertable partition range,
and now it is the Continuous Aggregate bucket size multiplied by 10 (ten).
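
For illustration, assuming a hypothetical CAgg with a 1-hour bucket, the plan now generates `COPY DATA` batches roughly like this:

    -- Each batch spans 10 buckets (10 hours here) instead of one
    -- materialization hypertable partition range.
    SELECT start AS start_ts,
           start + (INTERVAL '1 hour' * 10) AS end_ts
    FROM generate_series('2024-01-01'::timestamptz,
                         '2024-01-03'::timestamptz,
                         INTERVAL '1 hour' * 10) AS start;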

Also, in ef2cfe3 we introduced a problem: we no longer explicitly update
the watermark, and instead warn the user to manually execute the refresh
procedure that updates it. But without updating the watermark, query
performance for a realtime Continuous Aggregate can be awful, so we
introduced a small procedure that updates the watermark of the newly
migrated CAgg during the migration.
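
Concretely, the refresh step of the migration now runs the equivalent of the following (see the new `cagg_migrate_update_watermark` procedure in the diff below):

    -- _mat_hypertable_id is the id of the new CAgg's materialization hypertable
    CALL _timescaledb_functions.cagg_migrate_update_watermark(_mat_hypertable_id);
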
fabriziomello committed Dec 11, 2024
1 parent c205899 commit 7035bac
Showing 5 changed files with 384 additions and 146 deletions.
109 changes: 91 additions & 18 deletions sql/cagg_migrate.sql
@@ -83,6 +83,7 @@ DECLARE
_bucket_column_type TEXT;
_interval_type TEXT;
_interval_value TEXT;
+ _nbuckets INTEGER := 10; -- number of buckets per transaction
BEGIN
IF _timescaledb_functions.cagg_migrate_plan_exists(_cagg_data.mat_hypertable_id) IS TRUE THEN
RAISE EXCEPTION 'plan already exists for materialized hypertable %', _cagg_data.mat_hypertable_id;
@@ -118,8 +119,12 @@ BEGIN
AND hypertable_name = _matht.table_name
AND dimension_type = 'Time';

+ -- Get the current cagg bucket width
+ SELECT bucket_width
+ INTO _interval_value
+ FROM _timescaledb_functions.cagg_get_bucket_function_info(_cagg_data.mat_hypertable_id);
+
IF _integer_interval IS NOT NULL THEN
- _interval_value := _integer_interval::TEXT;
_interval_type := _bucket_column_type;
IF _bucket_column_type = 'bigint' THEN
_watermark := COALESCE(_timescaledb_functions.cagg_watermark(_cagg_data.mat_hypertable_id)::bigint, '-9223372036854775808'::bigint)::TEXT;
@@ -129,7 +134,6 @@ BEGIN
_watermark := COALESCE(_timescaledb_functions.cagg_watermark(_cagg_data.mat_hypertable_id)::smallint, '-32768'::smallint)::TEXT;
END IF;
ELSE
- _interval_value := _time_interval::TEXT;
_interval_type := 'interval';

-- We expect an ISO date later in parsing (i.e., min value has to be '4714-11-24 00:53:28+00:53:28 BC')
@@ -177,16 +181,17 @@ BEGIN
'COPY DATA',
jsonb_build_object (
'start_ts', start::text,
- 'end_ts', (start + CAST(%8$L AS %9$s))::text,
+ 'end_ts', (start + (CAST(%8$L AS %9$s) * %10$s))::text,
'bucket_column_name', bucket_column_name,
'bucket_column_type', bucket_column_type,
'cagg_name_new', cagg_name_new
)
FROM boundaries,
- LATERAL generate_series(min, max, CAST(%8$L AS %9$s)) AS start;
+ LATERAL generate_series(min, max, (CAST(%8$L AS %9$s) * %10$s)) AS start;
$$,
- _bucket_column_name, _bucket_column_type, _cagg_name_new, _cagg_data.user_view_schema,
- _cagg_data.user_view_name, _watermark, _cagg_data.mat_hypertable_id, _interval_value, _interval_type
+ _bucket_column_name, _bucket_column_type, _cagg_name_new, _matht.schema_name,
+ _matht.table_name, _watermark, _cagg_data.mat_hypertable_id, _interval_value,
+ _interval_type, _nbuckets
);

EXECUTE _sql;
@@ -355,6 +360,14 @@ BEGIN
END;
$BODY$ SET search_path TO pg_catalog, pg_temp;

+ CREATE OR REPLACE PROCEDURE _timescaledb_functions.cagg_migrate_update_watermark(_mat_hypertable_id INTEGER)
+ LANGUAGE sql AS
+ $BODY$
+ INSERT INTO _timescaledb_catalog.continuous_aggs_watermark
+ VALUES (_mat_hypertable_id, _timescaledb_functions.cagg_watermark_materialized(_mat_hypertable_id))
+ ON CONFLICT (mat_hypertable_id) DO UPDATE SET watermark = excluded.watermark;
+ $BODY$ SECURITY DEFINER SET search_path TO pg_catalog, pg_temp;
+
-- Refresh new cagg created by the migration
CREATE OR REPLACE PROCEDURE _timescaledb_functions.cagg_migrate_execute_refresh_new_cagg (
_cagg_data _timescaledb_catalog.continuous_agg,
@@ -365,6 +378,7 @@ $BODY$
DECLARE
_cagg_name TEXT;
_override BOOLEAN;
+ _mat_hypertable_id INTEGER;
BEGIN
SELECT (config->>'override')::BOOLEAN
INTO _override
@@ -378,6 +392,18 @@ BEGIN
_cagg_name = _cagg_data.user_view_name;
END IF;

+ --
+ -- Update new cagg watermark
+ --
+ SELECT h.id
+ INTO _mat_hypertable_id
+ FROM _timescaledb_catalog.continuous_agg ca
+ JOIN _timescaledb_catalog.hypertable h ON (h.id = ca.mat_hypertable_id)
+ WHERE user_view_schema = _cagg_data.user_view_schema
+ AND user_view_name = _plan_step.config->>'cagg_name_new';
+
+ CALL _timescaledb_functions.cagg_migrate_update_watermark(_mat_hypertable_id);
+
--
-- Since we're still having problems with the `refresh_continuous_aggregate` executed inside procedures
-- and the issue isn't easy/trivial to fix we decided to skip this step here WARNING users to do it
@@ -407,6 +433,11 @@ DECLARE
_stmt TEXT;
_mat_schema_name TEXT;
_mat_table_name TEXT;
+ _mat_schema_name_old TEXT;
+ _mat_table_name_old TEXT;
+ _query TEXT;
+ _select_columns TEXT;
+ _groupby_columns TEXT;
BEGIN
SELECT h.schema_name, h.table_name
INTO _mat_schema_name, _mat_table_name
@@ -415,17 +446,59 @@ BEGIN
WHERE user_view_schema = _cagg_data.user_view_schema
AND user_view_name = _plan_step.config->>'cagg_name_new';

- _stmt := format(
- 'INSERT INTO %I.%I SELECT * FROM %I.%I WHERE %I >= %L AND %I < %L',
- _mat_schema_name,
- _mat_table_name,
- _cagg_data.user_view_schema,
- _cagg_data.user_view_name,
- _plan_step.config->>'bucket_column_name',
- _plan_step.config->>'start_ts',
- _plan_step.config->>'bucket_column_name',
- _plan_step.config->>'end_ts'
- );
+ -- For realtime CAggs we need to read directly from the materialization hypertable
+ IF _cagg_data.materialized_only IS FALSE THEN
+ SELECT h.schema_name, h.table_name
+ INTO _mat_schema_name_old, _mat_table_name_old
+ FROM _timescaledb_catalog.continuous_agg ca
+ JOIN _timescaledb_catalog.hypertable h ON (h.id = ca.mat_hypertable_id)
+ WHERE user_view_schema = _cagg_data.user_view_schema
+ AND user_view_name = _cagg_data.user_view_name;
+
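+ -- The definition of a realtime CAgg view is "<materialized data query>
+ -- UNION ALL <realtime query>", so keep only the first branch.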
+ _query :=
+ split_part(
+ pg_get_viewdef(format('%I.%I', _cagg_data.user_view_schema, _cagg_data.user_view_name)),
+ 'UNION ALL',
+ 1);
+
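+ -- Extract the SELECT list and GROUP BY columns of that branch so the
+ -- INSERT below can re-run it against the old materialization hypertable.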
+ _groupby_columns :=
+ split_part(
+ _query,
+ 'GROUP BY ',
+ 2);
+
+ _select_columns :=
+ split_part(
+ _query,
+ format('FROM %I.%I', _mat_schema_name_old, _mat_table_name_old),
+ 1);
+
+ _stmt := format(
+ 'INSERT INTO %I.%I %s FROM %I.%I WHERE %I >= %L AND %I < %L GROUP BY %s',
+ _mat_schema_name,
+ _mat_table_name,
+ _select_columns,
+ _mat_schema_name_old,
+ _mat_table_name_old,
+ _plan_step.config->>'bucket_column_name',
+ _plan_step.config->>'start_ts',
+ _plan_step.config->>'bucket_column_name',
+ _plan_step.config->>'end_ts',
+ _groupby_columns
+ );
+ ELSE
+ _stmt := format(
+ 'INSERT INTO %I.%I SELECT * FROM %I.%I WHERE %I >= %L AND %I < %L',
+ _mat_schema_name,
+ _mat_table_name,
+ _mat_schema_name_old,
+ _mat_table_name_old,
+ _plan_step.config->>'bucket_column_name',
+ _plan_step.config->>'start_ts',
+ _plan_step.config->>'bucket_column_name',
+ _plan_step.config->>'end_ts'
+ );
+ END IF;

EXECUTE _stmt;
END;
@@ -600,4 +673,4 @@ $BODY$;
-- Migrate a CAgg which is using the experimental time_bucket_ng function
-- into a CAgg using the regular time_bucket function
CREATE OR REPLACE PROCEDURE _timescaledb_functions.cagg_migrate_to_time_bucket(cagg REGCLASS)
- AS '@MODULE_PATHNAME@', 'ts_continuous_agg_migrate_to_time_bucket' LANGUAGE C;
\ No newline at end of file
+ AS '@MODULE_PATHNAME@', 'ts_continuous_agg_migrate_to_time_bucket' LANGUAGE C;
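
For context on the realtime branch above: the view of an old-format realtime CAgg is a `UNION ALL` of a query that finalizes the partials stored in the materialization hypertable and a realtime query over the raw hypertable, so `split_part(..., 'UNION ALL', 1)` yields the materialized branch, whose `SELECT` list and `GROUP BY` columns can be reused to copy finalized rows. A simplified sketch of such a view definition, with hypothetical names and literals standing in for the watermark expressions:

    SELECT bucket,
           _timescaledb_internal.finalize_agg('avg(double precision)'::text,
               NULL::name, NULL::name, '{{pg_catalog,float8}}'::name[],
               agg_2_2, NULL::double precision) AS avg_temp
      FROM _timescaledb_internal._materialized_hypertable_2
     WHERE bucket < '2024-06-01'::timestamptz  -- watermark
     GROUP BY bucket
    UNION ALL
    SELECT time_bucket('1 hour', "time") AS bucket,
           avg(temperature) AS avg_temp
      FROM public.conditions
     WHERE "time" >= '2024-06-01'::timestamptz  -- watermark
     GROUP BY 1;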
1 change: 1 addition & 0 deletions sql/updates/reverse-dev.sql
@@ -57,3 +57,4 @@ ALTER EXTENSION timescaledb DROP VIEW timescaledb_information.chunk_columnstore_
DROP VIEW timescaledb_information.hypertable_columnstore_settings;
DROP VIEW timescaledb_information.chunk_columnstore_settings;

+ DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_update_watermark(INTEGER);