Use creation time in retention/compression policy
The retention and compression policies can now take the drop_created_before
and compress_created_before arguments, respectively, to select chunks by
their creation time rather than by the time values they contain.

We don't yet support creation times for CAggs.
nikkhils committed Oct 24, 2023
1 parent 1981e8e commit e3f3c00
Showing 30 changed files with 423 additions and 135 deletions.
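As a usage sketch (not part of the diff itself), the new arguments are passed by name; the hypertable "metrics" and the interval values below are hypothetical:

    -- Compress chunks created more than 7 days ago, judged by chunk
    -- creation time rather than by the time values the chunks contain:
    SELECT add_compression_policy('metrics', compress_created_before => INTERVAL '7 days');

    -- Drop chunks created more than 6 months ago:
    SELECT add_retention_policy('metrics', drop_created_before => INTERVAL '6 months');

Since compress_after and drop_after now default to NULL, the creation-time argument can be given on its own; for compression, the C code below enforces that exactly one of the two lag arguments is supplied.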
6 changes: 3 additions & 3 deletions sql/compat.sql
@@ -1015,12 +1015,12 @@ END$$
SET search_path TO pg_catalog,pg_temp;


CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean) LANGUAGE PLPGSQL AS $$
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean) LANGUAGE PLPGSQL AS $$
BEGIN
IF current_setting('timescaledb.enable_deprecation_warnings', true)::bool THEN
RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.';
RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.';
END IF;
CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6);
CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6,$7);
END$$
SET search_path TO pg_catalog,pg_temp;

14 changes: 9 additions & 5 deletions sql/policy_api.sql
@@ -12,11 +12,12 @@
-- might be kept, but data within the window will never be deleted.
CREATE OR REPLACE FUNCTION @extschema@.add_retention_policy(
relation REGCLASS,
drop_after "any",
drop_after "any" = NULL,
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
timezone TEXT = NULL,
drop_created_before INTERVAL = NULL
)
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
LANGUAGE C VOLATILE;
@@ -45,11 +46,13 @@ LANGUAGE C VOLATILE STRICT;

/* compression policy */
CREATE OR REPLACE FUNCTION @extschema@.add_compression_policy(
hypertable REGCLASS, compress_after "any",
hypertable REGCLASS,
compress_after "any" = NULL,
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
timezone TEXT = NULL,
compress_created_before INTERVAL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
@@ -83,6 +86,7 @@ LANGUAGE C VOLATILE;
/* 1 step policies */

/* Add policies */
/* Unsupported drop_created_before/compress_created_before in add/alter for caggs */
CREATE OR REPLACE FUNCTION timescaledb_experimental.add_policies(
relation REGCLASS,
if_not_exists BOOL = false,
@@ -128,4 +132,4 @@ CREATE OR REPLACE FUNCTION timescaledb_experimental.show_policies(
relation REGCLASS)
RETURNS SETOF JSONB
AS '@MODULE_PATHNAME@', 'ts_policies_show'
LANGUAGE C VOLATILE;
LANGUAGE C VOLATILE;
49 changes: 36 additions & 13 deletions sql/policy_internal.sql
@@ -41,7 +41,8 @@ _timescaledb_functions.policy_compression_execute(
lag ANYELEMENT,
maxchunks INTEGER,
verbose_log BOOLEAN,
recompress_enabled BOOLEAN)
recompress_enabled BOOLEAN,
use_creation_time BOOLEAN)
AS $$
DECLARE
htoid REGCLASS;
@@ -54,6 +55,7 @@ DECLARE
bit_compressed_unordered int := 2;
bit_frozen int := 4;
bit_compressed_partial int := 8;
creation_lag INTERVAL := NULL;
BEGIN

-- procedures with SET clause cannot execute transaction
@@ -67,14 +69,26 @@ BEGIN
-- for the integer cases, we have to compute the lag w.r.t
-- the integer_now function and then pass on to show_chunks
IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
-- cannot have use_creation_time set with this
IF use_creation_time IS TRUE THEN
RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id;
END IF;
lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
END IF;

-- if use_creation_time has been specified then the lag needs to be used with the
-- "compress_created_before" argument. Otherwise the usual "older_than" argument
-- is good enough
IF use_creation_time IS TRUE THEN
creation_lag := lag;
lag := NULL;
END IF;

FOR chunk_rec IN
SELECT
show.oid, ch.schema_name, ch.table_name, ch.status
FROM
@extschema@.show_chunks(htoid, older_than => lag) AS show(oid)
@extschema@.show_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid)
INNER JOIN pg_class pgc ON pgc.oid = show.oid
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
@@ -151,14 +165,17 @@ DECLARE
dimtype REGTYPE;
dimtypeinput REGPROC;
compress_after TEXT;
compress_created_before TEXT;
lag_value TEXT;
lag_bigint_value BIGINT;
htid INTEGER;
htoid REGCLASS;
chunk_rec RECORD;
verbose_log BOOL;
maxchunks INTEGER := 0;
numchunks INTEGER := 1;
recompress_enabled BOOL;
use_creation_time BOOL := FALSE;
BEGIN

-- procedures with SET clause cannot execute transaction
@@ -177,11 +194,6 @@ BEGIN
verbose_log := COALESCE(jsonb_object_field_text(config, 'verbose_log')::BOOLEAN, FALSE);
maxchunks := COALESCE(jsonb_object_field_text(config, 'maxchunks_to_compress')::INTEGER, 0);
recompress_enabled := COALESCE(jsonb_object_field_text(config, 'recompress')::BOOLEAN, TRUE);
compress_after := jsonb_object_field_text(config, 'compress_after');

IF compress_after IS NULL THEN
RAISE EXCEPTION 'job % config must have compress_after', job_id;
END IF;

-- find primary dimension type --
SELECT dim.column_type INTO dimtype
@@ -191,29 +203,40 @@ BEGIN
ORDER BY dim.id
LIMIT 1;

lag_value := jsonb_object_field_text(config, 'compress_after');
compress_after := jsonb_object_field_text(config, 'compress_after');
IF compress_after IS NULL THEN
compress_created_before := jsonb_object_field_text(config, 'compress_created_before');
IF compress_created_before IS NULL THEN
RAISE EXCEPTION 'job % config must have compress_after or compress_created_before', job_id;
END IF;
lag_value := compress_created_before;
use_creation_time := true;
dimtype := 'INTERVAL' ::regtype;
ELSE
lag_value := compress_after;
END IF;

-- perform the proper type casts for the lag value
CASE dimtype
WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype THEN
WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype, 'INTERVAL' ::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::INTERVAL,
maxchunks, verbose_log, recompress_enabled
maxchunks, verbose_log, recompress_enabled, use_creation_time
);
WHEN 'BIGINT'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::BIGINT,
maxchunks, verbose_log, recompress_enabled
maxchunks, verbose_log, recompress_enabled, use_creation_time
);
WHEN 'INTEGER'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::INTEGER,
maxchunks, verbose_log, recompress_enabled
maxchunks, verbose_log, recompress_enabled, use_creation_time
);
WHEN 'SMALLINT'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::SMALLINT,
maxchunks, verbose_log, recompress_enabled
maxchunks, verbose_log, recompress_enabled, use_creation_time
);
END CASE;
END;
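The dispatch above is driven entirely by the job's JSONB config: when compress_after is absent, the procedure falls back to compress_created_before, forces the lag type to INTERVAL, and passes use_creation_time = TRUE down to policy_compression_execute. A hedged sketch of inspecting such a config (the job id and values shown are hypothetical):

    -- Show a compression policy job's stored config:
    SELECT config FROM timescaledb_information.jobs WHERE job_id = 1000;
    -- e.g. {"hypertable_id": 42, "compress_created_before": "7 days"}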
5 changes: 3 additions & 2 deletions src/chunk.c
@@ -2280,8 +2280,9 @@ ts_chunk_show_chunks(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot specify \"older_than\" and/or \"newer_than\" for "
"\"integer\"-like partitioning types"),
errhint("Use \"created_before\" and/or \"created_after\" which rely on the "
"chunk creation time values.")));
errhint(
"Use \"created_before\" and/or \"created_after\" which rely on the "
"chunk creation time values.")));
funcctx->user_fctx = get_chunks_in_creation_time_range(ht,
created_before,
created_after,
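The errhint above steers integer-partitioned hypertables toward the creation-time filters. A hedged sketch (the hypertable "events" is hypothetical; an INTERVAL is passed because the policy code above passes one for created_before):

    -- For a hypertable with a BIGINT time column, select chunks by
    -- creation time instead of older_than/newer_than:
    SELECT show_chunks('events', created_before => INTERVAL '1 month');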
91 changes: 80 additions & 11 deletions tsl/src/bgw_policy/compression_api.c
@@ -96,6 +96,21 @@ policy_compression_get_compress_after_interval(const Jsonb *config)
return interval;
}

Interval *
policy_compression_get_compress_created_before_interval(const Jsonb *config)
{
Interval *interval =
ts_jsonb_get_interval_field(config, POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE);

if (interval == NULL)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find %s in config for job",
POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE)));

return interval;

Check warning on line 111 in tsl/src/bgw_policy/compression_api.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/bgw_policy/compression_api.c#L111

Added line #L111 was not covered by tests
}

int64
policy_recompression_get_recompress_after_int(const Jsonb *config)
{
@@ -184,7 +199,8 @@ policy_compression_check(PG_FUNCTION_ARGS)
/* compression policies are added to hypertables or continuous aggregates */
Datum
policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
Oid compress_after_type, Interval *default_schedule_interval,
Oid compress_after_type, Interval *created_before,
Interval *default_schedule_interval,
bool user_defined_schedule_interval, bool if_not_exists,
bool fixed_schedule, TimestampTz initial_start,
const char *timezone)
Expand All @@ -201,6 +217,13 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
hcache = ts_hypertable_cache_pin();
hypertable = validate_compress_chunks_hypertable(hcache, user_rel_oid, &is_cagg);

/* creation time usage not supported with caggs yet */
if (is_cagg && created_before != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use \"compress_created_before\" with continuous aggregate \"%s\" ",
get_rel_name(user_rel_oid))));

owner_id = ts_hypertable_permissions_check(user_rel_oid, GetUserId());
ts_bgw_job_validate_job_owner(owner_id);

Expand All @@ -214,6 +237,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,

if (jobs != NIL)
{
bool is_equal = false;

if (!if_not_exists)
{
ts_cache_release(hcache);
@@ -227,11 +252,25 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
Assert(list_length(jobs) == 1);
BgwJob *existing = linitial(jobs);

if (policy_config_check_hypertable_lag_equality(existing->fd.config,
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
partitioning_type,
compress_after_type,
compress_after_datum))
if (OidIsValid(compress_after_type))
is_equal =
policy_config_check_hypertable_lag_equality(existing->fd.config,
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
partitioning_type,
compress_after_type,
compress_after_datum);
else
{
Assert(created_before != NULL);
is_equal = policy_config_check_hypertable_lag_equality(
existing->fd.config,
POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE,
partitioning_type,
INTERVALOID,
IntervalPGetDatum(created_before));
}

if (is_equal)
{
/* If all arguments are the same, do nothing */
ts_cache_release(hcache);
@@ -251,6 +290,22 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
PG_RETURN_INT32(-1);
}
}

if (created_before)
{
Assert(!OidIsValid(compress_after_type));
compress_after_type = INTERVALOID;
}

if (!is_cagg && IS_INTEGER_TYPE(partitioning_type) && !IS_INTEGER_TYPE(compress_after_type) &&
created_before == NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for parameter %s", POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER),
errhint("Integer duration in \"compress_after\" or interval time duration"
" in \"compress_created_before\" is required for hypertables with integer "
"time dimension.")));

if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim)) &&
!user_defined_schedule_interval)
{
@@ -286,9 +341,14 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
switch (compress_after_type)
{
case INTERVALOID:
ts_jsonb_add_interval(parse_state,
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
DatumGetIntervalP(compress_after_datum));
if (created_before)
ts_jsonb_add_interval(parse_state,
POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE,
DatumGetIntervalP(created_before));
else
ts_jsonb_add_interval(parse_state,
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
DatumGetIntervalP(compress_after_datum));
break;
case INT2OID:
ts_jsonb_add_int64(parse_state,
@@ -360,15 +420,15 @@ policy_compression_add(PG_FUNCTION_ARGS)
* The function is not STRICT but we can't allow required args to be NULL
* so we need to act like a strict function in those cases
*/
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
if (PG_ARGISNULL(0) || PG_ARGISNULL(2))
{
ts_feature_flag_check(FEATURE_POLICY);
PG_RETURN_NULL();
}

Oid user_rel_oid = PG_GETARG_OID(0);
Datum compress_after_datum = PG_GETARG_DATUM(1);
Oid compress_after_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
Oid compress_after_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1);
bool if_not_exists = PG_GETARG_BOOL(2);
bool user_defined_schedule_interval = !(PG_ARGISNULL(3));
Interval *default_schedule_interval =
@@ -378,10 +438,18 @@ policy_compression_add(PG_FUNCTION_ARGS)
bool fixed_schedule = !PG_ARGISNULL(4);
text *timezone = PG_ARGISNULL(5) ? NULL : PG_GETARG_TEXT_PP(5);
char *valid_timezone = NULL;
Interval *created_before = PG_GETARG_INTERVAL_P(6);

ts_feature_flag_check(FEATURE_POLICY);
TS_PREVENT_FUNC_IF_READ_ONLY();

/* compress_after and created_before cannot be specified [or omitted] together */
if ((PG_ARGISNULL(1) && PG_ARGISNULL(6)) || (!PG_ARGISNULL(1) && !PG_ARGISNULL(6)))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg(
"need to specify one of \"compress_after\" or \"compress_created_before\"")));

/* if users pass in -infinity for initial_start, then use the current_timestamp instead */
if (fixed_schedule)
{
Expand All @@ -397,6 +465,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
retval = policy_compression_add_internal(user_rel_oid,
compress_after_datum,
compress_after_type,
created_before,
default_schedule_interval,
user_defined_schedule_interval,
if_not_exists,
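A hedged sketch of tripping the new argument check (the hypertable "metrics" is hypothetical):

    -- Specifying both lag arguments (or neither) is now an error:
    SELECT add_compression_policy('metrics', INTERVAL '7 days',
                                  compress_created_before => INTERVAL '7 days');
    -- ERROR:  need to specify one of "compress_after" or "compress_created_before"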
4 changes: 3 additions & 1 deletion tsl/src/bgw_policy/compression_api.h
@@ -21,12 +21,14 @@ extern Datum policy_compression_check(PG_FUNCTION_ARGS);
int32 policy_compression_get_hypertable_id(const Jsonb *config);
int64 policy_compression_get_compress_after_int(const Jsonb *config);
Interval *policy_compression_get_compress_after_interval(const Jsonb *config);
Interval *policy_compression_get_compress_created_before_interval(const Jsonb *config);
int32 policy_compression_get_maxchunks_per_job(const Jsonb *config);
int64 policy_recompression_get_recompress_after_int(const Jsonb *config);
Interval *policy_recompression_get_recompress_after_interval(const Jsonb *config);

Datum policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
Oid compress_after_type, Interval *default_schedule_interval,
Oid compress_after_type, Interval *created_before,
Interval *default_schedule_interval,
bool user_defined_schedule_interval, bool if_not_exists,
bool fixed_schedule, TimestampTz initial_start,
const char *timezone);
(The remaining 24 changed files in this commit are not shown.)