From c501c66cbe8991e36c471fdce67b1edfdaa9ea3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= Date: Tue, 10 Oct 2023 16:06:29 -0300 Subject: [PATCH] PG16: Fix multinode deparsing issues With the changes introduced in PG16 by moving the permission checking information out of the range table entries to a new data struct named `RTEPermissionInfo` we can't use the rte->updatedcols anymore for the target_attrs when deparsing, so we need to build new target_attrs based on the `get_rel_all_updated_cols` and for our multinode implementation we need to get rid the generated always attributes to don't risk to build the parameters in `stmt_params_create` using this column. Postgres FDW also have it own logic to skip generated columns as well. --- tsl/src/fdw/deparse.c | 11 +- tsl/src/fdw/modify_plan.c | 115 +-- ...gw_dist_ht.out => cagg_bgw_dist_ht-13.out} | 0 tsl/test/expected/cagg_bgw_dist_ht-14.out | 744 ++++++++++++++++++ tsl/test/expected/cagg_bgw_dist_ht-15.out | 744 ++++++++++++++++++ tsl/test/expected/cagg_bgw_dist_ht-16.out | 744 ++++++++++++++++++ tsl/test/expected/dist_hypertable-16.out | 5 +- tsl/test/sql/.gitignore | 1 + tsl/test/sql/CMakeLists.txt | 2 +- ...gw_dist_ht.sql => cagg_bgw_dist_ht.sql.in} | 0 10 files changed, 2305 insertions(+), 61 deletions(-) rename tsl/test/expected/{cagg_bgw_dist_ht.out => cagg_bgw_dist_ht-13.out} (100%) create mode 100644 tsl/test/expected/cagg_bgw_dist_ht-14.out create mode 100644 tsl/test/expected/cagg_bgw_dist_ht-15.out create mode 100644 tsl/test/expected/cagg_bgw_dist_ht-16.out rename tsl/test/sql/{cagg_bgw_dist_ht.sql => cagg_bgw_dist_ht.sql.in} (100%) diff --git a/tsl/src/fdw/deparse.c b/tsl/src/fdw/deparse.c index 4c0623093b8..679745c9b6c 100644 --- a/tsl/src/fdw/deparse.c +++ b/tsl/src/fdw/deparse.c @@ -1956,6 +1956,7 @@ void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs) { + TupleDesc tupdesc = RelationGetDescr(rel); AttrNumber pindex; bool first; ListCell *lc; @@ -1969,14 +1970,20 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel foreach (lc, targetAttrs) { int attnum = lfirst_int(lc); + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); if (!first) appendStringInfoString(buf, ", "); first = false; deparseColumnRef(buf, rtindex, attnum, rte, false); - appendStringInfo(buf, " = $%d", pindex); - pindex++; + if (attr->attgenerated) + appendStringInfoString(buf, " = DEFAULT"); + else + { + appendStringInfo(buf, " = $%d", pindex); + pindex++; + } } appendStringInfoString(buf, " WHERE ctid = $1"); diff --git a/tsl/src/fdw/modify_plan.c b/tsl/src/fdw/modify_plan.c index 5a9dc8fafd2..ef9e8a76d9c 100644 --- a/tsl/src/fdw/modify_plan.c +++ b/tsl/src/fdw/modify_plan.c @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include #include "deparse.h" @@ -15,44 +17,6 @@ #include "modify_plan.h" #include "ts_catalog/chunk_data_node.h" -static List * -get_insert_attrs(Relation rel) -{ - TupleDesc tupdesc = RelationGetDescr(rel); - List *attrs = NIL; - int i; - - for (i = 0; i < tupdesc->natts; i++) - { - Form_pg_attribute attr = TupleDescAttr(tupdesc, i); - - if (!attr->attisdropped) - attrs = lappend_int(attrs, AttrOffsetGetAttrNumber(i)); - } - - return attrs; -} - -static List * -get_update_attrs(Bitmapset *updatedCols) -{ - List *attrs = NIL; - int col = -1; - - while ((col = bms_next_member(updatedCols, col)) >= 0) - { - /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */ - AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; - - if (attno <= InvalidAttrNumber) /* shouldn't happen */ - elog(ERROR, "system-column update is not supported"); - - attrs = lappend_int(attrs, attno); - } - - return attrs; -} - /* get a list of "live" DNs associated with this chunk */ List * get_chunk_data_nodes(Oid relid) @@ -144,6 +108,63 @@ fdw_plan_foreign_modify(PlannerInfo *root, ModifyTable *plan, Index result_relat if (plan->returningLists) returning_list = (List *) list_nth(plan->returningLists, subplan_index); + /* + * Core code already has some lock on each rel being planned, so we can + * use NoLock here. + */ + rel = table_open(rte->relid, NoLock); + TupleDesc tupdesc = RelationGetDescr(rel); + + /* + * In an INSERT, we transmit all columns that are defined in the foreign + * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the + * foreign table, we transmit all columns like INSERT; else we transmit + * only columns that were explicitly targets of the UPDATE, so as to avoid + * unnecessary data transmission. (We can't do that for INSERT since we + * would miss sending default values for columns not listed in the source + * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since + * those triggers might change values for non-target columns, in which + * case we would miss sending changed values for those columns.) + */ + if (operation == CMD_INSERT || + (operation == CMD_UPDATE && + rel->trigdesc && + rel->trigdesc->trig_update_before_row)) + { + int attnum; + + for (attnum = 1; attnum <= tupdesc->natts; attnum++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (!attr->attisdropped) + target_attrs = lappend_int(target_attrs, attnum); + } + } + else if (operation == CMD_UPDATE) + { + int col; + RelOptInfo *rel = find_base_rel(root, result_relation); + Bitmapset *allUpdatedCols = get_rel_all_updated_cols(root, rel); + + col = -1; + while ((col = bms_next_member(allUpdatedCols, col)) >= 0) + { + /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */ + AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; + Form_pg_attribute attr = TupleDescAttr(tupdesc, attno - 1); + + if (attno <= InvalidAttrNumber) /* shouldn't happen */ + elog(ERROR, "system-column update is not supported"); + + /* Ignore generated columns */ + if (attr->attgenerated) + continue; + + target_attrs = lappend_int(target_attrs, attno); + } + } + /* * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification * should have already been rejected in the optimizer, as presently there @@ -158,12 +179,6 @@ fdw_plan_foreign_modify(PlannerInfo *root, ModifyTable *plan, Index result_relat errmsg("ON CONFLICT DO UPDATE not supported" " on distributed hypertables"))); - /* - * Core code already has some lock on each rel being planned, so we can - * use NoLock here. - */ - rel = table_open(rte->relid, NoLock); - /* * Construct the SQL command string * @@ -176,7 +191,6 @@ fdw_plan_foreign_modify(PlannerInfo *root, ModifyTable *plan, Index result_relat switch (operation) { case CMD_INSERT: - target_attrs = get_insert_attrs(rel); deparseInsertSql(&sql, rte, result_relation, @@ -189,17 +203,6 @@ fdw_plan_foreign_modify(PlannerInfo *root, ModifyTable *plan, Index result_relat break; case CMD_UPDATE: { -#if PG16_LT - Bitmapset *updatedCols = rte->updatedCols; -#else - Bitmapset *updatedCols = NULL; - if (rte->perminfoindex > 0) - { - RTEPermissionInfo *perminfo = getRTEPermissionInfo(root->parse->rteperminfos, rte); - updatedCols = perminfo->updatedCols; - } -#endif - target_attrs = get_update_attrs(updatedCols); deparseUpdateSql(&sql, rte, result_relation, diff --git a/tsl/test/expected/cagg_bgw_dist_ht.out b/tsl/test/expected/cagg_bgw_dist_ht-13.out similarity index 100% rename from tsl/test/expected/cagg_bgw_dist_ht.out rename to tsl/test/expected/cagg_bgw_dist_ht-13.out diff --git a/tsl/test/expected/cagg_bgw_dist_ht-14.out b/tsl/test/expected/cagg_bgw_dist_ht-14.out new file mode 100644 index 00000000000..e7ca0d9ee6c --- /dev/null +++ b/tsl/test/expected/cagg_bgw_dist_ht-14.out @@ -0,0 +1,744 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +------------------------------------ +-- Set up a distributed environment +------------------------------------ +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +\set DATA_NODE_1 :TEST_DBNAME _1 +\set DATA_NODE_2 :TEST_DBNAME _2 +\set DATA_NODE_3 :TEST_DBNAME _3 +\ir include/remote_exec.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +CREATE SCHEMA IF NOT EXISTS test; +psql:include/remote_exec.sql:5: NOTICE: schema "test" already exists, skipping +GRANT USAGE ON SCHEMA test TO PUBLIC; +CREATE OR REPLACE FUNCTION test.remote_exec(srv_name name[], command text) +RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec' +LANGUAGE C; +CREATE OR REPLACE FUNCTION test.remote_exec_get_result_strings(srv_name name[], command text) +RETURNS TABLE("table_record" CSTRING[]) +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec_get_result_strings' +LANGUAGE C; +SELECT node_name, database, node_created, database_created, extension_created +FROM ( + SELECT (add_data_node(name, host => 'localhost', DATABASE => name)).* + FROM (VALUES (:'DATA_NODE_1'), (:'DATA_NODE_2'), (:'DATA_NODE_3')) v(name) +) a; + node_name | database | node_created | database_created | extension_created +-----------------------+-----------------------+--------------+------------------+------------------- + db_cagg_bgw_dist_ht_1 | db_cagg_bgw_dist_ht_1 | t | t | t + db_cagg_bgw_dist_ht_2 | db_cagg_bgw_dist_ht_2 | t | t | t + db_cagg_bgw_dist_ht_3 | db_cagg_bgw_dist_ht_3 | t | t | t +(3 rows) + +GRANT USAGE ON FOREIGN SERVER :DATA_NODE_1, :DATA_NODE_2, :DATA_NODE_3 TO PUBLIC; +-- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes +GRANT CREATE ON SCHEMA public TO :ROLE_DEFAULT_PERM_USER; +\set IS_DISTRIBUTED TRUE +\ir include/cagg_bgw_common.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- +-- Setup +-- +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_wait_for_scheduler_finish() RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_params_create() RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_params_destroy() RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_params_reset_time(set_time BIGINT = 0, wait BOOLEAN = false) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +--test that this all works under the community license +ALTER DATABASE :TEST_DBNAME SET timescaledb.license_key='Community'; +--create a function with no permissions to execute +CREATE FUNCTION get_constant_no_perms() RETURNS INTEGER LANGUAGE SQL IMMUTABLE AS +$BODY$ + SELECT 10; +$BODY$; +REVOKE EXECUTE ON FUNCTION get_constant_no_perms() FROM PUBLIC; +\set WAIT_ON_JOB 0 +\set IMMEDIATELY_SET_UNTIL 1 +\set WAIT_FOR_OTHER_TO_ADVANCE 2 +CREATE OR REPLACE FUNCTION ts_bgw_params_mock_wait_returns_immediately(new_val INTEGER) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +-- Remove any default jobs, e.g., telemetry +DELETE FROM _timescaledb_config.bgw_job WHERE TRUE; +TRUNCATE _timescaledb_internal.bgw_job_stat; +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +CREATE TABLE public.bgw_log( + msg_no INT, + mock_time BIGINT, + application_name TEXT, + msg TEXT +); +CREATE VIEW sorted_bgw_log AS + SELECT msg_no, + mock_time, + application_name, + regexp_replace(regexp_replace(msg, '(Wait until|started at|execution time) [0-9]+(\.[0-9]+)?', '\1 (RANDOM)', 'g'), 'background worker "[^"]+"','connection') AS msg + FROM bgw_log ORDER BY mock_time, application_name COLLATE "C", msg_no; +CREATE TABLE public.bgw_dsm_handle_store( + handle BIGINT +); +INSERT INTO public.bgw_dsm_handle_store VALUES (0); +SELECT ts_bgw_params_create(); + ts_bgw_params_create +---------------------- + +(1 row) + +SELECT * FROM _timescaledb_config.bgw_job; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone +----+------------------+-------------------+-------------+-------------+--------------+-------------+-----------+-------+-----------+----------------+---------------+---------------+--------+--------------+------------+---------- +(0 rows) + +SELECT * FROM timescaledb_information.job_stats; + hypertable_schema | hypertable_name | job_id | last_run_started_at | last_successful_finish | last_run_status | job_status | last_run_duration | next_start | total_runs | total_successes | total_failures +-------------------+-----------------+--------+---------------------+------------------------+-----------------+------------+-------------------+------------+------------+-----------------+---------------- +(0 rows) + +SELECT * FROM _timescaledb_catalog.continuous_agg; + mat_hypertable_id | raw_hypertable_id | parent_mat_hypertable_id | user_view_schema | user_view_name | partial_view_schema | partial_view_name | bucket_width | direct_view_schema | direct_view_name | materialized_only | finalized +-------------------+-------------------+--------------------------+------------------+----------------+---------------------+-------------------+--------------+--------------------+------------------+-------------------+----------- +(0 rows) + +-- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes +GRANT CREATE ON SCHEMA public TO :ROLE_DEFAULT_PERM_USER; +psql:include/cagg_bgw_common.sql:76: WARNING: no privileges were granted for "public" +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +CREATE TABLE test_continuous_agg_table(time int, data int); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('test_continuous_agg_table', 'time', chunk_time_interval => 10, replication_factor => 2); +psql:include/cagg_bgw_common.sql:80: NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +---------------------------------------- + (1,public,test_continuous_agg_table,t) +(1 row) + +\else +SELECT create_hypertable('test_continuous_agg_table', 'time', chunk_time_interval => 10); +\endif +CREATE OR REPLACE FUNCTION integer_now_test() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table $$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION integer_now_test() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table $$; +$DIST$); +\endif +SELECT set_integer_now_func('test_continuous_agg_table', 'integer_now_test'); + set_integer_now_func +---------------------- + +(1 row) + +CREATE MATERIALIZED VIEW test_continuous_agg_view + WITH (timescaledb.continuous, timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value + FROM test_continuous_agg_table + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, 4::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1000 +(1 row) + +SELECT id as raw_table_id FROM _timescaledb_catalog.hypertable WHERE table_name='test_continuous_agg_table' \gset +-- min distance from end should be 1 +SELECT mat_hypertable_id, user_view_schema, user_view_name, bucket_width +FROM _timescaledb_catalog.continuous_agg; + mat_hypertable_id | user_view_schema | user_view_name | bucket_width +-------------------+------------------+--------------------------+-------------- + 2 | public | test_continuous_agg_view | 2 +(1 row) + +SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset +SELECT id AS job_id FROM _timescaledb_config.bgw_job where hypertable_id=:mat_hypertable_id \gset +-- job was created +SELECT * FROM _timescaledb_config.bgw_job where hypertable_id=:mat_hypertable_id; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone +------+--------------------------------------------+-------------------+-------------+-------------+--------------+------------------------+-------------------------------------+-------------------+-----------+----------------+---------------+---------------+-----------------------------------------------------------------+------------------------+-------------------------------------------+---------- + 1000 | Refresh Continuous Aggregate Policy [1000] | @ 12 hours | @ 0 | -1 | @ 12 hours | _timescaledb_functions | policy_refresh_continuous_aggregate | default_perm_user | t | f | | 2 | {"end_offset": 4, "start_offset": null, "mat_hypertable_id": 2} | _timescaledb_functions | policy_refresh_continuous_aggregate_check | +(1 row) + +-- create 10 time buckets +INSERT INTO test_continuous_agg_table + SELECT i, i FROM + (SELECT generate_series(0, 10) as i) AS j; +-- no stats +SELECT job_id, next_start, last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + ORDER BY job_id; + job_id | next_start | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------+------------------+------------+-----------------+----------------+--------------- +(0 rows) + +-- no data in view +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- +(0 rows) + +-- run first time +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-----------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1000] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -2147483648, 6 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2" + 2 | 0 | Refresh Continuous Aggregate Policy [1000] | inserted 3 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2" + 3 | 0 | Refresh Continuous Aggregate Policy [1000] | job 1000 (Refresh Continuous Aggregate Policy [1000]) exiting with success: execution time (RANDOM) ms +(6 rows) + +SELECT * FROM _timescaledb_config.bgw_job where id=:job_id; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone +------+--------------------------------------------+-------------------+-------------+-------------+--------------+------------------------+-------------------------------------+-------------------+-----------+----------------+---------------+---------------+-----------------------------------------------------------------+------------------------+-------------------------------------------+---------- + 1000 | Refresh Continuous Aggregate Policy [1000] | @ 12 hours | @ 0 | -1 | @ 12 hours | _timescaledb_functions | policy_refresh_continuous_aggregate | default_perm_user | t | f | | 2 | {"end_offset": 4, "start_offset": null, "mat_hypertable_id": 2} | _timescaledb_functions | policy_refresh_continuous_aggregate_check | +(1 row) + +-- job ran once, successfully +SELECT job_id, next_start-last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------------+------------+-----------------+----------------+--------------- + 1000 | @ 12 hours | t | 1 | 1 | 0 | 0 +(1 row) + +--clear log for next run of scheduler. +TRUNCATE public.bgw_log; +CREATE FUNCTION wait_for_timer_to_run(started_at INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + num_runs INTEGER; + message TEXT; +BEGIN + select format('[TESTING] Wait until %%, started at %s', started_at) into message; + FOR i in 1..spins + LOOP + SELECT COUNT(*) from bgw_log where msg LIKE message INTO num_runs; + if (num_runs > 0) THEN + RETURN true; + ELSE + PERFORM pg_sleep(0.1); + END IF; + END LOOP; + RETURN false; +END +$BODY$; +CREATE FUNCTION wait_for_job_to_run(job_param_id INTEGER, expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + num_runs INTEGER; +BEGIN + FOR i in 1..spins + LOOP + SELECT total_successes FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO num_runs; + if (num_runs = expected_runs) THEN + RETURN true; + ELSEIF (num_runs > expected_runs) THEN + RAISE 'num_runs > expected'; + ELSE + PERFORM pg_sleep(0.1); + END IF; + END LOOP; + RETURN false; +END +$BODY$; +--make sure there is 1 job to start with +SELECT wait_for_job_to_run(:job_id, 1); + wait_for_job_to_run +--------------------- + t +(1 row) + +SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_FOR_OTHER_TO_ADVANCE); + ts_bgw_params_mock_wait_returns_immediately +--------------------------------------------- + +(1 row) + +--start the scheduler on 0 time +SELECT ts_bgw_params_reset_time(0, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT ts_bgw_db_scheduler_test_run(extract(epoch from interval '24 hour')::int * 1000, 0); + ts_bgw_db_scheduler_test_run +------------------------------ + +(1 row) + +SELECT wait_for_timer_to_run(0); + wait_for_timer_to_run +----------------------- + t +(1 row) + +--advance to 12:00 so that it runs one more time; now we know the +--scheduler has loaded up the job with the old schedule_interval +SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT wait_for_job_to_run(:job_id, 2); + wait_for_job_to_run +--------------------- + t +(1 row) + +--advance clock 1us to make the scheduler realize the job is done +SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+1, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +--alter the refresh interval and check if next_start is altered +SELECT alter_job(:job_id, schedule_interval => '1m', retry_period => '1m'); + alter_job +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + (1000,"@ 1 min","@ 0",-1,"@ 1 min",t,"{""end_offset"": 4, ""start_offset"": null, ""mat_hypertable_id"": 2}","Sat Jan 01 04:01:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,) +(1 row) + +SELECT job_id, next_start - last_finish as until_next, total_runs +FROM _timescaledb_internal.bgw_job_stat +WHERE job_id=:job_id;; + job_id | until_next | total_runs +--------+------------+------------ + 1000 | @ 1 min | 2 +(1 row) + +--advance to 12:02, job should have run at 12:01 +SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+(extract(epoch from interval '2 minute')::bigint * 1000000), true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT wait_for_job_to_run(:job_id, 3); + wait_for_job_to_run +--------------------- + t +(1 row) + +--next run in 1 minute +SELECT job_id, next_start-last_finish as until_next, total_runs +FROM _timescaledb_internal.bgw_job_stat +WHERE job_id=:job_id; + job_id | until_next | total_runs +--------+------------+------------ + 1000 | @ 1 min | 3 +(1 row) + +--change next run to be after 30s instead +SELECT (next_start - '30s'::interval) AS "NEW_NEXT_START" +FROM _timescaledb_internal.bgw_job_stat +WHERE job_id=:job_id \gset +SELECT alter_job(:job_id, next_start => :'NEW_NEXT_START'); + alter_job +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + (1000,"@ 1 min","@ 0",-1,"@ 1 min",t,"{""end_offset"": 4, ""start_offset"": null, ""mat_hypertable_id"": 2}","Sat Jan 01 04:02:30 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,) +(1 row) + +SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+(extract(epoch from interval '2 minute 30 seconds')::bigint * 1000000), true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT wait_for_job_to_run(:job_id, 4); + wait_for_job_to_run +--------------------- + t +(1 row) + +--advance clock to quit scheduler +SELECT ts_bgw_params_reset_time(extract(epoch from interval '25 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +select ts_bgw_db_scheduler_test_wait_for_scheduler_finish(); + ts_bgw_db_scheduler_test_wait_for_scheduler_finish +---------------------------------------------------- + +(1 row) + +SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_ON_JOB); + ts_bgw_params_mock_wait_returns_immediately +--------------------------------------------- + +(1 row) + +TRUNCATE public.bgw_log; +-- data before 8 +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 + 2 | 5 + 4 | 9 +(3 rows) + +-- invalidations test by running job multiple times +SELECT ts_bgw_params_reset_time(); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +DROP MATERIALIZED VIEW test_continuous_agg_view; +psql:include/cagg_bgw_common.sql:234: NOTICE: drop cascades to table _timescaledb_internal._hyper_2_3_chunk +CREATE MATERIALIZED VIEW test_continuous_agg_view + WITH (timescaledb.continuous, + timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value + FROM test_continuous_agg_table + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1001 +(1 row) + +SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset +SELECT id AS job_id FROM _timescaledb_config.bgw_job WHERE hypertable_id=:mat_hypertable_id \gset +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-----------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3" + 2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3" + 3 | 0 | Refresh Continuous Aggregate Policy [1001] | job 1001 (Refresh Continuous Aggregate Policy [1001]) exiting with success: execution time (RANDOM) ms +(6 rows) + +-- job ran once, successfully +SELECT job_id, last_finish - next_start as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+----------------+------------------+------------+-----------------+----------------+--------------- + 1001 | @ 12 hours ago | t | 1 | 1 | 0 | 0 +(1 row) + +-- should have refreshed everything we have so far +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 + 2 | 5 + 4 | 9 + 6 | 13 + 8 | 17 + 10 | 10 +(6 rows) + +-- invalidate some data +UPDATE test_continuous_agg_table +SET data = 11 WHERE time = 6; +--advance time by 12h so that job runs one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3" + 2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3" + 3 | 0 | Refresh Continuous Aggregate Policy [1001] | job 1001 (Refresh Continuous Aggregate Policy [1001]) exiting with success: execution time (RANDOM) ms + 0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker + 1 | 43200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] + 1 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3" + 2 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3" + 3 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | job 1001 (Refresh Continuous Aggregate Policy [1001]) exiting with success: execution time (RANDOM) ms +(12 rows) + +SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------------+------------+-----------------+----------------+--------------- + 1001 | @ 12 hours | t | 2 | 2 | 0 | 0 +(1 row) + +-- should have updated data for time=6 +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 + 2 | 5 + 4 | 9 + 6 | 18 + 8 | 17 + 10 | 10 +(6 rows) + +\x on +--check the information views -- +select view_name, view_owner, materialization_hypertable_schema, materialization_hypertable_name +from timescaledb_information.continuous_aggregates +where view_name::text like '%test_continuous_agg_view'; +-[ RECORD 1 ]---------------------+--------------------------- +view_name | test_continuous_agg_view +view_owner | default_perm_user +materialization_hypertable_schema | _timescaledb_internal +materialization_hypertable_name | _materialized_hypertable_3 + +select view_name, view_definition from timescaledb_information.continuous_aggregates +where view_name::text like '%test_continuous_agg_view'; +-[ RECORD 1 ]---+------------------------------------------------------------------------- +view_name | test_continuous_agg_view +view_definition | SELECT time_bucket(2, test_continuous_agg_table."time") AS time_bucket,+ + | sum(test_continuous_agg_table.data) AS value + + | FROM test_continuous_agg_table + + | GROUP BY (time_bucket(2, test_continuous_agg_table."time")); + +select job_status, last_run_duration +from timescaledb_information.job_stats ps, timescaledb_information.continuous_aggregates cagg +where cagg.view_name::text like '%test_continuous_agg_view' +and cagg.materialization_hypertable_name = ps.hypertable_name; +-[ RECORD 1 ]-----+---------- +job_status | Scheduled +last_run_duration | + +\x off +DROP MATERIALIZED VIEW test_continuous_agg_view; +psql:include/cagg_bgw_common.sql:294: NOTICE: drop cascades to table _timescaledb_internal._hyper_3_4_chunk +--create a view with a function that it has no permission to execute +CREATE MATERIALIZED VIEW test_continuous_agg_view + WITH (timescaledb.continuous, + timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value, get_constant_no_perms() + FROM test_continuous_agg_table + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1002 +(1 row) + +SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +-- job fails +SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------------+------------+-----------------+----------------+--------------- + 1002 | f | 1 | 0 | 1 | 0 +(1 row) + +DROP MATERIALIZED VIEW test_continuous_agg_view; +--advance clock to quit scheduler +SELECT ts_bgw_params_reset_time(extract(epoch from interval '25 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +select ts_bgw_db_scheduler_test_wait_for_scheduler_finish(); + ts_bgw_db_scheduler_test_wait_for_scheduler_finish +---------------------------------------------------- + +(1 row) + +SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_ON_JOB); + ts_bgw_params_mock_wait_returns_immediately +--------------------------------------------- + +(1 row) + +--clear log for next run of the scheduler +TRUNCATE public.bgw_log; +SELECT ts_bgw_params_reset_time(); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +-- +-- Test creating continuous aggregate with a user that is the non-owner of the raw table +-- +CREATE TABLE test_continuous_agg_table_w_grant(time int, data int); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('test_continuous_agg_table_w_grant', 'time', chunk_time_interval => 10, replication_factor => 2); +psql:include/cagg_bgw_common.sql:330: NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------------------------ + (5,public,test_continuous_agg_table_w_grant,t) +(1 row) + +\else +SELECT create_hypertable('test_continuous_agg_table_w_grant', 'time', chunk_time_interval => 10); +\endif +CREATE OR REPLACE FUNCTION integer_now_test1() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table_w_grant $$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION integer_now_test1() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table_w_grant $$; +$DIST$); +\endif +SELECT set_integer_now_func('test_continuous_agg_table_w_grant', 'integer_now_test1'); + set_integer_now_func +---------------------- + +(1 row) + +GRANT SELECT, TRIGGER ON test_continuous_agg_table_w_grant TO public; +INSERT INTO test_continuous_agg_table_w_grant + SELECT 1 , 1; +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 +-- make sure view can be created +CREATE MATERIALIZED VIEW test_continuous_agg_view_user_2 + WITH ( timescaledb.continuous, + timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value + FROM test_continuous_agg_table_w_grant + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view_user_2', NULL, -2::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1003 +(1 row) + +SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT id, owner FROM _timescaledb_config.bgw_job WHERE id = :job_id ; + id | owner +------+--------------------- + 1003 | default_perm_user_2 +(1 row) + +SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------------+------------+-----------------+----------------+--------------- + 1003 | @ 12 hours | t | 1 | 1 | 0 | 0 +(1 row) + +--view is populated +SELECT * FROM test_continuous_agg_view_user_2 ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 +(1 row) + +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +--revoke permissions from the continuous agg view owner to select from raw table +--no further updates to cont agg should happen +REVOKE SELECT ON test_continuous_agg_table_w_grant FROM public; +--add new data to table +INSERT INTO test_continuous_agg_table_w_grant VALUES(5,1); +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 +--advance time by 12h so that job tries to run one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +--should show a failing execution because no longer has permissions (due to lack of permission on partial view owner's part) +SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------------+------------+-----------------+----------------+--------------- + 1003 | f | 2 | 1 | 1 | 0 +(1 row) + +--view was NOT updated; but the old stuff is still there +SELECT * FROM test_continuous_agg_view_user_2; + time_bucket | value +-------------+------- + 0 | 1 +(1 row) + +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +SELECT * from sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1003] | refreshing continuous aggregate "test_continuous_agg_view_user_2" in window [ -2147483648, 2 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1003] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_6" + 2 | 0 | Refresh Continuous Aggregate Policy [1003] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_6" + 3 | 0 | Refresh Continuous Aggregate Policy [1003] | job 1003 (Refresh Continuous Aggregate Policy [1003]) exiting with success: execution time (RANDOM) ms + 0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker + 1 | 43200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | job 1003 threw an error + 1 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | permission denied for table test_continuous_agg_table_w_grant +(10 rows) + +-- Count the number of continuous aggregate policies +SELECT count(*) FROM _timescaledb_config.bgw_job + WHERE proc_schema = '_timescaledb_functions' + AND proc_name = 'policy_refresh_continuous_aggregate'; + count +------- + 1 +(1 row) + +-- cleanup +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +DROP DATABASE :DATA_NODE_1 WITH (FORCE); +DROP DATABASE :DATA_NODE_2 WITH (FORCE); +DROP DATABASE :DATA_NODE_3 WITH (FORCE); diff --git a/tsl/test/expected/cagg_bgw_dist_ht-15.out b/tsl/test/expected/cagg_bgw_dist_ht-15.out new file mode 100644 index 00000000000..e7ca0d9ee6c --- /dev/null +++ b/tsl/test/expected/cagg_bgw_dist_ht-15.out @@ -0,0 +1,744 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +------------------------------------ +-- Set up a distributed environment +------------------------------------ +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +\set DATA_NODE_1 :TEST_DBNAME _1 +\set DATA_NODE_2 :TEST_DBNAME _2 +\set DATA_NODE_3 :TEST_DBNAME _3 +\ir include/remote_exec.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +CREATE SCHEMA IF NOT EXISTS test; +psql:include/remote_exec.sql:5: NOTICE: schema "test" already exists, skipping +GRANT USAGE ON SCHEMA test TO PUBLIC; +CREATE OR REPLACE FUNCTION test.remote_exec(srv_name name[], command text) +RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec' +LANGUAGE C; +CREATE OR REPLACE FUNCTION test.remote_exec_get_result_strings(srv_name name[], command text) +RETURNS TABLE("table_record" CSTRING[]) +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec_get_result_strings' +LANGUAGE C; +SELECT node_name, database, node_created, database_created, extension_created +FROM ( + SELECT (add_data_node(name, host => 'localhost', DATABASE => name)).* + FROM (VALUES (:'DATA_NODE_1'), (:'DATA_NODE_2'), (:'DATA_NODE_3')) v(name) +) a; + node_name | database | node_created | database_created | extension_created +-----------------------+-----------------------+--------------+------------------+------------------- + db_cagg_bgw_dist_ht_1 | db_cagg_bgw_dist_ht_1 | t | t | t + db_cagg_bgw_dist_ht_2 | db_cagg_bgw_dist_ht_2 | t | t | t + db_cagg_bgw_dist_ht_3 | db_cagg_bgw_dist_ht_3 | t | t | t +(3 rows) + +GRANT USAGE ON FOREIGN SERVER :DATA_NODE_1, :DATA_NODE_2, :DATA_NODE_3 TO PUBLIC; +-- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes +GRANT CREATE ON SCHEMA public TO :ROLE_DEFAULT_PERM_USER; +\set IS_DISTRIBUTED TRUE +\ir include/cagg_bgw_common.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- +-- Setup +-- +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_wait_for_scheduler_finish() RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_params_create() RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_params_destroy() RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_params_reset_time(set_time BIGINT = 0, wait BOOLEAN = false) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +--test that this all works under the community license +ALTER DATABASE :TEST_DBNAME SET timescaledb.license_key='Community'; +--create a function with no permissions to execute +CREATE FUNCTION get_constant_no_perms() RETURNS INTEGER LANGUAGE SQL IMMUTABLE AS +$BODY$ + SELECT 10; +$BODY$; +REVOKE EXECUTE ON FUNCTION get_constant_no_perms() FROM PUBLIC; +\set WAIT_ON_JOB 0 +\set IMMEDIATELY_SET_UNTIL 1 +\set WAIT_FOR_OTHER_TO_ADVANCE 2 +CREATE OR REPLACE FUNCTION ts_bgw_params_mock_wait_returns_immediately(new_val INTEGER) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +-- Remove any default jobs, e.g., telemetry +DELETE FROM _timescaledb_config.bgw_job WHERE TRUE; +TRUNCATE _timescaledb_internal.bgw_job_stat; +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +CREATE TABLE public.bgw_log( + msg_no INT, + mock_time BIGINT, + application_name TEXT, + msg TEXT +); +CREATE VIEW sorted_bgw_log AS + SELECT msg_no, + mock_time, + application_name, + regexp_replace(regexp_replace(msg, '(Wait until|started at|execution time) [0-9]+(\.[0-9]+)?', '\1 (RANDOM)', 'g'), 'background worker "[^"]+"','connection') AS msg + FROM bgw_log ORDER BY mock_time, application_name COLLATE "C", msg_no; +CREATE TABLE public.bgw_dsm_handle_store( + handle BIGINT +); +INSERT INTO public.bgw_dsm_handle_store VALUES (0); +SELECT ts_bgw_params_create(); + ts_bgw_params_create +---------------------- + +(1 row) + +SELECT * FROM _timescaledb_config.bgw_job; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone +----+------------------+-------------------+-------------+-------------+--------------+-------------+-----------+-------+-----------+----------------+---------------+---------------+--------+--------------+------------+---------- +(0 rows) + +SELECT * FROM timescaledb_information.job_stats; + hypertable_schema | hypertable_name | job_id | last_run_started_at | last_successful_finish | last_run_status | job_status | last_run_duration | next_start | total_runs | total_successes | total_failures +-------------------+-----------------+--------+---------------------+------------------------+-----------------+------------+-------------------+------------+------------+-----------------+---------------- +(0 rows) + +SELECT * FROM _timescaledb_catalog.continuous_agg; + mat_hypertable_id | raw_hypertable_id | parent_mat_hypertable_id | user_view_schema | user_view_name | partial_view_schema | partial_view_name | bucket_width | direct_view_schema | direct_view_name | materialized_only | finalized +-------------------+-------------------+--------------------------+------------------+----------------+---------------------+-------------------+--------------+--------------------+------------------+-------------------+----------- +(0 rows) + +-- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes +GRANT CREATE ON SCHEMA public TO :ROLE_DEFAULT_PERM_USER; +psql:include/cagg_bgw_common.sql:76: WARNING: no privileges were granted for "public" +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +CREATE TABLE test_continuous_agg_table(time int, data int); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('test_continuous_agg_table', 'time', chunk_time_interval => 10, replication_factor => 2); +psql:include/cagg_bgw_common.sql:80: NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +---------------------------------------- + (1,public,test_continuous_agg_table,t) +(1 row) + +\else +SELECT create_hypertable('test_continuous_agg_table', 'time', chunk_time_interval => 10); +\endif +CREATE OR REPLACE FUNCTION integer_now_test() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table $$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION integer_now_test() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table $$; +$DIST$); +\endif +SELECT set_integer_now_func('test_continuous_agg_table', 'integer_now_test'); + set_integer_now_func +---------------------- + +(1 row) + +CREATE MATERIALIZED VIEW test_continuous_agg_view + WITH (timescaledb.continuous, timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value + FROM test_continuous_agg_table + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, 4::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1000 +(1 row) + +SELECT id as raw_table_id FROM _timescaledb_catalog.hypertable WHERE table_name='test_continuous_agg_table' \gset +-- min distance from end should be 1 +SELECT mat_hypertable_id, user_view_schema, user_view_name, bucket_width +FROM _timescaledb_catalog.continuous_agg; + mat_hypertable_id | user_view_schema | user_view_name | bucket_width +-------------------+------------------+--------------------------+-------------- + 2 | public | test_continuous_agg_view | 2 +(1 row) + +SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset +SELECT id AS job_id FROM _timescaledb_config.bgw_job where hypertable_id=:mat_hypertable_id \gset +-- job was created +SELECT * FROM _timescaledb_config.bgw_job where hypertable_id=:mat_hypertable_id; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone +------+--------------------------------------------+-------------------+-------------+-------------+--------------+------------------------+-------------------------------------+-------------------+-----------+----------------+---------------+---------------+-----------------------------------------------------------------+------------------------+-------------------------------------------+---------- + 1000 | Refresh Continuous Aggregate Policy [1000] | @ 12 hours | @ 0 | -1 | @ 12 hours | _timescaledb_functions | policy_refresh_continuous_aggregate | default_perm_user | t | f | | 2 | {"end_offset": 4, "start_offset": null, "mat_hypertable_id": 2} | _timescaledb_functions | policy_refresh_continuous_aggregate_check | +(1 row) + +-- create 10 time buckets +INSERT INTO test_continuous_agg_table + SELECT i, i FROM + (SELECT generate_series(0, 10) as i) AS j; +-- no stats +SELECT job_id, next_start, last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + ORDER BY job_id; + job_id | next_start | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------+------------------+------------+-----------------+----------------+--------------- +(0 rows) + +-- no data in view +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- +(0 rows) + +-- run first time +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-----------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1000] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -2147483648, 6 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2" + 2 | 0 | Refresh Continuous Aggregate Policy [1000] | inserted 3 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2" + 3 | 0 | Refresh Continuous Aggregate Policy [1000] | job 1000 (Refresh Continuous Aggregate Policy [1000]) exiting with success: execution time (RANDOM) ms +(6 rows) + +SELECT * FROM _timescaledb_config.bgw_job where id=:job_id; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone +------+--------------------------------------------+-------------------+-------------+-------------+--------------+------------------------+-------------------------------------+-------------------+-----------+----------------+---------------+---------------+-----------------------------------------------------------------+------------------------+-------------------------------------------+---------- + 1000 | Refresh Continuous Aggregate Policy [1000] | @ 12 hours | @ 0 | -1 | @ 12 hours | _timescaledb_functions | policy_refresh_continuous_aggregate | default_perm_user | t | f | | 2 | {"end_offset": 4, "start_offset": null, "mat_hypertable_id": 2} | _timescaledb_functions | policy_refresh_continuous_aggregate_check | +(1 row) + +-- job ran once, successfully +SELECT job_id, next_start-last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------------+------------+-----------------+----------------+--------------- + 1000 | @ 12 hours | t | 1 | 1 | 0 | 0 +(1 row) + +--clear log for next run of scheduler. +TRUNCATE public.bgw_log; +CREATE FUNCTION wait_for_timer_to_run(started_at INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + num_runs INTEGER; + message TEXT; +BEGIN + select format('[TESTING] Wait until %%, started at %s', started_at) into message; + FOR i in 1..spins + LOOP + SELECT COUNT(*) from bgw_log where msg LIKE message INTO num_runs; + if (num_runs > 0) THEN + RETURN true; + ELSE + PERFORM pg_sleep(0.1); + END IF; + END LOOP; + RETURN false; +END +$BODY$; +CREATE FUNCTION wait_for_job_to_run(job_param_id INTEGER, expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + num_runs INTEGER; +BEGIN + FOR i in 1..spins + LOOP + SELECT total_successes FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO num_runs; + if (num_runs = expected_runs) THEN + RETURN true; + ELSEIF (num_runs > expected_runs) THEN + RAISE 'num_runs > expected'; + ELSE + PERFORM pg_sleep(0.1); + END IF; + END LOOP; + RETURN false; +END +$BODY$; +--make sure there is 1 job to start with +SELECT wait_for_job_to_run(:job_id, 1); + wait_for_job_to_run +--------------------- + t +(1 row) + +SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_FOR_OTHER_TO_ADVANCE); + ts_bgw_params_mock_wait_returns_immediately +--------------------------------------------- + +(1 row) + +--start the scheduler on 0 time +SELECT ts_bgw_params_reset_time(0, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT ts_bgw_db_scheduler_test_run(extract(epoch from interval '24 hour')::int * 1000, 0); + ts_bgw_db_scheduler_test_run +------------------------------ + +(1 row) + +SELECT wait_for_timer_to_run(0); + wait_for_timer_to_run +----------------------- + t +(1 row) + +--advance to 12:00 so that it runs one more time; now we know the +--scheduler has loaded up the job with the old schedule_interval +SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT wait_for_job_to_run(:job_id, 2); + wait_for_job_to_run +--------------------- + t +(1 row) + +--advance clock 1us to make the scheduler realize the job is done +SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+1, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +--alter the refresh interval and check if next_start is altered +SELECT alter_job(:job_id, schedule_interval => '1m', retry_period => '1m'); + alter_job +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + (1000,"@ 1 min","@ 0",-1,"@ 1 min",t,"{""end_offset"": 4, ""start_offset"": null, ""mat_hypertable_id"": 2}","Sat Jan 01 04:01:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,) +(1 row) + +SELECT job_id, next_start - last_finish as until_next, total_runs +FROM _timescaledb_internal.bgw_job_stat +WHERE job_id=:job_id;; + job_id | until_next | total_runs +--------+------------+------------ + 1000 | @ 1 min | 2 +(1 row) + +--advance to 12:02, job should have run at 12:01 +SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+(extract(epoch from interval '2 minute')::bigint * 1000000), true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT wait_for_job_to_run(:job_id, 3); + wait_for_job_to_run +--------------------- + t +(1 row) + +--next run in 1 minute +SELECT job_id, next_start-last_finish as until_next, total_runs +FROM _timescaledb_internal.bgw_job_stat +WHERE job_id=:job_id; + job_id | until_next | total_runs +--------+------------+------------ + 1000 | @ 1 min | 3 +(1 row) + +--change next run to be after 30s instead +SELECT (next_start - '30s'::interval) AS "NEW_NEXT_START" +FROM _timescaledb_internal.bgw_job_stat +WHERE job_id=:job_id \gset +SELECT alter_job(:job_id, next_start => :'NEW_NEXT_START'); + alter_job +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + (1000,"@ 1 min","@ 0",-1,"@ 1 min",t,"{""end_offset"": 4, ""start_offset"": null, ""mat_hypertable_id"": 2}","Sat Jan 01 04:02:30 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,) +(1 row) + +SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+(extract(epoch from interval '2 minute 30 seconds')::bigint * 1000000), true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT wait_for_job_to_run(:job_id, 4); + wait_for_job_to_run +--------------------- + t +(1 row) + +--advance clock to quit scheduler +SELECT ts_bgw_params_reset_time(extract(epoch from interval '25 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +select ts_bgw_db_scheduler_test_wait_for_scheduler_finish(); + ts_bgw_db_scheduler_test_wait_for_scheduler_finish +---------------------------------------------------- + +(1 row) + +SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_ON_JOB); + ts_bgw_params_mock_wait_returns_immediately +--------------------------------------------- + +(1 row) + +TRUNCATE public.bgw_log; +-- data before 8 +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 + 2 | 5 + 4 | 9 +(3 rows) + +-- invalidations test by running job multiple times +SELECT ts_bgw_params_reset_time(); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +DROP MATERIALIZED VIEW test_continuous_agg_view; +psql:include/cagg_bgw_common.sql:234: NOTICE: drop cascades to table _timescaledb_internal._hyper_2_3_chunk +CREATE MATERIALIZED VIEW test_continuous_agg_view + WITH (timescaledb.continuous, + timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value + FROM test_continuous_agg_table + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1001 +(1 row) + +SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset +SELECT id AS job_id FROM _timescaledb_config.bgw_job WHERE hypertable_id=:mat_hypertable_id \gset +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-----------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3" + 2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3" + 3 | 0 | Refresh Continuous Aggregate Policy [1001] | job 1001 (Refresh Continuous Aggregate Policy [1001]) exiting with success: execution time (RANDOM) ms +(6 rows) + +-- job ran once, successfully +SELECT job_id, last_finish - next_start as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+----------------+------------------+------------+-----------------+----------------+--------------- + 1001 | @ 12 hours ago | t | 1 | 1 | 0 | 0 +(1 row) + +-- should have refreshed everything we have so far +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 + 2 | 5 + 4 | 9 + 6 | 13 + 8 | 17 + 10 | 10 +(6 rows) + +-- invalidate some data +UPDATE test_continuous_agg_table +SET data = 11 WHERE time = 6; +--advance time by 12h so that job runs one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3" + 2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3" + 3 | 0 | Refresh Continuous Aggregate Policy [1001] | job 1001 (Refresh Continuous Aggregate Policy [1001]) exiting with success: execution time (RANDOM) ms + 0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker + 1 | 43200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] + 1 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3" + 2 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3" + 3 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | job 1001 (Refresh Continuous Aggregate Policy [1001]) exiting with success: execution time (RANDOM) ms +(12 rows) + +SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------------+------------+-----------------+----------------+--------------- + 1001 | @ 12 hours | t | 2 | 2 | 0 | 0 +(1 row) + +-- should have updated data for time=6 +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 + 2 | 5 + 4 | 9 + 6 | 18 + 8 | 17 + 10 | 10 +(6 rows) + +\x on +--check the information views -- +select view_name, view_owner, materialization_hypertable_schema, materialization_hypertable_name +from timescaledb_information.continuous_aggregates +where view_name::text like '%test_continuous_agg_view'; +-[ RECORD 1 ]---------------------+--------------------------- +view_name | test_continuous_agg_view +view_owner | default_perm_user +materialization_hypertable_schema | _timescaledb_internal +materialization_hypertable_name | _materialized_hypertable_3 + +select view_name, view_definition from timescaledb_information.continuous_aggregates +where view_name::text like '%test_continuous_agg_view'; +-[ RECORD 1 ]---+------------------------------------------------------------------------- +view_name | test_continuous_agg_view +view_definition | SELECT time_bucket(2, test_continuous_agg_table."time") AS time_bucket,+ + | sum(test_continuous_agg_table.data) AS value + + | FROM test_continuous_agg_table + + | GROUP BY (time_bucket(2, test_continuous_agg_table."time")); + +select job_status, last_run_duration +from timescaledb_information.job_stats ps, timescaledb_information.continuous_aggregates cagg +where cagg.view_name::text like '%test_continuous_agg_view' +and cagg.materialization_hypertable_name = ps.hypertable_name; +-[ RECORD 1 ]-----+---------- +job_status | Scheduled +last_run_duration | + +\x off +DROP MATERIALIZED VIEW test_continuous_agg_view; +psql:include/cagg_bgw_common.sql:294: NOTICE: drop cascades to table _timescaledb_internal._hyper_3_4_chunk +--create a view with a function that it has no permission to execute +CREATE MATERIALIZED VIEW test_continuous_agg_view + WITH (timescaledb.continuous, + timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value, get_constant_no_perms() + FROM test_continuous_agg_table + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1002 +(1 row) + +SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +-- job fails +SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------------+------------+-----------------+----------------+--------------- + 1002 | f | 1 | 0 | 1 | 0 +(1 row) + +DROP MATERIALIZED VIEW test_continuous_agg_view; +--advance clock to quit scheduler +SELECT ts_bgw_params_reset_time(extract(epoch from interval '25 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +select ts_bgw_db_scheduler_test_wait_for_scheduler_finish(); + ts_bgw_db_scheduler_test_wait_for_scheduler_finish +---------------------------------------------------- + +(1 row) + +SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_ON_JOB); + ts_bgw_params_mock_wait_returns_immediately +--------------------------------------------- + +(1 row) + +--clear log for next run of the scheduler +TRUNCATE public.bgw_log; +SELECT ts_bgw_params_reset_time(); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +-- +-- Test creating continuous aggregate with a user that is the non-owner of the raw table +-- +CREATE TABLE test_continuous_agg_table_w_grant(time int, data int); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('test_continuous_agg_table_w_grant', 'time', chunk_time_interval => 10, replication_factor => 2); +psql:include/cagg_bgw_common.sql:330: NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------------------------ + (5,public,test_continuous_agg_table_w_grant,t) +(1 row) + +\else +SELECT create_hypertable('test_continuous_agg_table_w_grant', 'time', chunk_time_interval => 10); +\endif +CREATE OR REPLACE FUNCTION integer_now_test1() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table_w_grant $$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION integer_now_test1() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table_w_grant $$; +$DIST$); +\endif +SELECT set_integer_now_func('test_continuous_agg_table_w_grant', 'integer_now_test1'); + set_integer_now_func +---------------------- + +(1 row) + +GRANT SELECT, TRIGGER ON test_continuous_agg_table_w_grant TO public; +INSERT INTO test_continuous_agg_table_w_grant + SELECT 1 , 1; +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 +-- make sure view can be created +CREATE MATERIALIZED VIEW test_continuous_agg_view_user_2 + WITH ( timescaledb.continuous, + timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value + FROM test_continuous_agg_table_w_grant + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view_user_2', NULL, -2::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1003 +(1 row) + +SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT id, owner FROM _timescaledb_config.bgw_job WHERE id = :job_id ; + id | owner +------+--------------------- + 1003 | default_perm_user_2 +(1 row) + +SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------------+------------+-----------------+----------------+--------------- + 1003 | @ 12 hours | t | 1 | 1 | 0 | 0 +(1 row) + +--view is populated +SELECT * FROM test_continuous_agg_view_user_2 ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 +(1 row) + +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +--revoke permissions from the continuous agg view owner to select from raw table +--no further updates to cont agg should happen +REVOKE SELECT ON test_continuous_agg_table_w_grant FROM public; +--add new data to table +INSERT INTO test_continuous_agg_table_w_grant VALUES(5,1); +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 +--advance time by 12h so that job tries to run one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +--should show a failing execution because no longer has permissions (due to lack of permission on partial view owner's part) +SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------------+------------+-----------------+----------------+--------------- + 1003 | f | 2 | 1 | 1 | 0 +(1 row) + +--view was NOT updated; but the old stuff is still there +SELECT * FROM test_continuous_agg_view_user_2; + time_bucket | value +-------------+------- + 0 | 1 +(1 row) + +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +SELECT * from sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1003] | refreshing continuous aggregate "test_continuous_agg_view_user_2" in window [ -2147483648, 2 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1003] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_6" + 2 | 0 | Refresh Continuous Aggregate Policy [1003] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_6" + 3 | 0 | Refresh Continuous Aggregate Policy [1003] | job 1003 (Refresh Continuous Aggregate Policy [1003]) exiting with success: execution time (RANDOM) ms + 0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker + 1 | 43200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | job 1003 threw an error + 1 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | permission denied for table test_continuous_agg_table_w_grant +(10 rows) + +-- Count the number of continuous aggregate policies +SELECT count(*) FROM _timescaledb_config.bgw_job + WHERE proc_schema = '_timescaledb_functions' + AND proc_name = 'policy_refresh_continuous_aggregate'; + count +------- + 1 +(1 row) + +-- cleanup +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +DROP DATABASE :DATA_NODE_1 WITH (FORCE); +DROP DATABASE :DATA_NODE_2 WITH (FORCE); +DROP DATABASE :DATA_NODE_3 WITH (FORCE); diff --git a/tsl/test/expected/cagg_bgw_dist_ht-16.out b/tsl/test/expected/cagg_bgw_dist_ht-16.out new file mode 100644 index 00000000000..1051d4f21ca --- /dev/null +++ b/tsl/test/expected/cagg_bgw_dist_ht-16.out @@ -0,0 +1,744 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +------------------------------------ +-- Set up a distributed environment +------------------------------------ +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +\set DATA_NODE_1 :TEST_DBNAME _1 +\set DATA_NODE_2 :TEST_DBNAME _2 +\set DATA_NODE_3 :TEST_DBNAME _3 +\ir include/remote_exec.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +CREATE SCHEMA IF NOT EXISTS test; +psql:include/remote_exec.sql:5: NOTICE: schema "test" already exists, skipping +GRANT USAGE ON SCHEMA test TO PUBLIC; +CREATE OR REPLACE FUNCTION test.remote_exec(srv_name name[], command text) +RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec' +LANGUAGE C; +CREATE OR REPLACE FUNCTION test.remote_exec_get_result_strings(srv_name name[], command text) +RETURNS TABLE("table_record" CSTRING[]) +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec_get_result_strings' +LANGUAGE C; +SELECT node_name, database, node_created, database_created, extension_created +FROM ( + SELECT (add_data_node(name, host => 'localhost', DATABASE => name)).* + FROM (VALUES (:'DATA_NODE_1'), (:'DATA_NODE_2'), (:'DATA_NODE_3')) v(name) +) a; + node_name | database | node_created | database_created | extension_created +-----------------------+-----------------------+--------------+------------------+------------------- + db_cagg_bgw_dist_ht_1 | db_cagg_bgw_dist_ht_1 | t | t | t + db_cagg_bgw_dist_ht_2 | db_cagg_bgw_dist_ht_2 | t | t | t + db_cagg_bgw_dist_ht_3 | db_cagg_bgw_dist_ht_3 | t | t | t +(3 rows) + +GRANT USAGE ON FOREIGN SERVER :DATA_NODE_1, :DATA_NODE_2, :DATA_NODE_3 TO PUBLIC; +-- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes +GRANT CREATE ON SCHEMA public TO :ROLE_DEFAULT_PERM_USER; +\set IS_DISTRIBUTED TRUE +\ir include/cagg_bgw_common.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- +-- Setup +-- +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_wait_for_scheduler_finish() RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_params_create() RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_params_destroy() RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +CREATE OR REPLACE FUNCTION ts_bgw_params_reset_time(set_time BIGINT = 0, wait BOOLEAN = false) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +--test that this all works under the community license +ALTER DATABASE :TEST_DBNAME SET timescaledb.license_key='Community'; +--create a function with no permissions to execute +CREATE FUNCTION get_constant_no_perms() RETURNS INTEGER LANGUAGE SQL IMMUTABLE AS +$BODY$ + SELECT 10; +$BODY$; +REVOKE EXECUTE ON FUNCTION get_constant_no_perms() FROM PUBLIC; +\set WAIT_ON_JOB 0 +\set IMMEDIATELY_SET_UNTIL 1 +\set WAIT_FOR_OTHER_TO_ADVANCE 2 +CREATE OR REPLACE FUNCTION ts_bgw_params_mock_wait_returns_immediately(new_val INTEGER) RETURNS VOID +AS :MODULE_PATHNAME LANGUAGE C VOLATILE; +-- Remove any default jobs, e.g., telemetry +DELETE FROM _timescaledb_config.bgw_job WHERE TRUE; +TRUNCATE _timescaledb_internal.bgw_job_stat; +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +CREATE TABLE public.bgw_log( + msg_no INT, + mock_time BIGINT, + application_name TEXT, + msg TEXT +); +CREATE VIEW sorted_bgw_log AS + SELECT msg_no, + mock_time, + application_name, + regexp_replace(regexp_replace(msg, '(Wait until|started at|execution time) [0-9]+(\.[0-9]+)?', '\1 (RANDOM)', 'g'), 'background worker "[^"]+"','connection') AS msg + FROM bgw_log ORDER BY mock_time, application_name COLLATE "C", msg_no; +CREATE TABLE public.bgw_dsm_handle_store( + handle BIGINT +); +INSERT INTO public.bgw_dsm_handle_store VALUES (0); +SELECT ts_bgw_params_create(); + ts_bgw_params_create +---------------------- + +(1 row) + +SELECT * FROM _timescaledb_config.bgw_job; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone +----+------------------+-------------------+-------------+-------------+--------------+-------------+-----------+-------+-----------+----------------+---------------+---------------+--------+--------------+------------+---------- +(0 rows) + +SELECT * FROM timescaledb_information.job_stats; + hypertable_schema | hypertable_name | job_id | last_run_started_at | last_successful_finish | last_run_status | job_status | last_run_duration | next_start | total_runs | total_successes | total_failures +-------------------+-----------------+--------+---------------------+------------------------+-----------------+------------+-------------------+------------+------------+-----------------+---------------- +(0 rows) + +SELECT * FROM _timescaledb_catalog.continuous_agg; + mat_hypertable_id | raw_hypertable_id | parent_mat_hypertable_id | user_view_schema | user_view_name | partial_view_schema | partial_view_name | bucket_width | direct_view_schema | direct_view_name | materialized_only | finalized +-------------------+-------------------+--------------------------+------------------+----------------+---------------------+-------------------+--------------+--------------------+------------------+-------------------+----------- +(0 rows) + +-- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes +GRANT CREATE ON SCHEMA public TO :ROLE_DEFAULT_PERM_USER; +psql:include/cagg_bgw_common.sql:76: WARNING: no privileges were granted for "public" +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +CREATE TABLE test_continuous_agg_table(time int, data int); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('test_continuous_agg_table', 'time', chunk_time_interval => 10, replication_factor => 2); +psql:include/cagg_bgw_common.sql:80: NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +---------------------------------------- + (1,public,test_continuous_agg_table,t) +(1 row) + +\else +SELECT create_hypertable('test_continuous_agg_table', 'time', chunk_time_interval => 10); +\endif +CREATE OR REPLACE FUNCTION integer_now_test() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table $$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION integer_now_test() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table $$; +$DIST$); +\endif +SELECT set_integer_now_func('test_continuous_agg_table', 'integer_now_test'); + set_integer_now_func +---------------------- + +(1 row) + +CREATE MATERIALIZED VIEW test_continuous_agg_view + WITH (timescaledb.continuous, timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value + FROM test_continuous_agg_table + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, 4::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1000 +(1 row) + +SELECT id as raw_table_id FROM _timescaledb_catalog.hypertable WHERE table_name='test_continuous_agg_table' \gset +-- min distance from end should be 1 +SELECT mat_hypertable_id, user_view_schema, user_view_name, bucket_width +FROM _timescaledb_catalog.continuous_agg; + mat_hypertable_id | user_view_schema | user_view_name | bucket_width +-------------------+------------------+--------------------------+-------------- + 2 | public | test_continuous_agg_view | 2 +(1 row) + +SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset +SELECT id AS job_id FROM _timescaledb_config.bgw_job where hypertable_id=:mat_hypertable_id \gset +-- job was created +SELECT * FROM _timescaledb_config.bgw_job where hypertable_id=:mat_hypertable_id; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone +------+--------------------------------------------+-------------------+-------------+-------------+--------------+------------------------+-------------------------------------+-------------------+-----------+----------------+---------------+---------------+-----------------------------------------------------------------+------------------------+-------------------------------------------+---------- + 1000 | Refresh Continuous Aggregate Policy [1000] | @ 12 hours | @ 0 | -1 | @ 12 hours | _timescaledb_functions | policy_refresh_continuous_aggregate | default_perm_user | t | f | | 2 | {"end_offset": 4, "start_offset": null, "mat_hypertable_id": 2} | _timescaledb_functions | policy_refresh_continuous_aggregate_check | +(1 row) + +-- create 10 time buckets +INSERT INTO test_continuous_agg_table + SELECT i, i FROM + (SELECT generate_series(0, 10) as i) AS j; +-- no stats +SELECT job_id, next_start, last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + ORDER BY job_id; + job_id | next_start | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------+------------------+------------+-----------------+----------------+--------------- +(0 rows) + +-- no data in view +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- +(0 rows) + +-- run first time +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-----------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1000] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -2147483648, 6 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2" + 2 | 0 | Refresh Continuous Aggregate Policy [1000] | inserted 3 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2" + 3 | 0 | Refresh Continuous Aggregate Policy [1000] | job 1000 (Refresh Continuous Aggregate Policy [1000]) exiting with success: execution time (RANDOM) ms +(6 rows) + +SELECT * FROM _timescaledb_config.bgw_job where id=:job_id; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone +------+--------------------------------------------+-------------------+-------------+-------------+--------------+------------------------+-------------------------------------+-------------------+-----------+----------------+---------------+---------------+-----------------------------------------------------------------+------------------------+-------------------------------------------+---------- + 1000 | Refresh Continuous Aggregate Policy [1000] | @ 12 hours | @ 0 | -1 | @ 12 hours | _timescaledb_functions | policy_refresh_continuous_aggregate | default_perm_user | t | f | | 2 | {"end_offset": 4, "start_offset": null, "mat_hypertable_id": 2} | _timescaledb_functions | policy_refresh_continuous_aggregate_check | +(1 row) + +-- job ran once, successfully +SELECT job_id, next_start-last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------------+------------+-----------------+----------------+--------------- + 1000 | @ 12 hours | t | 1 | 1 | 0 | 0 +(1 row) + +--clear log for next run of scheduler. +TRUNCATE public.bgw_log; +CREATE FUNCTION wait_for_timer_to_run(started_at INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + num_runs INTEGER; + message TEXT; +BEGIN + select format('[TESTING] Wait until %%, started at %s', started_at) into message; + FOR i in 1..spins + LOOP + SELECT COUNT(*) from bgw_log where msg LIKE message INTO num_runs; + if (num_runs > 0) THEN + RETURN true; + ELSE + PERFORM pg_sleep(0.1); + END IF; + END LOOP; + RETURN false; +END +$BODY$; +CREATE FUNCTION wait_for_job_to_run(job_param_id INTEGER, expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + num_runs INTEGER; +BEGIN + FOR i in 1..spins + LOOP + SELECT total_successes FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO num_runs; + if (num_runs = expected_runs) THEN + RETURN true; + ELSEIF (num_runs > expected_runs) THEN + RAISE 'num_runs > expected'; + ELSE + PERFORM pg_sleep(0.1); + END IF; + END LOOP; + RETURN false; +END +$BODY$; +--make sure there is 1 job to start with +SELECT wait_for_job_to_run(:job_id, 1); + wait_for_job_to_run +--------------------- + t +(1 row) + +SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_FOR_OTHER_TO_ADVANCE); + ts_bgw_params_mock_wait_returns_immediately +--------------------------------------------- + +(1 row) + +--start the scheduler on 0 time +SELECT ts_bgw_params_reset_time(0, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT ts_bgw_db_scheduler_test_run(extract(epoch from interval '24 hour')::int * 1000, 0); + ts_bgw_db_scheduler_test_run +------------------------------ + +(1 row) + +SELECT wait_for_timer_to_run(0); + wait_for_timer_to_run +----------------------- + t +(1 row) + +--advance to 12:00 so that it runs one more time; now we know the +--scheduler has loaded up the job with the old schedule_interval +SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT wait_for_job_to_run(:job_id, 2); + wait_for_job_to_run +--------------------- + t +(1 row) + +--advance clock 1us to make the scheduler realize the job is done +SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+1, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +--alter the refresh interval and check if next_start is altered +SELECT alter_job(:job_id, schedule_interval => '1m', retry_period => '1m'); + alter_job +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + (1000,"@ 1 min","@ 0",-1,"@ 1 min",t,"{""end_offset"": 4, ""start_offset"": null, ""mat_hypertable_id"": 2}","Sat Jan 01 04:01:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,) +(1 row) + +SELECT job_id, next_start - last_finish as until_next, total_runs +FROM _timescaledb_internal.bgw_job_stat +WHERE job_id=:job_id;; + job_id | until_next | total_runs +--------+------------+------------ + 1000 | @ 1 min | 2 +(1 row) + +--advance to 12:02, job should have run at 12:01 +SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+(extract(epoch from interval '2 minute')::bigint * 1000000), true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT wait_for_job_to_run(:job_id, 3); + wait_for_job_to_run +--------------------- + t +(1 row) + +--next run in 1 minute +SELECT job_id, next_start-last_finish as until_next, total_runs +FROM _timescaledb_internal.bgw_job_stat +WHERE job_id=:job_id; + job_id | until_next | total_runs +--------+------------+------------ + 1000 | @ 1 min | 3 +(1 row) + +--change next run to be after 30s instead +SELECT (next_start - '30s'::interval) AS "NEW_NEXT_START" +FROM _timescaledb_internal.bgw_job_stat +WHERE job_id=:job_id \gset +SELECT alter_job(:job_id, next_start => :'NEW_NEXT_START'); + alter_job +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + (1000,"@ 1 min","@ 0",-1,"@ 1 min",t,"{""end_offset"": 4, ""start_offset"": null, ""mat_hypertable_id"": 2}","Sat Jan 01 04:02:30 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,) +(1 row) + +SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+(extract(epoch from interval '2 minute 30 seconds')::bigint * 1000000), true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT wait_for_job_to_run(:job_id, 4); + wait_for_job_to_run +--------------------- + t +(1 row) + +--advance clock to quit scheduler +SELECT ts_bgw_params_reset_time(extract(epoch from interval '25 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +select ts_bgw_db_scheduler_test_wait_for_scheduler_finish(); + ts_bgw_db_scheduler_test_wait_for_scheduler_finish +---------------------------------------------------- + +(1 row) + +SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_ON_JOB); + ts_bgw_params_mock_wait_returns_immediately +--------------------------------------------- + +(1 row) + +TRUNCATE public.bgw_log; +-- data before 8 +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 + 2 | 5 + 4 | 9 +(3 rows) + +-- invalidations test by running job multiple times +SELECT ts_bgw_params_reset_time(); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +DROP MATERIALIZED VIEW test_continuous_agg_view; +psql:include/cagg_bgw_common.sql:234: NOTICE: drop cascades to table _timescaledb_internal._hyper_2_3_chunk +CREATE MATERIALIZED VIEW test_continuous_agg_view + WITH (timescaledb.continuous, + timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value + FROM test_continuous_agg_table + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1001 +(1 row) + +SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset +SELECT id AS job_id FROM _timescaledb_config.bgw_job WHERE hypertable_id=:mat_hypertable_id \gset +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-----------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3" + 2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3" + 3 | 0 | Refresh Continuous Aggregate Policy [1001] | job 1001 (Refresh Continuous Aggregate Policy [1001]) exiting with success: execution time (RANDOM) ms +(6 rows) + +-- job ran once, successfully +SELECT job_id, last_finish - next_start as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+----------------+------------------+------------+-----------------+----------------+--------------- + 1001 | @ 12 hours ago | t | 1 | 1 | 0 | 0 +(1 row) + +-- should have refreshed everything we have so far +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 + 2 | 5 + 4 | 9 + 6 | 13 + 8 | 17 + 10 | 10 +(6 rows) + +-- invalidate some data +UPDATE test_continuous_agg_table +SET data = 11 WHERE time = 6; +--advance time by 12h so that job runs one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3" + 2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3" + 3 | 0 | Refresh Continuous Aggregate Policy [1001] | job 1001 (Refresh Continuous Aggregate Policy [1001]) exiting with success: execution time (RANDOM) ms + 0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker + 1 | 43200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] + 1 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3" + 2 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3" + 3 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | job 1001 (Refresh Continuous Aggregate Policy [1001]) exiting with success: execution time (RANDOM) ms +(12 rows) + +SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------------+------------+-----------------+----------------+--------------- + 1001 | @ 12 hours | t | 2 | 2 | 0 | 0 +(1 row) + +-- should have updated data for time=6 +SELECT * FROM test_continuous_agg_view ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 + 2 | 5 + 4 | 9 + 6 | 18 + 8 | 17 + 10 | 10 +(6 rows) + +\x on +--check the information views -- +select view_name, view_owner, materialization_hypertable_schema, materialization_hypertable_name +from timescaledb_information.continuous_aggregates +where view_name::text like '%test_continuous_agg_view'; +-[ RECORD 1 ]---------------------+--------------------------- +view_name | test_continuous_agg_view +view_owner | default_perm_user +materialization_hypertable_schema | _timescaledb_internal +materialization_hypertable_name | _materialized_hypertable_3 + +select view_name, view_definition from timescaledb_information.continuous_aggregates +where view_name::text like '%test_continuous_agg_view'; +-[ RECORD 1 ]---+----------------------------------------------- +view_name | test_continuous_agg_view +view_definition | SELECT time_bucket(2, "time") AS time_bucket,+ + | sum(data) AS value + + | FROM test_continuous_agg_table + + | GROUP BY (time_bucket(2, "time")); + +select job_status, last_run_duration +from timescaledb_information.job_stats ps, timescaledb_information.continuous_aggregates cagg +where cagg.view_name::text like '%test_continuous_agg_view' +and cagg.materialization_hypertable_name = ps.hypertable_name; +-[ RECORD 1 ]-----+---------- +job_status | Scheduled +last_run_duration | + +\x off +DROP MATERIALIZED VIEW test_continuous_agg_view; +psql:include/cagg_bgw_common.sql:294: NOTICE: drop cascades to table _timescaledb_internal._hyper_3_4_chunk +--create a view with a function that it has no permission to execute +CREATE MATERIALIZED VIEW test_continuous_agg_view + WITH (timescaledb.continuous, + timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value, get_constant_no_perms() + FROM test_continuous_agg_table + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1002 +(1 row) + +SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +-- job fails +SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------------+------------+-----------------+----------------+--------------- + 1002 | f | 1 | 0 | 1 | 0 +(1 row) + +DROP MATERIALIZED VIEW test_continuous_agg_view; +--advance clock to quit scheduler +SELECT ts_bgw_params_reset_time(extract(epoch from interval '25 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +select ts_bgw_db_scheduler_test_wait_for_scheduler_finish(); + ts_bgw_db_scheduler_test_wait_for_scheduler_finish +---------------------------------------------------- + +(1 row) + +SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_ON_JOB); + ts_bgw_params_mock_wait_returns_immediately +--------------------------------------------- + +(1 row) + +--clear log for next run of the scheduler +TRUNCATE public.bgw_log; +SELECT ts_bgw_params_reset_time(); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +-- +-- Test creating continuous aggregate with a user that is the non-owner of the raw table +-- +CREATE TABLE test_continuous_agg_table_w_grant(time int, data int); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('test_continuous_agg_table_w_grant', 'time', chunk_time_interval => 10, replication_factor => 2); +psql:include/cagg_bgw_common.sql:330: NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------------------------ + (5,public,test_continuous_agg_table_w_grant,t) +(1 row) + +\else +SELECT create_hypertable('test_continuous_agg_table_w_grant', 'time', chunk_time_interval => 10); +\endif +CREATE OR REPLACE FUNCTION integer_now_test1() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table_w_grant $$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION integer_now_test1() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table_w_grant $$; +$DIST$); +\endif +SELECT set_integer_now_func('test_continuous_agg_table_w_grant', 'integer_now_test1'); + set_integer_now_func +---------------------- + +(1 row) + +GRANT SELECT, TRIGGER ON test_continuous_agg_table_w_grant TO public; +INSERT INTO test_continuous_agg_table_w_grant + SELECT 1 , 1; +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 +-- make sure view can be created +CREATE MATERIALIZED VIEW test_continuous_agg_view_user_2 + WITH ( timescaledb.continuous, + timescaledb.materialized_only=true) + AS SELECT time_bucket('2', time), SUM(data) as value + FROM test_continuous_agg_table_w_grant + GROUP BY 1 WITH NO DATA; +SELECT add_continuous_aggregate_policy('test_continuous_agg_view_user_2', NULL, -2::integer, '12 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1003 +(1 row) + +SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT id, owner FROM _timescaledb_config.bgw_job WHERE id = :job_id ; + id | owner +------+--------------------- + 1003 | default_perm_user_2 +(1 row) + +SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------+------------------+------------+-----------------+----------------+--------------- + 1003 | @ 12 hours | t | 1 | 1 | 0 | 0 +(1 row) + +--view is populated +SELECT * FROM test_continuous_agg_view_user_2 ORDER BY 1; + time_bucket | value +-------------+------- + 0 | 1 +(1 row) + +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +--revoke permissions from the continuous agg view owner to select from raw table +--no further updates to cont agg should happen +REVOKE SELECT ON test_continuous_agg_table_w_grant FROM public; +--add new data to table +INSERT INTO test_continuous_agg_table_w_grant VALUES(5,1); +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 +--advance time by 12h so that job tries to run one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +--should show a failing execution because no longer has permissions (due to lack of permission on partial view owner's part) +SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes + FROM _timescaledb_internal.bgw_job_stat + where job_id=:job_id; + job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes +--------+------------------+------------+-----------------+----------------+--------------- + 1003 | f | 2 | 1 | 1 | 0 +(1 row) + +--view was NOT updated; but the old stuff is still there +SELECT * FROM test_continuous_agg_view_user_2; + time_bucket | value +-------------+------- + 0 | 1 +(1 row) + +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +SELECT * from sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+-------------------------------------------------------------------------------------------------------- + 0 | 0 | DB Scheduler | [TESTING] Registered new background worker + 1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 0 | Refresh Continuous Aggregate Policy [1003] | refreshing continuous aggregate "test_continuous_agg_view_user_2" in window [ -2147483648, 2 ] + 1 | 0 | Refresh Continuous Aggregate Policy [1003] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_6" + 2 | 0 | Refresh Continuous Aggregate Policy [1003] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_6" + 3 | 0 | Refresh Continuous Aggregate Policy [1003] | job 1003 (Refresh Continuous Aggregate Policy [1003]) exiting with success: execution time (RANDOM) ms + 0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker + 1 | 43200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | job 1003 threw an error + 1 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | permission denied for table test_continuous_agg_table_w_grant +(10 rows) + +-- Count the number of continuous aggregate policies +SELECT count(*) FROM _timescaledb_config.bgw_job + WHERE proc_schema = '_timescaledb_functions' + AND proc_name = 'policy_refresh_continuous_aggregate'; + count +------- + 1 +(1 row) + +-- cleanup +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +DROP DATABASE :DATA_NODE_1 WITH (FORCE); +DROP DATABASE :DATA_NODE_2 WITH (FORCE); +DROP DATABASE :DATA_NODE_3 WITH (FORCE); diff --git a/tsl/test/expected/dist_hypertable-16.out b/tsl/test/expected/dist_hypertable-16.out index 23f808ff7ba..bf21e134df3 100644 --- a/tsl/test/expected/dist_hypertable-16.out +++ b/tsl/test/expected/dist_hypertable-16.out @@ -891,9 +891,10 @@ FROM disttable ORDER BY device, temp; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Sort + Incremental Sort Output: disttable.device, disttable.temp, (avg(disttable.temp) OVER (?)) Sort Key: disttable.device, disttable.temp + Presorted Key: disttable.device -> WindowAgg Output: disttable.device, disttable.temp, avg(disttable.temp) OVER (?) -> Custom Scan (AsyncAppend) @@ -915,7 +916,7 @@ ORDER BY device, temp; Data node: db_dist_hypertable_3 Chunks: _dist_hyper_1_2_chunk, _dist_hyper_1_6_chunk Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_functions.chunks_in(public.disttable.*, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST -(24 rows) +(25 rows) SELECT device, temp, avg(temp) OVER (PARTITION BY device) FROM disttable diff --git a/tsl/test/sql/.gitignore b/tsl/test/sql/.gitignore index 7c80876e95a..35b90ef95ec 100644 --- a/tsl/test/sql/.gitignore +++ b/tsl/test/sql/.gitignore @@ -1,5 +1,6 @@ /*.pgbinary /cagg_bgw-*.sql +/cagg_bgw_dist_ht-*.sql /cagg_ddl-*.sql /cagg_ddl_dist_ht-*.sql /cagg_errors_deprecated-*.sql diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 9a623ffee2d..21fbc4f6a31 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -92,7 +92,6 @@ if(CMAKE_BUILD_TYPE MATCHES Debug) list( APPEND TEST_FILES - cagg_bgw_dist_ht.sql cagg_migrate_dist_ht.sql cagg_on_cagg_dist_ht.sql cagg_on_cagg_joins_dist_ht.sql @@ -194,6 +193,7 @@ if(CMAKE_BUILD_TYPE MATCHES Debug) list( APPEND TEST_TEMPLATES + cagg_bgw_dist_ht.sql.in cagg_ddl_dist_ht.sql.in cagg_invalidation_dist_ht.sql.in dist_hypertable.sql.in diff --git a/tsl/test/sql/cagg_bgw_dist_ht.sql b/tsl/test/sql/cagg_bgw_dist_ht.sql.in similarity index 100% rename from tsl/test/sql/cagg_bgw_dist_ht.sql rename to tsl/test/sql/cagg_bgw_dist_ht.sql.in