From 44817252b576ac52eb249126790bb7d30db5e3de Mon Sep 17 00:00:00 2001 From: Nikhil Sontakke Date: Tue, 24 Oct 2023 18:18:43 +0530 Subject: [PATCH] Use creation time in retention/compression policy The retention and compression policies can now use drop_created_before and compress_created_before arguments respectively to specify chunk selection using their creation times. We don't support creation times for CAggs, yet. --- .github/workflows/pgspot.yaml | 4 + .unreleased/feature_6227 | 1 + sql/compat.sql | 6 +- sql/policy_api.sql | 14 +- sql/policy_internal.sql | 49 +++++-- sql/updates/latest-dev.sql | 153 ++++++++++++++++++++ sql/updates/reverse-dev.sql | 136 +++++++++++++++++ tsl/src/bgw_policy/compression_api.c | 91 ++++++++++-- tsl/src/bgw_policy/compression_api.h | 4 +- tsl/src/bgw_policy/job.c | 18 ++- tsl/src/bgw_policy/job.h | 1 + tsl/src/bgw_policy/policies_v2.c | 4 + tsl/src/bgw_policy/policies_v2.h | 4 + tsl/src/bgw_policy/policy_utils.c | 2 +- tsl/src/bgw_policy/retention_api.c | 98 ++++++++++--- tsl/src/bgw_policy/retention_api.h | 7 +- tsl/src/chunk.c | 49 ++++--- tsl/src/chunk.h | 3 +- tsl/test/expected/bgw_policy.out | 32 ++-- tsl/test/expected/cagg_policy.out | 5 +- tsl/test/expected/compression_bgw-13.out | 2 +- tsl/test/expected/compression_bgw-14.out | 2 +- tsl/test/expected/compression_bgw-15.out | 2 +- tsl/test/expected/compression_bgw-16.out | 2 +- tsl/test/expected/compression_errors-13.out | 8 +- tsl/test/expected/compression_errors-14.out | 8 +- tsl/test/expected/compression_errors-15.out | 8 +- tsl/test/expected/compression_errors-16.out | 8 +- tsl/test/expected/policy_generalization.out | 109 ++++++++++++++ tsl/test/expected/tsl_tables.out | 3 +- tsl/test/shared/expected/compat.out | 4 +- tsl/test/shared/expected/extension.out | 8 +- tsl/test/shared/sql/compat.sql | 2 +- tsl/test/sql/bgw_policy.sql | 12 +- tsl/test/sql/cagg_policy.sql | 5 +- tsl/test/sql/policy_generalization.sql | 47 ++++++ tsl/test/sql/tsl_tables.sql | 1 + 37 files changed, 772 insertions(+), 140 deletions(-) create mode 100644 .unreleased/feature_6227 diff --git a/.github/workflows/pgspot.yaml b/.github/workflows/pgspot.yaml index ef0116adc6f..1912610f886 100644 --- a/.github/workflows/pgspot.yaml +++ b/.github/workflows/pgspot.yaml @@ -21,6 +21,10 @@ jobs: --proc-without-search-path 'extschema.recompress_chunk(chunk regclass,if_not_compressed boolean)' --proc-without-search-path '_timescaledb_internal.policy_compression(job_id integer,config jsonb)' --proc-without-search-path '_timescaledb_functions.policy_compression(job_id integer,config jsonb)' + --proc-without-search-path + '_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)' + --proc-without-search-path + '_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)' --proc-without-search-path '_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean)' --proc-without-search-path diff --git a/.unreleased/feature_6227 b/.unreleased/feature_6227 new file mode 100644 index 00000000000..39ba8386bee --- /dev/null +++ b/.unreleased/feature_6227 @@ -0,0 +1 @@ +Implements: #6227 Use creation time in retention/compression policy diff --git a/sql/compat.sql b/sql/compat.sql index e98375f07ee..9cd31c94b0d 100644 --- a/sql/compat.sql +++ b/sql/compat.sql @@ -1015,12 +1015,12 @@ END$$ SET search_path TO pg_catalog,pg_temp; -CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean) LANGUAGE PLPGSQL AS $$ +CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean) LANGUAGE PLPGSQL AS $$ BEGIN IF current_setting('timescaledb.enable_deprecation_warnings', true)::bool THEN - RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.'; + RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.'; END IF; - CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6); + CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6,$7); END$$ SET search_path TO pg_catalog,pg_temp; diff --git a/sql/policy_api.sql b/sql/policy_api.sql index 0900ee361d1..00ae6bd0ea2 100644 --- a/sql/policy_api.sql +++ b/sql/policy_api.sql @@ -12,11 +12,12 @@ -- might be kept, but data within the window will never be deleted. CREATE OR REPLACE FUNCTION @extschema@.add_retention_policy( relation REGCLASS, - drop_after "any", + drop_after "any" = NULL, if_not_exists BOOL = false, schedule_interval INTERVAL = NULL, initial_start TIMESTAMPTZ = NULL, - timezone TEXT = NULL + timezone TEXT = NULL, + drop_created_before INTERVAL = NULL ) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add' LANGUAGE C VOLATILE; @@ -45,11 +46,13 @@ LANGUAGE C VOLATILE STRICT; /* compression policy */ CREATE OR REPLACE FUNCTION @extschema@.add_compression_policy( - hypertable REGCLASS, compress_after "any", + hypertable REGCLASS, + compress_after "any" = NULL, if_not_exists BOOL = false, schedule_interval INTERVAL = NULL, initial_start TIMESTAMPTZ = NULL, - timezone TEXT = NULL + timezone TEXT = NULL, + compress_created_before INTERVAL = NULL ) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_compression_add' @@ -83,6 +86,7 @@ LANGUAGE C VOLATILE; /* 1 step policies */ /* Add policies */ +/* Unsupported drop_created_before/compress_created_before in add/alter for caggs */ CREATE OR REPLACE FUNCTION timescaledb_experimental.add_policies( relation REGCLASS, if_not_exists BOOL = false, @@ -128,4 +132,4 @@ CREATE OR REPLACE FUNCTION timescaledb_experimental.show_policies( relation REGCLASS) RETURNS SETOF JSONB AS '@MODULE_PATHNAME@', 'ts_policies_show' -LANGUAGE C VOLATILE; \ No newline at end of file +LANGUAGE C VOLATILE; diff --git a/sql/policy_internal.sql b/sql/policy_internal.sql index 99a32d4a8f8..2ea47567b39 100644 --- a/sql/policy_internal.sql +++ b/sql/policy_internal.sql @@ -41,7 +41,8 @@ _timescaledb_functions.policy_compression_execute( lag ANYELEMENT, maxchunks INTEGER, verbose_log BOOLEAN, - recompress_enabled BOOLEAN) + recompress_enabled BOOLEAN, + use_creation_time BOOLEAN) AS $$ DECLARE htoid REGCLASS; @@ -54,6 +55,7 @@ DECLARE bit_compressed_unordered int := 2; bit_frozen int := 4; bit_compressed_partial int := 8; + creation_lag INTERVAL := NULL; BEGIN -- procedures with SET clause cannot execute transaction @@ -67,14 +69,26 @@ BEGIN -- for the integer cases, we have to compute the lag w.r.t -- the integer_now function and then pass on to show_chunks IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN + -- cannot have use_creation_time set with this + IF use_creation_time IS TRUE THEN + RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id; + END IF; lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT); END IF; + -- if use_creation_time has been specified then the lag needs to be used with the + -- "compress_created_before" argument. Otherwise the usual "older_than" argument + -- is good enough + IF use_creation_time IS TRUE THEN + creation_lag := lag; + lag := NULL; + END IF; + FOR chunk_rec IN SELECT show.oid, ch.schema_name, ch.table_name, ch.status FROM - @extschema@.show_chunks(htoid, older_than => lag) AS show(oid) + @extschema@.show_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid) INNER JOIN pg_class pgc ON pgc.oid = show.oid INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid @@ -157,7 +171,9 @@ DECLARE dimtype REGTYPE; dimtypeinput REGPROC; compress_after TEXT; + compress_created_before TEXT; lag_value TEXT; + lag_bigint_value BIGINT; htid INTEGER; htoid REGCLASS; chunk_rec RECORD; @@ -165,6 +181,7 @@ DECLARE maxchunks INTEGER := 0; numchunks INTEGER := 1; recompress_enabled BOOL; + use_creation_time BOOL := FALSE; BEGIN -- procedures with SET clause cannot execute transaction @@ -183,11 +200,6 @@ BEGIN verbose_log := COALESCE(jsonb_object_field_text(config, 'verbose_log')::BOOLEAN, FALSE); maxchunks := COALESCE(jsonb_object_field_text(config, 'maxchunks_to_compress')::INTEGER, 0); recompress_enabled := COALESCE(jsonb_object_field_text(config, 'recompress')::BOOLEAN, TRUE); - compress_after := jsonb_object_field_text(config, 'compress_after'); - - IF compress_after IS NULL THEN - RAISE EXCEPTION 'job % config must have compress_after', job_id; - END IF; -- find primary dimension type -- SELECT dim.column_type INTO dimtype @@ -197,29 +209,40 @@ BEGIN ORDER BY dim.id LIMIT 1; - lag_value := jsonb_object_field_text(config, 'compress_after'); + compress_after := jsonb_object_field_text(config, 'compress_after'); + IF compress_after IS NULL THEN + compress_created_before := jsonb_object_field_text(config, 'compress_created_before'); + IF compress_created_before IS NULL THEN + RAISE EXCEPTION 'job % config must have compress_after or compress_created_before', job_id; + END IF; + lag_value := compress_created_before; + use_creation_time := true; + dimtype := 'INTERVAL' ::regtype; + ELSE + lag_value := compress_after; + END IF; -- execute the properly type casts for the lag value CASE dimtype - WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype THEN + WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype, 'INTERVAL' ::regtype THEN CALL _timescaledb_functions.policy_compression_execute( job_id, htid, lag_value::INTERVAL, - maxchunks, verbose_log, recompress_enabled + maxchunks, verbose_log, recompress_enabled, use_creation_time ); WHEN 'BIGINT'::regtype THEN CALL _timescaledb_functions.policy_compression_execute( job_id, htid, lag_value::BIGINT, - maxchunks, verbose_log, recompress_enabled + maxchunks, verbose_log, recompress_enabled, use_creation_time ); WHEN 'INTEGER'::regtype THEN CALL _timescaledb_functions.policy_compression_execute( job_id, htid, lag_value::INTEGER, - maxchunks, verbose_log, recompress_enabled + maxchunks, verbose_log, recompress_enabled, use_creation_time ); WHEN 'SMALLINT'::regtype THEN CALL _timescaledb_functions.policy_compression_execute( job_id, htid, lag_value::SMALLINT, - maxchunks, verbose_log, recompress_enabled + maxchunks, verbose_log, recompress_enabled, use_creation_time ); END CASE; END; diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index e743b81bd3f..fd64e2bc7ef 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -250,3 +250,156 @@ CREATE FUNCTION @extschema@.show_chunks( created_after "any" = NULL ) RETURNS SETOF REGCLASS AS '@MODULE_PATHNAME@', 'ts_chunk_show_chunks' LANGUAGE C STABLE PARALLEL SAFE; + +DROP FUNCTION @extschema@.add_retention_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT); +CREATE FUNCTION @extschema@.add_retention_policy( + relation REGCLASS, + drop_after "any" = NULL, + if_not_exists BOOL = false, + schedule_interval INTERVAL = NULL, + initial_start TIMESTAMPTZ = NULL, + timezone TEXT = NULL, + drop_created_before INTERVAL = NULL +) +RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add' +LANGUAGE C VOLATILE; + +DROP FUNCTION @extschema@.add_compression_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT); +CREATE FUNCTION @extschema@.add_compression_policy( + hypertable REGCLASS, + compress_after "any" = NULL, + if_not_exists BOOL = false, + schedule_interval INTERVAL = NULL, + initial_start TIMESTAMPTZ = NULL, + timezone TEXT = NULL, + compress_created_before INTERVAL = NULL +) +RETURNS INTEGER +AS '@MODULE_PATHNAME@', 'ts_policy_compression_add' +LANGUAGE C VOLATILE; + +DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN); +DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN); +CREATE PROCEDURE +_timescaledb_functions.policy_compression_execute( + job_id INTEGER, + htid INTEGER, + lag ANYELEMENT, + maxchunks INTEGER, + verbose_log BOOLEAN, + recompress_enabled BOOLEAN, + use_creation_time BOOLEAN) +AS $$ +DECLARE + htoid REGCLASS; + chunk_rec RECORD; + numchunks INTEGER := 1; + _message text; + _detail text; + -- chunk status bits: + bit_compressed int := 1; + bit_compressed_unordered int := 2; + bit_frozen int := 4; + bit_compressed_partial int := 8; + creation_lag INTERVAL := NULL; +BEGIN + + -- procedures with SET clause cannot execute transaction + -- control so we adjust search_path in procedure body + SET LOCAL search_path TO pg_catalog, pg_temp; + + SELECT format('%I.%I', schema_name, table_name) INTO htoid + FROM _timescaledb_catalog.hypertable + WHERE id = htid; + + -- for the integer cases, we have to compute the lag w.r.t + -- the integer_now function and then pass on to show_chunks + IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN + -- cannot have use_creation_time set with this + IF use_creation_time IS TRUE THEN + RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id; + END IF; + lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT); + END IF; + + -- if use_creation_time has been specified then the lag needs to be used with the + -- "compress_created_before" argument. Otherwise the usual "older_than" argument + -- is good enough + IF use_creation_time IS TRUE THEN + creation_lag := lag; + lag := NULL; + END IF; + + FOR chunk_rec IN + SELECT + show.oid, ch.schema_name, ch.table_name, ch.status + FROM + @extschema@.show_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid) + INNER JOIN pg_class pgc ON pgc.oid = show.oid + INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid + INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid + WHERE + ch.dropped IS FALSE + AND ( + ch.status = 0 OR + ( + ch.status & bit_compressed > 0 AND ( + ch.status & bit_compressed_unordered > 0 OR + ch.status & bit_compressed_partial > 0 + ) + ) + ) + LOOP + IF chunk_rec.status = 0 THEN + BEGIN + PERFORM @extschema@.compress_chunk( chunk_rec.oid ); + EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS + _message = MESSAGE_TEXT, + _detail = PG_EXCEPTION_DETAIL; + RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text + USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail), + ERRCODE = sqlstate; + END; + ELSIF + ( + chunk_rec.status & bit_compressed > 0 AND ( + chunk_rec.status & bit_compressed_unordered > 0 OR + chunk_rec.status & bit_compressed_partial > 0 + ) + ) AND recompress_enabled IS TRUE THEN + BEGIN + PERFORM @extschema@.decompress_chunk(chunk_rec.oid, if_compressed => true); + EXCEPTION WHEN OTHERS THEN + RAISE WARNING 'decompressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text + USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail), + ERRCODE = sqlstate; + END; + -- SET LOCAL is only active until end of transaction. + -- While we could use SET at the start of the function we do not + -- want to bleed out search_path to caller, so we do SET LOCAL + -- again after COMMIT + BEGIN + PERFORM @extschema@.compress_chunk(chunk_rec.oid); + EXCEPTION WHEN OTHERS THEN + RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text + USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail), + ERRCODE = sqlstate; + END; + END IF; + COMMIT; + -- SET LOCAL is only active until end of transaction. + -- While we could use SET at the start of the function we do not + -- want to bleed out search_path to caller, so we do SET LOCAL + -- again after COMMIT + SET LOCAL search_path TO pg_catalog, pg_temp; + IF verbose_log THEN + RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name; + END IF; + numchunks := numchunks + 1; + IF maxchunks > 0 AND numchunks >= maxchunks THEN + EXIT; + END IF; + END LOOP; +END; +$$ LANGUAGE PLPGSQL; diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index 0c4f29e3bbd..7f33372ef99 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -197,3 +197,139 @@ DROP FUNCTION IF EXISTS _timescaledb_functions.makeaclitem(regrole, regrole, tex DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_validate_query(TEXT); +DROP FUNCTION @extschema@.add_retention_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT, INTERVAL); +CREATE FUNCTION @extschema@.add_retention_policy( + relation REGCLASS, + drop_after "any", + if_not_exists BOOL = false, + schedule_interval INTERVAL = NULL, + initial_start TIMESTAMPTZ = NULL, + timezone TEXT = NULL +) +RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add' +LANGUAGE C VOLATILE; + +DROP FUNCTION @extschema@.add_compression_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT, INTERVAL); +CREATE FUNCTION @extschema@.add_compression_policy( + hypertable REGCLASS, + compress_after "any", + if_not_exists BOOL = false, + schedule_interval INTERVAL = NULL, + initial_start TIMESTAMPTZ = NULL, + timezone TEXT = NULL +) +RETURNS INTEGER +AS '@MODULE_PATHNAME@', 'ts_policy_compression_add' +LANGUAGE C VOLATILE; + +DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN); +DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN); +CREATE PROCEDURE +_timescaledb_functions.policy_compression_execute( + job_id INTEGER, + htid INTEGER, + lag ANYELEMENT, + maxchunks INTEGER, + verbose_log BOOLEAN, + recompress_enabled BOOLEAN) +AS $$ +DECLARE + htoid REGCLASS; + chunk_rec RECORD; + numchunks INTEGER := 1; + _message text; + _detail text; + -- chunk status bits: + bit_compressed int := 1; + bit_compressed_unordered int := 2; + bit_frozen int := 4; + bit_compressed_partial int := 8; +BEGIN + + -- procedures with SET clause cannot execute transaction + -- control so we adjust search_path in procedure body + SET LOCAL search_path TO pg_catalog, pg_temp; + + SELECT format('%I.%I', schema_name, table_name) INTO htoid + FROM _timescaledb_catalog.hypertable + WHERE id = htid; + + -- for the integer cases, we have to compute the lag w.r.t + -- the integer_now function and then pass on to show_chunks + IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN + lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT); + END IF; + + FOR chunk_rec IN + SELECT + show.oid, ch.schema_name, ch.table_name, ch.status + FROM + @extschema@.show_chunks(htoid, older_than => lag) AS show(oid) + INNER JOIN pg_class pgc ON pgc.oid = show.oid + INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid + INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid + WHERE + ch.dropped IS FALSE + AND ( + ch.status = 0 OR + ( + ch.status & bit_compressed > 0 AND ( + ch.status & bit_compressed_unordered > 0 OR + ch.status & bit_compressed_partial > 0 + ) + ) + ) + LOOP + IF chunk_rec.status = 0 THEN + BEGIN + PERFORM @extschema@.compress_chunk( chunk_rec.oid ); + EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS + _message = MESSAGE_TEXT, + _detail = PG_EXCEPTION_DETAIL; + RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text + USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail), + ERRCODE = sqlstate; + END; + ELSIF + ( + chunk_rec.status & bit_compressed > 0 AND ( + chunk_rec.status & bit_compressed_unordered > 0 OR + chunk_rec.status & bit_compressed_partial > 0 + ) + ) AND recompress_enabled IS TRUE THEN + BEGIN + PERFORM @extschema@.decompress_chunk(chunk_rec.oid, if_compressed => true); + EXCEPTION WHEN OTHERS THEN + RAISE WARNING 'decompressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text + USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail), + ERRCODE = sqlstate; + END; + -- SET LOCAL is only active until end of transaction. + -- While we could use SET at the start of the function we do not + -- want to bleed out search_path to caller, so we do SET LOCAL + -- again after COMMIT + BEGIN + PERFORM @extschema@.compress_chunk(chunk_rec.oid); + EXCEPTION WHEN OTHERS THEN + RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text + USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail), + ERRCODE = sqlstate; + END; + END IF; + COMMIT; + -- SET LOCAL is only active until end of transaction. + -- While we could use SET at the start of the function we do not + -- want to bleed out search_path to caller, so we do SET LOCAL + -- again after COMMIT + SET LOCAL search_path TO pg_catalog, pg_temp; + IF verbose_log THEN + RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name; + END IF; + numchunks := numchunks + 1; + IF maxchunks > 0 AND numchunks >= maxchunks THEN + EXIT; + END IF; + END LOOP; +END; +$$ LANGUAGE PLPGSQL; diff --git a/tsl/src/bgw_policy/compression_api.c b/tsl/src/bgw_policy/compression_api.c index d884614aca4..937911d017e 100644 --- a/tsl/src/bgw_policy/compression_api.c +++ b/tsl/src/bgw_policy/compression_api.c @@ -96,6 +96,21 @@ policy_compression_get_compress_after_interval(const Jsonb *config) return interval; } +Interval * +policy_compression_get_compress_created_before_interval(const Jsonb *config) +{ + Interval *interval = + ts_jsonb_get_interval_field(config, POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE); + + if (interval == NULL) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("could not find %s in config for job", + POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE))); + + return interval; +} + int64 policy_recompression_get_recompress_after_int(const Jsonb *config) { @@ -184,7 +199,8 @@ policy_compression_check(PG_FUNCTION_ARGS) /* compression policies are added to hypertables or continuous aggregates */ Datum policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum, - Oid compress_after_type, Interval *default_schedule_interval, + Oid compress_after_type, Interval *created_before, + Interval *default_schedule_interval, bool user_defined_schedule_interval, bool if_not_exists, bool fixed_schedule, TimestampTz initial_start, const char *timezone) @@ -201,6 +217,13 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum, hcache = ts_hypertable_cache_pin(); hypertable = validate_compress_chunks_hypertable(hcache, user_rel_oid, &is_cagg); + /* creation time usage not supported with caggs yet */ + if (is_cagg && created_before != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use \"compress_created_before\" with continuous aggregate \"%s\" ", + get_rel_name(user_rel_oid)))); + owner_id = ts_hypertable_permissions_check(user_rel_oid, GetUserId()); ts_bgw_job_validate_job_owner(owner_id); @@ -214,6 +237,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum, if (jobs != NIL) { + bool is_equal = false; + if (!if_not_exists) { ts_cache_release(hcache); @@ -227,11 +252,25 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum, Assert(list_length(jobs) == 1); BgwJob *existing = linitial(jobs); - if (policy_config_check_hypertable_lag_equality(existing->fd.config, - POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER, - partitioning_type, - compress_after_type, - compress_after_datum)) + if (OidIsValid(compress_after_type)) + is_equal = + policy_config_check_hypertable_lag_equality(existing->fd.config, + POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER, + partitioning_type, + compress_after_type, + compress_after_datum); + else + { + Assert(created_before != NULL); + is_equal = policy_config_check_hypertable_lag_equality( + existing->fd.config, + POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE, + partitioning_type, + INTERVALOID, + IntervalPGetDatum(created_before)); + } + + if (is_equal) { /* If all arguments are the same, do nothing */ ts_cache_release(hcache); @@ -251,6 +290,22 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum, PG_RETURN_INT32(-1); } } + + if (created_before) + { + Assert(!OidIsValid(compress_after_type)); + compress_after_type = INTERVALOID; + } + + if (!is_cagg && IS_INTEGER_TYPE(partitioning_type) && !IS_INTEGER_TYPE(compress_after_type) && + created_before == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for parameter %s", POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER), + errhint("Integer duration in \"compress_after\" or interval time duration" + " in \"compress_created_before\" is required for hypertables with integer " + "time dimension."))); + if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim)) && !user_defined_schedule_interval) { @@ -286,9 +341,14 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum, switch (compress_after_type) { case INTERVALOID: - ts_jsonb_add_interval(parse_state, - POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER, - DatumGetIntervalP(compress_after_datum)); + if (created_before) + ts_jsonb_add_interval(parse_state, + POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE, + created_before); + else + ts_jsonb_add_interval(parse_state, + POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER, + DatumGetIntervalP(compress_after_datum)); break; case INT2OID: ts_jsonb_add_int64(parse_state, @@ -360,7 +420,7 @@ policy_compression_add(PG_FUNCTION_ARGS) * The function is not STRICT but we can't allow required args to be NULL * so we need to act like a strict function in those cases */ - if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2)) + if (PG_ARGISNULL(0) || PG_ARGISNULL(2)) { ts_feature_flag_check(FEATURE_POLICY); PG_RETURN_NULL(); @@ -368,7 +428,7 @@ policy_compression_add(PG_FUNCTION_ARGS) Oid user_rel_oid = PG_GETARG_OID(0); Datum compress_after_datum = PG_GETARG_DATUM(1); - Oid compress_after_type = get_fn_expr_argtype(fcinfo->flinfo, 1); + Oid compress_after_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1); bool if_not_exists = PG_GETARG_BOOL(2); bool user_defined_schedule_interval = !(PG_ARGISNULL(3)); Interval *default_schedule_interval = @@ -378,10 +438,18 @@ policy_compression_add(PG_FUNCTION_ARGS) bool fixed_schedule = !PG_ARGISNULL(4); text *timezone = PG_ARGISNULL(5) ? NULL : PG_GETARG_TEXT_PP(5); char *valid_timezone = NULL; + Interval *created_before = PG_GETARG_INTERVAL_P(6); ts_feature_flag_check(FEATURE_POLICY); TS_PREVENT_FUNC_IF_READ_ONLY(); + /* compress_after and created_before cannot be specified [or omitted] together */ + if ((PG_ARGISNULL(1) && PG_ARGISNULL(6)) || (!PG_ARGISNULL(1) && !PG_ARGISNULL(6))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg( + "need to specify one of \"compress_after\" or \"compress_created_before\""))); + /* if users pass in -infinity for initial_start, then use the current_timestamp instead */ if (fixed_schedule) { @@ -397,6 +465,7 @@ policy_compression_add(PG_FUNCTION_ARGS) retval = policy_compression_add_internal(user_rel_oid, compress_after_datum, compress_after_type, + created_before, default_schedule_interval, user_defined_schedule_interval, if_not_exists, diff --git a/tsl/src/bgw_policy/compression_api.h b/tsl/src/bgw_policy/compression_api.h index 7157e9b5fb5..57a8a5c04bc 100644 --- a/tsl/src/bgw_policy/compression_api.h +++ b/tsl/src/bgw_policy/compression_api.h @@ -21,12 +21,14 @@ extern Datum policy_compression_check(PG_FUNCTION_ARGS); int32 policy_compression_get_hypertable_id(const Jsonb *config); int64 policy_compression_get_compress_after_int(const Jsonb *config); Interval *policy_compression_get_compress_after_interval(const Jsonb *config); +Interval *policy_compression_get_compress_created_before_interval(const Jsonb *config); int32 policy_compression_get_maxchunks_per_job(const Jsonb *config); int64 policy_recompression_get_recompress_after_int(const Jsonb *config); Interval *policy_recompression_get_recompress_after_interval(const Jsonb *config); Datum policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum, - Oid compress_after_type, Interval *default_schedule_interval, + Oid compress_after_type, Interval *created_before, + Interval *default_schedule_interval, bool user_defined_schedule_interval, bool if_not_exists, bool fixed_schedule, TimestampTz initial_start, const char *timezone); diff --git a/tsl/src/bgw_policy/job.c b/tsl/src/bgw_policy/job.c index ea25c6cda39..07c9a5166e0 100644 --- a/tsl/src/bgw_policy/job.c +++ b/tsl/src/bgw_policy/job.c @@ -73,9 +73,10 @@ log_retention_boundary(int elevel, PolicyRetentionData *policy_data, const char if (OidIsValid(outfuncid)) elog(elevel, - "%s \"%s\": dropping data older than %s", + "%s \"%s\": dropping data %s %s", message, relname, + policy_data->use_creation_time ? "created before" : "older than", DatumGetCString(OidFunctionCall1(outfuncid, boundary))); } @@ -294,7 +295,8 @@ policy_retention_execute(int32 job_id, Jsonb *config) chunk_invoke_drop_chunks(policy_data.object_relid, policy_data.boundary, - policy_data.boundary_type); + policy_data.boundary_type, + policy_data.use_creation_time); return true; } @@ -309,6 +311,9 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po Datum boundary; Datum boundary_type; ContinuousAgg *cagg; + Interval *(*interval_getter)(const Jsonb *); + interval_getter = policy_retention_get_drop_after_interval; + bool use_creation_time = false; object_relid = ts_hypertable_id_to_relid(policy_retention_get_hypertable_id(config), false); hypertable = ts_hypertable_cache_get_cache_and_entry(object_relid, CACHE_FLAG_NONE, &hcache); @@ -329,14 +334,14 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po /* if there's no int_now function the boundary is considered as an INTERVAL */ boundary_type = INTERVALOID; + interval_getter = policy_retention_get_drop_created_before_interval; + use_creation_time = true; } else boundary_type = ts_dimension_get_partition_type(open_dim); - boundary = get_window_boundary(open_dim, - config, - policy_retention_get_drop_after_int, - policy_retention_get_drop_after_interval); + boundary = + get_window_boundary(open_dim, config, policy_retention_get_drop_after_int, interval_getter); /* We need to do a reverse lookup here since the given hypertable might be a materialized hypertable, and thus need to call drop_chunks on the @@ -356,6 +361,7 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po policy_data->object_relid = object_relid; policy_data->boundary = boundary; policy_data->boundary_type = boundary_type; + policy_data->use_creation_time = use_creation_time; } } diff --git a/tsl/src/bgw_policy/job.h b/tsl/src/bgw_policy/job.h index 90e3e104d60..4e8947aa84c 100644 --- a/tsl/src/bgw_policy/job.h +++ b/tsl/src/bgw_policy/job.h @@ -29,6 +29,7 @@ typedef struct PolicyRetentionData Oid object_relid; Datum boundary; Datum boundary_type; + bool use_creation_time; } PolicyRetentionData; typedef struct PolicyContinuousAggData diff --git a/tsl/src/bgw_policy/policies_v2.c b/tsl/src/bgw_policy/policies_v2.c index 71f0d1585a0..eeec71713e4 100644 --- a/tsl/src/bgw_policy/policies_v2.c +++ b/tsl/src/bgw_policy/policies_v2.c @@ -219,6 +219,7 @@ validate_and_create_policies(policies_info all_policies, bool if_exists) policy_compression_add_internal(all_policies.rel_oid, all_policies.compress->compress_after, all_policies.compress->compress_after_type, + NULL, DEFAULT_COMPRESSION_SCHEDULE_INTERVAL, false, if_exists, @@ -234,6 +235,7 @@ validate_and_create_policies(policies_info all_policies, bool if_exists) policy_retention_add_internal(all_policies.rel_oid, all_policies.retention->drop_after_type, all_policies.retention->drop_after, + NULL, (Interval) DEFAULT_RETENTION_SCHEDULE_INTERVAL, false, false, @@ -702,6 +704,7 @@ policies_show(PG_FUNCTION_ARGS) job, POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER, SHOW_POLICY_KEY_COMPRESS_AFTER); + /* POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE not supported with caggs */ ts_jsonb_add_interval(parse_state, SHOW_POLICY_KEY_COMPRESS_INTERVAL, &(job->fd.schedule_interval)); @@ -714,6 +717,7 @@ policies_show(PG_FUNCTION_ARGS) job, POL_RETENTION_CONF_KEY_DROP_AFTER, SHOW_POLICY_KEY_DROP_AFTER); + /* POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE not supported with caggs */ ts_jsonb_add_interval(parse_state, SHOW_POLICY_KEY_RETENTION_INTERVAL, &(job->fd.schedule_interval)); diff --git a/tsl/src/bgw_policy/policies_v2.h b/tsl/src/bgw_policy/policies_v2.h index 7de8734de1a..71e4335e023 100644 --- a/tsl/src/bgw_policy/policies_v2.h +++ b/tsl/src/bgw_policy/policies_v2.h @@ -23,6 +23,7 @@ #define POL_COMPRESSION_CONF_KEY_HYPERTABLE_ID "hypertable_id" #define POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER "compress_after" #define POL_COMPRESSION_CONF_KEY_MAXCHUNKS_TO_COMPRESS "maxchunks_to_compress" +#define POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE "compress_created_before" #define POLICY_RECOMPRESSION_PROC_NAME "policy_recompression" #define POL_RECOMPRESSION_CONF_KEY_RECOMPRESS_AFTER "recompress_after" @@ -31,6 +32,7 @@ #define POLICY_RETENTION_CHECK_NAME "policy_retention_check" #define POL_RETENTION_CONF_KEY_HYPERTABLE_ID "hypertable_id" #define POL_RETENTION_CONF_KEY_DROP_AFTER "drop_after" +#define POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE "drop_created_before" #define SHOW_POLICY_KEY_HYPERTABLE_ID "hypertable_id" #define SHOW_POLICY_KEY_POLICY_NAME "policy_name" @@ -38,8 +40,10 @@ #define SHOW_POLICY_KEY_REFRESH_START_OFFSET "refresh_start_offset" #define SHOW_POLICY_KEY_REFRESH_END_OFFSET "refresh_end_offset" #define SHOW_POLICY_KEY_COMPRESS_AFTER POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER +#define SHOW_POLICY_KEY_COMPRESS_CREATED_BEFORE POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE #define SHOW_POLICY_KEY_COMPRESS_INTERVAL "compress_interval" #define SHOW_POLICY_KEY_DROP_AFTER POL_RETENTION_CONF_KEY_DROP_AFTER +#define SHOW_POLICY_KEY_DROP_CREATED_BEFORE POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE #define SHOW_POLICY_KEY_RETENTION_INTERVAL "retention_interval" #define DEFAULT_RETENTION_SCHEDULE_INTERVAL \ diff --git a/tsl/src/bgw_policy/policy_utils.c b/tsl/src/bgw_policy/policy_utils.c index 6d283109faf..1a56ec353d9 100644 --- a/tsl/src/bgw_policy/policy_utils.c +++ b/tsl/src/bgw_policy/policy_utils.c @@ -32,7 +32,7 @@ bool policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label, Oid partitioning_type, Oid lag_type, Datum lag_datum) { - if (IS_INTEGER_TYPE(partitioning_type)) + if (IS_INTEGER_TYPE(partitioning_type) && lag_type != INTERVALOID) { bool found; int64 config_value = ts_jsonb_get_int64_field(config, json_label, &found); diff --git a/tsl/src/bgw_policy/retention_api.c b/tsl/src/bgw_policy/retention_api.c index edfcf951ac7..72f92b4322e 100644 --- a/tsl/src/bgw_policy/retention_api.c +++ b/tsl/src/bgw_policy/retention_api.c @@ -101,6 +101,21 @@ policy_retention_get_drop_after_interval(const Jsonb *config) return interval; } +Interval * +policy_retention_get_drop_created_before_interval(const Jsonb *config) +{ + Interval *interval = + ts_jsonb_get_interval_field(config, POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE); + + if (interval == NULL) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("could not find %s in config for job", + POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE))); + + return interval; +} + static Hypertable * validate_drop_chunks_hypertable(Cache *hcache, Oid user_htoid) { @@ -151,8 +166,9 @@ validate_drop_chunks_hypertable(Cache *hcache, Oid user_htoid) Datum policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum, - Interval default_schedule_interval, bool if_not_exists, - bool fixed_schedule, TimestampTz initial_start, const char *timezone) + Interval *created_before, Interval default_schedule_interval, + bool if_not_exists, bool fixed_schedule, TimestampTz initial_start, + const char *timezone) { NameData application_name; int32 job_id; @@ -183,9 +199,10 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum, List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_RETENTION_PROC_NAME, FUNCTIONS_SCHEMA_NAME, hypertable->fd.id); - if (jobs != NIL) { + bool is_equal = false; + if (!if_not_exists) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), @@ -195,11 +212,25 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum, Assert(list_length(jobs) == 1); BgwJob *existing = linitial(jobs); - if (policy_config_check_hypertable_lag_equality(existing->fd.config, - POL_RETENTION_CONF_KEY_DROP_AFTER, - partitioning_type, - window_type, - window_datum)) + if (OidIsValid(window_type)) + is_equal = + policy_config_check_hypertable_lag_equality(existing->fd.config, + POL_RETENTION_CONF_KEY_DROP_AFTER, + partitioning_type, + window_type, + window_datum); + else + { + Assert(created_before != NULL); + is_equal = policy_config_check_hypertable_lag_equality( + existing->fd.config, + POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE, + partitioning_type, + INTERVALOID, + IntervalPGetDatum(created_before)); + } + + if (is_equal) { /* If all arguments are the same, do nothing */ ts_cache_release(hcache); @@ -220,12 +251,28 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum, } } - if (IS_INTEGER_TYPE(partitioning_type) && !IS_INTEGER_TYPE(window_type)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid value for parameter %s", POL_RETENTION_CONF_KEY_DROP_AFTER), - errhint("Integer time duration is required for hypertables" - " with integer time dimension."))); + if (created_before) + { + Assert(!OidIsValid(window_type)); + window_type = INTERVALOID; + } + + if (IS_INTEGER_TYPE(partitioning_type)) + { + ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(ht_oid); + + if ((IS_INTEGER_TYPE(window_type) && cagg == NULL && + !OidIsValid(ts_get_integer_now_func(dim, false))) || + (!IS_INTEGER_TYPE(window_type) && created_before == NULL)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for parameter %s", POL_RETENTION_CONF_KEY_DROP_AFTER), + errhint( + "Integer duration in \"drop_after\" with valid \"integer_now\" function" + " or interval time duration" + " in \"drop_created_before\" is required for hypertables with integer " + "time dimension."))); + } if (IS_TIMESTAMP_TYPE(partitioning_type) && window_type != INTERVALOID) ereport(ERROR, @@ -242,9 +289,14 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum, switch (window_type) { case INTERVALOID: - ts_jsonb_add_interval(parse_state, - POL_RETENTION_CONF_KEY_DROP_AFTER, - DatumGetIntervalP(window_datum)); + if (created_before) + ts_jsonb_add_interval(parse_state, + POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE, + created_before); + else + ts_jsonb_add_interval(parse_state, + POL_RETENTION_CONF_KEY_DROP_AFTER, + DatumGetIntervalP(window_datum)); break; case INT2OID: ts_jsonb_add_int64(parse_state, @@ -306,7 +358,7 @@ Datum policy_retention_add(PG_FUNCTION_ARGS) { /* behave like a strict function */ - if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2)) + if (PG_ARGISNULL(0) || PG_ARGISNULL(2)) PG_RETURN_NULL(); Oid ht_oid = PG_GETARG_OID(0); @@ -319,11 +371,20 @@ policy_retention_add(PG_FUNCTION_ARGS) bool fixed_schedule = !PG_ARGISNULL(4); text *timezone = PG_ARGISNULL(5) ? NULL : PG_GETARG_TEXT_PP(5); char *valid_timezone = NULL; + // Interval *created_before = PG_ARGISNULL(6) ? NULL: PG_GETARG_INTERVAL_P(6); + Interval *created_before = PG_GETARG_INTERVAL_P(6); ts_feature_flag_check(FEATURE_POLICY); TS_PREVENT_FUNC_IF_READ_ONLY(); Datum retval; + + /* drop_after and created_before cannot be specified [or omitted] together */ + if ((PG_ARGISNULL(1) && PG_ARGISNULL(6)) || (!PG_ARGISNULL(1) && !PG_ARGISNULL(6))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("need to specify one of \"drop_after\" or \"drop_created_before\""))); + /* if users pass in -infinity for initial_start, then use the current_timestamp instead */ if (fixed_schedule) { @@ -338,6 +399,7 @@ policy_retention_add(PG_FUNCTION_ARGS) retval = policy_retention_add_internal(ht_oid, window_type, window_datum, + created_before, default_schedule_interval, if_not_exists, fixed_schedule, diff --git a/tsl/src/bgw_policy/retention_api.h b/tsl/src/bgw_policy/retention_api.h index 38bfa9d3d95..fe658824cd8 100644 --- a/tsl/src/bgw_policy/retention_api.h +++ b/tsl/src/bgw_policy/retention_api.h @@ -18,10 +18,11 @@ extern Datum policy_retention_remove(PG_FUNCTION_ARGS); int32 policy_retention_get_hypertable_id(const Jsonb *config); int64 policy_retention_get_drop_after_int(const Jsonb *config); Interval *policy_retention_get_drop_after_interval(const Jsonb *config); +Interval *policy_retention_get_drop_created_before_interval(const Jsonb *config); Datum policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum, - Interval default_schedule_interval, bool if_not_exists, - bool fixed_schedule, TimestampTz initial_start, - const char *timezone); + Interval *created_before, Interval default_schedule_interval, + bool if_not_exists, bool fixed_schedule, + TimestampTz initial_start, const char *timezone); Datum policy_retention_remove_internal(Oid table_oid, bool if_exists); #endif /* TIMESCALEDB_TSL_BGW_POLICY_RETENTION_API_H */ diff --git a/tsl/src/chunk.c b/tsl/src/chunk.c index abb3ff409e7..d215a42cb2c 100644 --- a/tsl/src/chunk.c +++ b/tsl/src/chunk.c @@ -308,7 +308,7 @@ chunk_set_default_data_node(PG_FUNCTION_ARGS) * Returns the number of dropped chunks. */ int -chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type) +chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type, bool use_creation_time) { EState *estate; ExprContext *econtext; @@ -318,27 +318,26 @@ chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type) SetExprState *state; Oid restype; Oid func_oid; - Const *argarr[DROP_CHUNKS_NARGS] = { - makeConst(REGCLASSOID, - -1, - InvalidOid, - sizeof(relid), - ObjectIdGetDatum(relid), - false, - false), - makeConst(older_than_type, - -1, - InvalidOid, - get_typlen(older_than_type), - older_than, - false, - get_typbyval(older_than_type)), - makeNullConst(older_than_type, -1, InvalidOid), - castNode(Const, makeBoolConst(false, true)), - /* For now, till we actually support created_before/created_after later */ - makeNullConst(older_than_type, -1, InvalidOid), - makeNullConst(older_than_type, -1, InvalidOid), - }; + Const *TypeNullCons = makeNullConst(older_than_type, -1, InvalidOid); + Const *IntervalVal = makeConst(older_than_type, + -1, + InvalidOid, + get_typlen(older_than_type), + older_than, + false, + get_typbyval(older_than_type)); + Const *argarr[DROP_CHUNKS_NARGS] = { makeConst(REGCLASSOID, + -1, + InvalidOid, + sizeof(relid), + ObjectIdGetDatum(relid), + false, + false), + TypeNullCons, + TypeNullCons, + castNode(Const, makeBoolConst(false, true)), + TypeNullCons, + TypeNullCons }; Oid type_id[DROP_CHUNKS_NARGS] = { REGCLASSOID, ANYOID, ANYOID, BOOLOID, ANYOID, ANYOID }; char *const schema_name = ts_extension_schema_name(); List *const fqn = list_make2(makeString(schema_name), makeString(DROP_CHUNKS_FUNCNAME)); @@ -349,6 +348,12 @@ chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type) func_oid = LookupFuncName(fqn, lengthof(type_id), type_id, false); Assert(func_oid); /* LookupFuncName should not return an invalid OID */ + /* decide whether to use "older_than" or "drop_created_before" */ + if (use_creation_time) + argarr[4] = IntervalVal; + else + argarr[1] = IntervalVal; + /* Prepare the function expr with argument list */ get_func_result_type(func_oid, &restype, NULL); diff --git a/tsl/src/chunk.h b/tsl/src/chunk.h index 15d5a942e3c..062c32d7df8 100644 --- a/tsl/src/chunk.h +++ b/tsl/src/chunk.h @@ -18,7 +18,8 @@ extern Datum chunk_freeze_chunk(PG_FUNCTION_ARGS); extern Datum chunk_unfreeze_chunk(PG_FUNCTION_ARGS); extern Datum chunk_drop_stale_chunks(PG_FUNCTION_ARGS); extern void ts_chunk_drop_stale_chunks(const char *node_name, ArrayType *chunks_array); -extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type); +extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type, + bool use_creation_time); extern Datum chunk_create_replica_table(PG_FUNCTION_ARGS); extern void chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes); diff --git a/tsl/test/expected/bgw_policy.out b/tsl/test/expected/bgw_policy.out index 48536a5289f..b72cb018988 100644 --- a/tsl/test/expected/bgw_policy.out +++ b/tsl/test/expected/bgw_policy.out @@ -600,6 +600,14 @@ select * from _timescaledb_catalog.dimension; alter schema new_public rename to public; \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 +-- test that the behavior is strict when providing NULL required arguments +create table test_strict (time timestamptz not null, a int, b int); +select create_hypertable('test_strict', 'time'); + create_hypertable +-------------------------- + (6,public,test_strict,t) +(1 row) + \set ON_ERROR_STOP 0 select add_reorder_policy('test_table_perm', 'test_table_perm_pkey'); ERROR: must be owner of hypertable "test_table_perm" @@ -609,6 +617,8 @@ select add_retention_policy('test_table_perm', INTERVAL '4 months', true); ERROR: must be owner of hypertable "test_table_perm" select remove_retention_policy('test_table'); ERROR: must be owner of hypertable "test_table" +select add_retention_policy('test_strict', drop_after => NULL); +ERROR: need to specify one of "drop_after" or "drop_created_before" \set ON_ERROR_STOP 1 -- Check the number of non-telemetry policies. We check for telemetry -- policy in telemetry_community.sql @@ -623,21 +633,7 @@ GROUP BY proc_name; policy_retention | 2 (3 rows) --- test that the behavior is strict when providing NULL required arguments -create table test_strict (time timestamptz not null, a int, b int); -select create_hypertable('test_strict', 'time'); - create_hypertable --------------------------- - (6,public,test_strict,t) -(1 row) - -- test retention with null arguments -select add_retention_policy('test_strict', drop_after => NULL); - add_retention_policy ----------------------- - -(1 row) - select add_retention_policy(NULL, NULL); add_retention_policy ---------------------- @@ -665,12 +661,6 @@ select add_retention_policy('test_strict', interval '2 days', schedule_interval -- test compression with null arguments alter table test_strict set (timescaledb.compress); -select add_compression_policy('test_strict', compress_after => NULL); - add_compression_policy ------------------------- - -(1 row) - select add_compression_policy(NULL, compress_after => NULL); add_compression_policy ------------------------ @@ -723,6 +713,8 @@ select * from _timescaledb_config.bgw_job where id in (:retenion_id_missing_sche -- test policy check functions with NULL args \set ON_ERROR_STOP 0 +select add_compression_policy('test_strict', compress_after => NULL); +ERROR: need to specify one of "compress_after" or "compress_created_before" SELECT _timescaledb_functions.policy_compression_check(NULL); ERROR: config must not be NULL SELECT _timescaledb_functions.policy_refresh_continuous_aggregate_check(NULL); diff --git a/tsl/test/expected/cagg_policy.out b/tsl/test/expected/cagg_policy.out index c509b5b9881..f7ee678964e 100644 --- a/tsl/test/expected/cagg_policy.out +++ b/tsl/test/expected/cagg_policy.out @@ -1145,10 +1145,13 @@ ERROR: setup a refresh policy for "metrics_cagg" before setting up a compressio SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval) as "REFRESH_JOB" \gset SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ; ERROR: compression not enabled on continuous aggregate "metrics_cagg" -\set ON_ERROR_STOP 1 ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress); NOTICE: defaulting compress_segmentby to device_id NOTICE: defaulting compress_orderby to dayb +--cannot use compress_created_before with cagg +SELECT add_compression_policy('metrics_cagg', compress_created_before => '8 day'::interval) AS "COMP_JOB" ; +ERROR: cannot use "compress_created_before" with continuous aggregate "metrics_cagg" +\set ON_ERROR_STOP 1 SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ; COMP_JOB ---------- diff --git a/tsl/test/expected/compression_bgw-13.out b/tsl/test/expected/compression_bgw-13.out index fb242b7f10b..74818f05ac4 100644 --- a/tsl/test/expected/compression_bgw-13.out +++ b/tsl/test/expected/compression_bgw-13.out @@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10; ALTER TABLE test_table_smallint SET (timescaledb.compress); \set ON_ERROR_STOP 0 select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval ); -ERROR: unsupported compress_after argument type, expected type : smallint +ERROR: invalid value for parameter compress_after \set ON_ERROR_STOP 1 SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; diff --git a/tsl/test/expected/compression_bgw-14.out b/tsl/test/expected/compression_bgw-14.out index 6470ec0e451..ceea22edd46 100644 --- a/tsl/test/expected/compression_bgw-14.out +++ b/tsl/test/expected/compression_bgw-14.out @@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10; ALTER TABLE test_table_smallint SET (timescaledb.compress); \set ON_ERROR_STOP 0 select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval ); -ERROR: unsupported compress_after argument type, expected type : smallint +ERROR: invalid value for parameter compress_after \set ON_ERROR_STOP 1 SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; diff --git a/tsl/test/expected/compression_bgw-15.out b/tsl/test/expected/compression_bgw-15.out index 6470ec0e451..ceea22edd46 100644 --- a/tsl/test/expected/compression_bgw-15.out +++ b/tsl/test/expected/compression_bgw-15.out @@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10; ALTER TABLE test_table_smallint SET (timescaledb.compress); \set ON_ERROR_STOP 0 select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval ); -ERROR: unsupported compress_after argument type, expected type : smallint +ERROR: invalid value for parameter compress_after \set ON_ERROR_STOP 1 SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; diff --git a/tsl/test/expected/compression_bgw-16.out b/tsl/test/expected/compression_bgw-16.out index 6470ec0e451..ceea22edd46 100644 --- a/tsl/test/expected/compression_bgw-16.out +++ b/tsl/test/expected/compression_bgw-16.out @@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10; ALTER TABLE test_table_smallint SET (timescaledb.compress); \set ON_ERROR_STOP 0 select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval ); -ERROR: unsupported compress_after argument type, expected type : smallint +ERROR: invalid value for parameter compress_after \set ON_ERROR_STOP 1 SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; diff --git a/tsl/test/expected/compression_errors-13.out b/tsl/test/expected/compression_errors-13.out index aaf125b67f4..db524dfcedd 100644 --- a/tsl/test/expected/compression_errors-13.out +++ b/tsl/test/expected/compression_errors-13.out @@ -485,8 +485,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); -ERROR: job 1000 config must have compress_after -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE +ERROR: job 1000 config must have compress_after or compress_created_before +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE SELECT remove_compression_policy('test_table_int'); remove_compression_policy --------------------------- @@ -508,7 +508,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); ERROR: job 1001 config must have hypertable_id -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE UPDATE _timescaledb_config.bgw_job SET config = NULL WHERE id = :compressjob_id; @@ -521,7 +521,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); ERROR: job 1001 has null config -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE -- test ADD COLUMN IF NOT EXISTS CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL); SELECT create_hypertable('metric', 'time', 'dev_id', 10); diff --git a/tsl/test/expected/compression_errors-14.out b/tsl/test/expected/compression_errors-14.out index aaf125b67f4..db524dfcedd 100644 --- a/tsl/test/expected/compression_errors-14.out +++ b/tsl/test/expected/compression_errors-14.out @@ -485,8 +485,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); -ERROR: job 1000 config must have compress_after -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE +ERROR: job 1000 config must have compress_after or compress_created_before +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE SELECT remove_compression_policy('test_table_int'); remove_compression_policy --------------------------- @@ -508,7 +508,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); ERROR: job 1001 config must have hypertable_id -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE UPDATE _timescaledb_config.bgw_job SET config = NULL WHERE id = :compressjob_id; @@ -521,7 +521,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); ERROR: job 1001 has null config -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE -- test ADD COLUMN IF NOT EXISTS CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL); SELECT create_hypertable('metric', 'time', 'dev_id', 10); diff --git a/tsl/test/expected/compression_errors-15.out b/tsl/test/expected/compression_errors-15.out index aaf125b67f4..db524dfcedd 100644 --- a/tsl/test/expected/compression_errors-15.out +++ b/tsl/test/expected/compression_errors-15.out @@ -485,8 +485,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); -ERROR: job 1000 config must have compress_after -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE +ERROR: job 1000 config must have compress_after or compress_created_before +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE SELECT remove_compression_policy('test_table_int'); remove_compression_policy --------------------------- @@ -508,7 +508,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); ERROR: job 1001 config must have hypertable_id -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE UPDATE _timescaledb_config.bgw_job SET config = NULL WHERE id = :compressjob_id; @@ -521,7 +521,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); ERROR: job 1001 has null config -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE -- test ADD COLUMN IF NOT EXISTS CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL); SELECT create_hypertable('metric', 'time', 'dev_id', 10); diff --git a/tsl/test/expected/compression_errors-16.out b/tsl/test/expected/compression_errors-16.out index 6e9289e518e..12c98f13cb8 100644 --- a/tsl/test/expected/compression_errors-16.out +++ b/tsl/test/expected/compression_errors-16.out @@ -485,8 +485,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); -ERROR: job 1000 config must have compress_after -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE +ERROR: job 1000 config must have compress_after or compress_created_before +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE SELECT remove_compression_policy('test_table_int'); remove_compression_policy --------------------------- @@ -508,7 +508,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); ERROR: job 1001 config must have hypertable_id -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE UPDATE _timescaledb_config.bgw_job SET config = NULL WHERE id = :compressjob_id; @@ -521,7 +521,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; --should fail CALL run_job(:compressjob_id); ERROR: job 1001 has null config -CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE +CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE -- test ADD COLUMN IF NOT EXISTS CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL); SELECT create_hypertable('metric', 'time', 'dev_id', 10); diff --git a/tsl/test/expected/policy_generalization.out b/tsl/test/expected/policy_generalization.out index 1a83734f46f..7e4547fbfeb 100644 --- a/tsl/test/expected/policy_generalization.out +++ b/tsl/test/expected/policy_generalization.out @@ -73,4 +73,113 @@ select count(*) from timescaledb_information.chunks where hypertable_name='test' 0 (1 row) +-- retention policy +INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i; +\set ON_ERROR_STOP 0 +-- interval input for "drop_after" for INTEGER partitioning errors out +SELECT add_retention_policy('test', INTERVAL '5 seconds', true); +ERROR: invalid value for parameter drop_after +-- integer input for "drop_after" for INTEGER partitioning without valid +-- integer_now function errors out +SELECT add_retention_policy('test', 2000, true); +ERROR: invalid value for parameter drop_after +-- both drop_created_before and drop_after should error out +SELECT add_retention_policy('test', drop_after => INTERVAL '5 seconds', + drop_created_before => INTERVAL '2 seconds'); +ERROR: need to specify one of "drop_after" or "drop_created_before" +\set ON_ERROR_STOP 1 +SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds', + if_not_exists => true) as drop_chunks_job_id \gset +CALL run_job(:drop_chunks_job_id); +select count(*) from timescaledb_information.chunks where hypertable_name='test'; + count +------- + 11 +(1 row) + +SELECT pg_sleep(3); + pg_sleep +---------- + +(1 row) + +CALL run_job(:drop_chunks_job_id); +select count(*) from timescaledb_information.chunks where hypertable_name='test'; + count +------- + 0 +(1 row) + +-- check for WARNING/NOTICE if policy already exists +SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds', + if_not_exists => true); +NOTICE: retention policy already exists for hypertable "test", skipping + add_retention_policy +---------------------- + -1 +(1 row) + +SELECT add_retention_policy('test', drop_created_before => INTERVAL '20 seconds', + if_not_exists => true); +WARNING: retention policy already exists for hypertable "test" + add_retention_policy +---------------------- + -1 +(1 row) + +SELECT remove_retention_policy('test'); + remove_retention_policy +------------------------- + +(1 row) + +-- compression policy +ALTER TABLE test SET (timescaledb.compress); +INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i; +-- Chunk compression status +SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats; + compression_status +-------------------- + Uncompressed +(1 row) + +-- Compression policy +SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds') AS compress_chunks_job_id \gset +SELECT pg_sleep(3); + pg_sleep +---------- + +(1 row) + +CALL run_job(:compress_chunks_job_id); +-- Chunk compression status +SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats; + compression_status +-------------------- + Compressed +(1 row) + +-- check for WARNING/NOTICE if policy already exists +SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds', + if_not_exists => true); +NOTICE: compression policy already exists for hypertable "test", skipping + add_compression_policy +------------------------ + -1 +(1 row) + +SELECT add_compression_policy('test', compress_created_before => INTERVAL '20 seconds', + if_not_exists => true); +WARNING: compression policy already exists for hypertable "test" + add_compression_policy +------------------------ + -1 +(1 row) + +SELECT remove_compression_policy('test'); + remove_compression_policy +--------------------------- + t +(1 row) + DROP TABLE test; diff --git a/tsl/test/expected/tsl_tables.out b/tsl/test/expected/tsl_tables.out index 484c0c45611..15b66dd99fe 100644 --- a/tsl/test/expected/tsl_tables.out +++ b/tsl/test/expected/tsl_tables.out @@ -115,7 +115,7 @@ SELECT * FROM _timescaledb_config.bgw_job WHERE id >= 1000 ORDER BY id; select add_retention_policy(); ERROR: function add_retention_policy() does not exist at character 8 select add_retention_policy('test_table'); -ERROR: function add_retention_policy(unknown) does not exist at character 8 +ERROR: need to specify one of "drop_after" or "drop_created_before" select add_retention_policy(INTERVAL '3 hours'); ERROR: function add_retention_policy(interval) does not exist at character 8 select add_retention_policy('test_table', INTERVAL 'haha'); @@ -198,6 +198,7 @@ select * from _timescaledb_catalog.dimension; \c :TEST_DBNAME :ROLE_SUPERUSER CREATE SCHEMA IF NOT EXISTS my_new_schema; create or replace function my_new_schema.dummy_now2() returns BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 1::BIGINT'; +grant usage on SCHEMA my_new_schema to public; grant execute on ALL FUNCTIONS IN SCHEMA my_new_schema to public; select set_integer_now_func('test_table_int', 'my_new_schema.dummy_now2'); set_integer_now_func diff --git a/tsl/test/shared/expected/compat.out b/tsl/test/shared/expected/compat.out index 54419936385..9667ae1dc0a 100644 --- a/tsl/test/shared/expected/compat.out +++ b/tsl/test/shared/expected/compat.out @@ -407,8 +407,8 @@ ERROR: null values cannot be formatted as an SQL identifier CALL _timescaledb_internal.policy_compression(0,NULL); WARNING: procedure _timescaledb_internal.policy_compression(integer,jsonb) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version. ERROR: job 0 has null config -CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true); -WARNING: procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version. +CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true,true); +WARNING: procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version. ERROR: invalid hypertable or continuous aggregate CALL _timescaledb_internal.policy_recompression(0,NULL); WARNING: procedure _timescaledb_internal.policy_recompression(integer,jsonb) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version. diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out index 7db0572be44..2336ab5e860 100644 --- a/tsl/test/shared/expected/extension.out +++ b/tsl/test/shared/expected/extension.out @@ -114,7 +114,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text _timescaledb_functions.ping_data_node(name,interval) _timescaledb_functions.policy_compression(integer,jsonb) _timescaledb_functions.policy_compression_check(jsonb) - _timescaledb_functions.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) + _timescaledb_functions.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) _timescaledb_functions.policy_job_error_retention(integer,jsonb) _timescaledb_functions.policy_job_error_retention_check(jsonb) _timescaledb_functions.policy_recompression(integer,jsonb) @@ -225,7 +225,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text _timescaledb_internal.ping_data_node(name,interval) _timescaledb_internal.policy_compression(integer,jsonb) _timescaledb_internal.policy_compression_check(jsonb) - _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) + _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) _timescaledb_internal.policy_job_error_retention(integer,jsonb) _timescaledb_internal.policy_job_error_retention_check(jsonb) _timescaledb_internal.policy_recompression(integer,jsonb) @@ -262,14 +262,14 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text debug_waitpoint_enable(text) debug_waitpoint_id(text) debug_waitpoint_release(text) - add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text) + add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval) add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text) add_data_node(name,text,name,integer,boolean,boolean,text) add_dimension(regclass,_timescaledb_internal.dimension_info,boolean) add_dimension(regclass,name,integer,anyelement,regproc,boolean) add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc,boolean,text) add_reorder_policy(regclass,name,boolean,timestamp with time zone,text) - add_retention_policy(regclass,"any",boolean,interval,timestamp with time zone,text) + add_retention_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval) alter_data_node(name,text,name,integer,boolean) alter_job(integer,interval,interval,integer,interval,boolean,jsonb,timestamp with time zone,boolean,regproc,boolean,timestamp with time zone,text) approximate_row_count(regclass) diff --git a/tsl/test/shared/sql/compat.sql b/tsl/test/shared/sql/compat.sql index b45081c7685..cb6b7cda9ec 100644 --- a/tsl/test/shared/sql/compat.sql +++ b/tsl/test/shared/sql/compat.sql @@ -100,7 +100,7 @@ CALL _timescaledb_internal.cagg_migrate_execute_override_cagg(NULL,NULL); CALL _timescaledb_internal.cagg_migrate_execute_plan(NULL); CALL _timescaledb_internal.cagg_migrate_execute_refresh_new_cagg(NULL,NULL); CALL _timescaledb_internal.policy_compression(0,NULL); -CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true); +CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true,true); CALL _timescaledb_internal.policy_recompression(0,NULL); CALL _timescaledb_internal.policy_refresh_continuous_aggregate(0,NULL); CALL _timescaledb_internal.policy_reorder(0,NULL); diff --git a/tsl/test/sql/bgw_policy.sql b/tsl/test/sql/bgw_policy.sql index 4666138669e..e8844cc3c3b 100644 --- a/tsl/test/sql/bgw_policy.sql +++ b/tsl/test/sql/bgw_policy.sql @@ -313,13 +313,17 @@ select * from _timescaledb_catalog.dimension; alter schema new_public rename to public; \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 +-- test that the behavior is strict when providing NULL required arguments +create table test_strict (time timestamptz not null, a int, b int); +select create_hypertable('test_strict', 'time'); + \set ON_ERROR_STOP 0 select add_reorder_policy('test_table_perm', 'test_table_perm_pkey'); select remove_reorder_policy('test_table'); select add_retention_policy('test_table_perm', INTERVAL '4 months', true); select remove_retention_policy('test_table'); - +select add_retention_policy('test_strict', drop_after => NULL); \set ON_ERROR_STOP 1 -- Check the number of non-telemetry policies. We check for telemetry @@ -330,11 +334,7 @@ WHERE proc_name NOT LIKE '%telemetry%' GROUP BY proc_name; --- test that the behavior is strict when providing NULL required arguments -create table test_strict (time timestamptz not null, a int, b int); -select create_hypertable('test_strict', 'time'); -- test retention with null arguments -select add_retention_policy('test_strict', drop_after => NULL); select add_retention_policy(NULL, NULL); select add_retention_policy(NULL, drop_after => interval '2 days'); -- this is an optional argument @@ -342,7 +342,6 @@ select add_retention_policy('test_strict', drop_after => interval '2 days', if_n select add_retention_policy('test_strict', interval '2 days', schedule_interval => NULL); -- test compression with null arguments alter table test_strict set (timescaledb.compress); -select add_compression_policy('test_strict', compress_after => NULL); select add_compression_policy(NULL, compress_after => NULL); select add_compression_policy('test_strict', INTERVAL '2 weeks', if_not_exists => NULL); select add_compression_policy('test_strict', INTERVAL '2 weeks', schedule_interval => NULL); @@ -366,6 +365,7 @@ select * from _timescaledb_config.bgw_job where id in (:retenion_id_missing_sche -- test policy check functions with NULL args \set ON_ERROR_STOP 0 +select add_compression_policy('test_strict', compress_after => NULL); SELECT _timescaledb_functions.policy_compression_check(NULL); SELECT _timescaledb_functions.policy_refresh_continuous_aggregate_check(NULL); SELECT _timescaledb_functions.policy_reorder_check(NULL); diff --git a/tsl/test/sql/cagg_policy.sql b/tsl/test/sql/cagg_policy.sql index 18935a5d5c9..c752c422723 100644 --- a/tsl/test/sql/cagg_policy.sql +++ b/tsl/test/sql/cagg_policy.sql @@ -540,9 +540,12 @@ SELECT add_compression_policy('metrics_cagg', '1 day'::interval); --can set compression policy only after enabling compression -- SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval) as "REFRESH_JOB" \gset SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ; +ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress); + +--cannot use compress_created_before with cagg +SELECT add_compression_policy('metrics_cagg', compress_created_before => '8 day'::interval) AS "COMP_JOB" ; \set ON_ERROR_STOP 1 -ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress); SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ; SELECT remove_compression_policy('metrics_cagg'); diff --git a/tsl/test/sql/policy_generalization.sql b/tsl/test/sql/policy_generalization.sql index 7162b706f62..4a610a38bb3 100644 --- a/tsl/test/sql/policy_generalization.sql +++ b/tsl/test/sql/policy_generalization.sql @@ -28,4 +28,51 @@ select count(*) from timescaledb_information.chunks where hypertable_name='test' SELECT count(*) from drop_chunks('test', created_after => INTERVAL '1 hour'); select count(*) from timescaledb_information.chunks where hypertable_name='test'; +-- retention policy +INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i; +\set ON_ERROR_STOP 0 +-- interval input for "drop_after" for INTEGER partitioning errors out +SELECT add_retention_policy('test', INTERVAL '5 seconds', true); +-- integer input for "drop_after" for INTEGER partitioning without valid +-- integer_now function errors out +SELECT add_retention_policy('test', 2000, true); +-- both drop_created_before and drop_after should error out +SELECT add_retention_policy('test', drop_after => INTERVAL '5 seconds', + drop_created_before => INTERVAL '2 seconds'); +\set ON_ERROR_STOP 1 +SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds', + if_not_exists => true) as drop_chunks_job_id \gset +CALL run_job(:drop_chunks_job_id); +select count(*) from timescaledb_information.chunks where hypertable_name='test'; +SELECT pg_sleep(3); +CALL run_job(:drop_chunks_job_id); +select count(*) from timescaledb_information.chunks where hypertable_name='test'; +-- check for WARNING/NOTICE if policy already exists +SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds', + if_not_exists => true); +SELECT add_retention_policy('test', drop_created_before => INTERVAL '20 seconds', + if_not_exists => true); +SELECT remove_retention_policy('test'); + +-- compression policy +ALTER TABLE test SET (timescaledb.compress); +INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i; + +-- Chunk compression status +SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats; + +-- Compression policy +SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds') AS compress_chunks_job_id \gset +SELECT pg_sleep(3); +CALL run_job(:compress_chunks_job_id); + +-- Chunk compression status +SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats; +-- check for WARNING/NOTICE if policy already exists +SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds', + if_not_exists => true); +SELECT add_compression_policy('test', compress_created_before => INTERVAL '20 seconds', + if_not_exists => true); +SELECT remove_compression_policy('test'); + DROP TABLE test; diff --git a/tsl/test/sql/tsl_tables.sql b/tsl/test/sql/tsl_tables.sql index 73cd5c47234..b2ff2053d74 100644 --- a/tsl/test/sql/tsl_tables.sql +++ b/tsl/test/sql/tsl_tables.sql @@ -90,6 +90,7 @@ select * from _timescaledb_catalog.dimension; CREATE SCHEMA IF NOT EXISTS my_new_schema; create or replace function my_new_schema.dummy_now2() returns BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 1::BIGINT'; +grant usage on SCHEMA my_new_schema to public; grant execute on ALL FUNCTIONS IN SCHEMA my_new_schema to public; select set_integer_now_func('test_table_int', 'my_new_schema.dummy_now2');