diff --git a/.github/workflows/pgspot.yaml b/.github/workflows/pgspot.yaml
index ef0116adc6f..1912610f886 100644
--- a/.github/workflows/pgspot.yaml
+++ b/.github/workflows/pgspot.yaml
@@ -21,6 +21,10 @@ jobs:
         --proc-without-search-path 'extschema.recompress_chunk(chunk regclass,if_not_compressed boolean)'
         --proc-without-search-path '_timescaledb_internal.policy_compression(job_id integer,config jsonb)'
         --proc-without-search-path '_timescaledb_functions.policy_compression(job_id integer,config jsonb)'
+        --proc-without-search-path
+          '_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)'
+        --proc-without-search-path
+          '_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)'
         --proc-without-search-path
           '_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean)'
         --proc-without-search-path
diff --git a/.unreleased/feature_6227 b/.unreleased/feature_6227
new file mode 100644
index 00000000000..39ba8386bee
--- /dev/null
+++ b/.unreleased/feature_6227
@@ -0,0 +1 @@
+Implements: #6227 Use creation time in retention/compression policy
diff --git a/sql/compat.sql b/sql/compat.sql
index e98375f07ee..9cd31c94b0d 100644
--- a/sql/compat.sql
+++ b/sql/compat.sql
@@ -1015,12 +1015,12 @@ END$$
 SET search_path TO pg_catalog,pg_temp;
 
 
-CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean) LANGUAGE PLPGSQL AS $$
+CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean) LANGUAGE PLPGSQL AS $$
 BEGIN
   IF current_setting('timescaledb.enable_deprecation_warnings', true)::bool THEN
-    RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.';
+    RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.';
   END IF;
-  CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6);
+  CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6,$7);
 END$$
 SET search_path TO pg_catalog,pg_temp;
 
diff --git a/sql/policy_api.sql b/sql/policy_api.sql
index 0900ee361d1..00ae6bd0ea2 100644
--- a/sql/policy_api.sql
+++ b/sql/policy_api.sql
@@ -12,11 +12,12 @@
 -- might be kept, but data within the window will never be deleted.
 CREATE OR REPLACE FUNCTION @extschema@.add_retention_policy(
        relation REGCLASS,
-       drop_after "any",
+       drop_after "any" = NULL,
        if_not_exists BOOL = false,
        schedule_interval INTERVAL = NULL,
        initial_start TIMESTAMPTZ = NULL,
-       timezone TEXT = NULL
+       timezone TEXT = NULL,
+       drop_created_before INTERVAL = NULL
 )
 RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
 LANGUAGE C VOLATILE;
@@ -45,11 +46,13 @@ LANGUAGE C VOLATILE STRICT;
 
 /* compression policy */
 CREATE OR REPLACE FUNCTION @extschema@.add_compression_policy(
-    hypertable REGCLASS, compress_after "any",
+    hypertable REGCLASS,
+    compress_after "any" = NULL,
     if_not_exists BOOL = false,
     schedule_interval INTERVAL = NULL,
     initial_start TIMESTAMPTZ = NULL,
-    timezone TEXT = NULL
+    timezone TEXT = NULL,
+    compress_created_before INTERVAL = NULL
 )
 RETURNS INTEGER
 AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
@@ -83,6 +86,7 @@ LANGUAGE C VOLATILE;
 /* 1 step policies */
 
 /* Add policies */
+/* Unsupported drop_created_before/compress_created_before in add/alter for caggs */
 CREATE OR REPLACE FUNCTION timescaledb_experimental.add_policies(
     relation REGCLASS,
     if_not_exists BOOL = false,
@@ -128,4 +132,4 @@ CREATE OR REPLACE FUNCTION timescaledb_experimental.show_policies(
     relation REGCLASS)
 RETURNS SETOF JSONB
 AS '@MODULE_PATHNAME@', 'ts_policies_show'
-LANGUAGE C  VOLATILE;
\ No newline at end of file
+LANGUAGE C  VOLATILE;
diff --git a/sql/policy_internal.sql b/sql/policy_internal.sql
index 99a32d4a8f8..2ea47567b39 100644
--- a/sql/policy_internal.sql
+++ b/sql/policy_internal.sql
@@ -41,7 +41,8 @@ _timescaledb_functions.policy_compression_execute(
   lag                 ANYELEMENT,
   maxchunks           INTEGER,
   verbose_log         BOOLEAN,
-  recompress_enabled  BOOLEAN)
+  recompress_enabled  BOOLEAN,
+  use_creation_time   BOOLEAN)
 AS $$
 DECLARE
   htoid       REGCLASS;
@@ -54,6 +55,7 @@ DECLARE
   bit_compressed_unordered int := 2;
   bit_frozen int := 4;
   bit_compressed_partial int := 8;
+  creation_lag INTERVAL := NULL;
 BEGIN
 
   -- procedures with SET clause cannot execute transaction
@@ -67,14 +69,26 @@ BEGIN
   -- for the integer cases, we have to compute the lag w.r.t
   -- the integer_now function and then pass on to show_chunks
   IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
+    -- cannot have use_creation_time set with this
+    IF use_creation_time IS TRUE THEN
+        RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id;
+    END IF;
     lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
   END IF;
 
+  -- if use_creation_time has been specified then the lag needs to be used with the
+  -- "compress_created_before" argument. Otherwise the usual "older_than" argument
+  -- is good enough
+  IF use_creation_time IS TRUE THEN
+    creation_lag := lag;
+    lag := NULL;
+  END IF;
+
   FOR chunk_rec IN
     SELECT
       show.oid, ch.schema_name, ch.table_name, ch.status
     FROM
-      @extschema@.show_chunks(htoid, older_than => lag) AS show(oid)
+      @extschema@.show_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid)
       INNER JOIN pg_class pgc ON pgc.oid = show.oid
       INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
       INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
@@ -157,7 +171,9 @@ DECLARE
   dimtype             REGTYPE;
   dimtypeinput        REGPROC;
   compress_after      TEXT;
+  compress_created_before TEXT;
   lag_value           TEXT;
+  lag_bigint_value    BIGINT;
   htid                INTEGER;
   htoid               REGCLASS;
   chunk_rec           RECORD;
@@ -165,6 +181,7 @@ DECLARE
   maxchunks           INTEGER := 0;
   numchunks           INTEGER := 1;
   recompress_enabled  BOOL;
+  use_creation_time   BOOL := FALSE;
 BEGIN
 
   -- procedures with SET clause cannot execute transaction
@@ -183,11 +200,6 @@ BEGIN
   verbose_log         := COALESCE(jsonb_object_field_text(config, 'verbose_log')::BOOLEAN, FALSE);
   maxchunks           := COALESCE(jsonb_object_field_text(config, 'maxchunks_to_compress')::INTEGER, 0);
   recompress_enabled  := COALESCE(jsonb_object_field_text(config, 'recompress')::BOOLEAN, TRUE);
-  compress_after      := jsonb_object_field_text(config, 'compress_after');
-
-  IF compress_after IS NULL THEN
-    RAISE EXCEPTION 'job % config must have compress_after', job_id;
-  END IF;
 
   -- find primary dimension type --
   SELECT dim.column_type INTO dimtype
@@ -197,29 +209,40 @@ BEGIN
   ORDER BY dim.id
   LIMIT 1;
 
-  lag_value := jsonb_object_field_text(config, 'compress_after');
+  compress_after      := jsonb_object_field_text(config, 'compress_after');
+  IF compress_after IS NULL THEN
+    compress_created_before := jsonb_object_field_text(config, 'compress_created_before');
+    IF compress_created_before IS NULL THEN
+        RAISE EXCEPTION 'job % config must have compress_after or compress_created_before', job_id;
+    END IF;
+    lag_value := compress_created_before;
+    use_creation_time := true;
+    dimtype := 'INTERVAL' ::regtype;
+  ELSE
+    lag_value := compress_after;
+  END IF;
 
   -- execute the properly type casts for the lag value
   CASE dimtype
-    WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype THEN
+    WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype, 'INTERVAL' ::regtype  THEN
       CALL _timescaledb_functions.policy_compression_execute(
         job_id, htid, lag_value::INTERVAL,
-        maxchunks, verbose_log, recompress_enabled
+        maxchunks, verbose_log, recompress_enabled, use_creation_time
       );
     WHEN 'BIGINT'::regtype THEN
       CALL _timescaledb_functions.policy_compression_execute(
         job_id, htid, lag_value::BIGINT,
-        maxchunks, verbose_log, recompress_enabled
+        maxchunks, verbose_log, recompress_enabled, use_creation_time
       );
     WHEN 'INTEGER'::regtype THEN
       CALL _timescaledb_functions.policy_compression_execute(
         job_id, htid, lag_value::INTEGER,
-        maxchunks, verbose_log, recompress_enabled
+        maxchunks, verbose_log, recompress_enabled, use_creation_time
       );
     WHEN 'SMALLINT'::regtype THEN
       CALL _timescaledb_functions.policy_compression_execute(
         job_id, htid, lag_value::SMALLINT,
-        maxchunks, verbose_log, recompress_enabled
+        maxchunks, verbose_log, recompress_enabled, use_creation_time
       );
   END CASE;
 END;
diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql
index e743b81bd3f..fd64e2bc7ef 100644
--- a/sql/updates/latest-dev.sql
+++ b/sql/updates/latest-dev.sql
@@ -250,3 +250,156 @@ CREATE FUNCTION @extschema@.show_chunks(
      created_after          "any" = NULL
  ) RETURNS SETOF REGCLASS AS '@MODULE_PATHNAME@', 'ts_chunk_show_chunks'
  LANGUAGE C STABLE PARALLEL SAFE;
+
+DROP FUNCTION @extschema@.add_retention_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT);
+CREATE FUNCTION @extschema@.add_retention_policy(
+       relation REGCLASS,
+       drop_after "any" = NULL,
+       if_not_exists BOOL = false,
+       schedule_interval INTERVAL = NULL,
+       initial_start TIMESTAMPTZ = NULL,
+       timezone TEXT = NULL,
+       drop_created_before INTERVAL = NULL
+)
+RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
+LANGUAGE C VOLATILE;
+
+DROP FUNCTION @extschema@.add_compression_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT);
+CREATE FUNCTION @extschema@.add_compression_policy(
+    hypertable REGCLASS,
+    compress_after "any" = NULL,
+    if_not_exists BOOL = false,
+    schedule_interval INTERVAL = NULL,
+    initial_start TIMESTAMPTZ = NULL,
+    timezone TEXT = NULL,
+    compress_created_before INTERVAL = NULL
+)
+RETURNS INTEGER
+AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
+LANGUAGE C VOLATILE;
+
+DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN);
+DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN);
+CREATE PROCEDURE
+_timescaledb_functions.policy_compression_execute(
+  job_id              INTEGER,
+  htid                INTEGER,
+  lag                 ANYELEMENT,
+  maxchunks           INTEGER,
+  verbose_log         BOOLEAN,
+  recompress_enabled  BOOLEAN,
+  use_creation_time   BOOLEAN)
+AS $$
+DECLARE
+  htoid       REGCLASS;
+  chunk_rec   RECORD;
+  numchunks   INTEGER := 1;
+  _message     text;
+  _detail      text;
+  -- chunk status bits:
+  bit_compressed int := 1;
+  bit_compressed_unordered int := 2;
+  bit_frozen int := 4;
+  bit_compressed_partial int := 8;
+  creation_lag INTERVAL := NULL;
+BEGIN
+
+  -- procedures with SET clause cannot execute transaction
+  -- control so we adjust search_path in procedure body
+  SET LOCAL search_path TO pg_catalog, pg_temp;
+
+  SELECT format('%I.%I', schema_name, table_name) INTO htoid
+  FROM _timescaledb_catalog.hypertable
+  WHERE id = htid;
+
+  -- for the integer cases, we have to compute the lag w.r.t
+  -- the integer_now function and then pass on to show_chunks
+  IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
+    -- cannot have use_creation_time set with this
+    IF use_creation_time IS TRUE THEN
+        RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id;
+    END IF;
+    lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
+  END IF;
+
+  -- if use_creation_time has been specified then the lag needs to be used with the
+  -- "compress_created_before" argument. Otherwise the usual "older_than" argument
+  -- is good enough
+  IF use_creation_time IS TRUE THEN
+    creation_lag := lag;
+    lag := NULL;
+  END IF;
+
+  FOR chunk_rec IN
+    SELECT
+      show.oid, ch.schema_name, ch.table_name, ch.status
+    FROM
+      @extschema@.show_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid)
+      INNER JOIN pg_class pgc ON pgc.oid = show.oid
+      INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
+      INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
+    WHERE
+      ch.dropped IS FALSE
+      AND (
+        ch.status = 0 OR
+        (
+          ch.status & bit_compressed > 0 AND (
+            ch.status & bit_compressed_unordered > 0 OR
+            ch.status & bit_compressed_partial > 0
+          )
+        )
+      )
+  LOOP
+    IF chunk_rec.status = 0 THEN
+      BEGIN
+        PERFORM @extschema@.compress_chunk( chunk_rec.oid );
+      EXCEPTION WHEN OTHERS THEN
+        GET STACKED DIAGNOSTICS
+            _message = MESSAGE_TEXT,
+            _detail = PG_EXCEPTION_DETAIL;
+        RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
+            USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
+                  ERRCODE = sqlstate;
+      END;
+    ELSIF
+      (
+        chunk_rec.status & bit_compressed > 0 AND (
+          chunk_rec.status & bit_compressed_unordered > 0 OR
+          chunk_rec.status & bit_compressed_partial > 0
+        )
+      ) AND recompress_enabled IS TRUE THEN
+      BEGIN
+        PERFORM @extschema@.decompress_chunk(chunk_rec.oid, if_compressed => true);
+      EXCEPTION WHEN OTHERS THEN
+        RAISE WARNING 'decompressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
+            USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
+                  ERRCODE = sqlstate;
+      END;
+      -- SET LOCAL is only active until end of transaction.
+      -- While we could use SET at the start of the function we do not
+      -- want to bleed out search_path to caller, so we do SET LOCAL
+      -- again after COMMIT
+      BEGIN
+        PERFORM @extschema@.compress_chunk(chunk_rec.oid);
+      EXCEPTION WHEN OTHERS THEN
+        RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
+            USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
+                  ERRCODE = sqlstate;
+      END;
+    END IF;
+    COMMIT;
+    -- SET LOCAL is only active until end of transaction.
+    -- While we could use SET at the start of the function we do not
+    -- want to bleed out search_path to caller, so we do SET LOCAL
+    -- again after COMMIT
+    SET LOCAL search_path TO pg_catalog, pg_temp;
+    IF verbose_log THEN
+       RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
+    END IF;
+    numchunks := numchunks + 1;
+    IF maxchunks > 0 AND numchunks >= maxchunks THEN
+         EXIT;
+    END IF;
+  END LOOP;
+END;
+$$ LANGUAGE PLPGSQL;
diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql
index 647e567d614..4d076479a4e 100644
--- a/sql/updates/reverse-dev.sql
+++ b/sql/updates/reverse-dev.sql
@@ -191,3 +191,140 @@ CREATE FUNCTION @extschema@.show_chunks(
      newer_than             "any" = NULL
  ) RETURNS SETOF REGCLASS AS '@MODULE_PATHNAME@', 'ts_chunk_show_chunks'
  LANGUAGE C STABLE PARALLEL SAFE;
+
+DROP FUNCTION @extschema@.add_retention_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT, INTERVAL);
+CREATE FUNCTION @extschema@.add_retention_policy(
+       relation REGCLASS,
+       drop_after "any",
+       if_not_exists BOOL = false,
+       schedule_interval INTERVAL = NULL,
+       initial_start TIMESTAMPTZ = NULL,
+       timezone TEXT = NULL
+)
+RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
+LANGUAGE C VOLATILE;
+
+DROP FUNCTION @extschema@.add_compression_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT, INTERVAL);
+CREATE FUNCTION @extschema@.add_compression_policy(
+    hypertable REGCLASS,
+    compress_after "any",
+    if_not_exists BOOL = false,
+    schedule_interval INTERVAL = NULL,
+    initial_start TIMESTAMPTZ = NULL,
+    timezone TEXT = NULL
+)
+RETURNS INTEGER
+AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
+LANGUAGE C VOLATILE;
+
+DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN);
+DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN);
+CREATE PROCEDURE
+_timescaledb_functions.policy_compression_execute(
+  job_id              INTEGER,
+  htid                INTEGER,
+  lag                 ANYELEMENT,
+  maxchunks           INTEGER,
+  verbose_log         BOOLEAN,
+  recompress_enabled  BOOLEAN)
+AS $$
+DECLARE
+  htoid       REGCLASS;
+  chunk_rec   RECORD;
+  numchunks   INTEGER := 1;
+  _message     text;
+  _detail      text;
+  -- chunk status bits:
+  bit_compressed int := 1;
+  bit_compressed_unordered int := 2;
+  bit_frozen int := 4;
+  bit_compressed_partial int := 8;
+BEGIN
+
+  -- procedures with SET clause cannot execute transaction
+  -- control so we adjust search_path in procedure body
+  SET LOCAL search_path TO pg_catalog, pg_temp;
+
+  SELECT format('%I.%I', schema_name, table_name) INTO htoid
+  FROM _timescaledb_catalog.hypertable
+  WHERE id = htid;
+
+  -- for the integer cases, we have to compute the lag w.r.t
+  -- the integer_now function and then pass on to show_chunks
+  IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
+    lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
+  END IF;
+
+  FOR chunk_rec IN
+    SELECT
+      show.oid, ch.schema_name, ch.table_name, ch.status
+    FROM
+      @extschema@.show_chunks(htoid, older_than => lag) AS show(oid)
+      INNER JOIN pg_class pgc ON pgc.oid = show.oid
+      INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
+      INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
+    WHERE
+      ch.dropped IS FALSE
+      AND (
+        ch.status = 0 OR
+        (
+          ch.status & bit_compressed > 0 AND (
+            ch.status & bit_compressed_unordered > 0 OR
+            ch.status & bit_compressed_partial > 0
+          )
+        )
+      )
+  LOOP
+    IF chunk_rec.status = 0 THEN
+      BEGIN
+        PERFORM @extschema@.compress_chunk( chunk_rec.oid );
+      EXCEPTION WHEN OTHERS THEN
+        GET STACKED DIAGNOSTICS
+            _message = MESSAGE_TEXT,
+            _detail = PG_EXCEPTION_DETAIL;
+        RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
+            USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
+                  ERRCODE = sqlstate;
+      END;
+    ELSIF
+      (
+        chunk_rec.status & bit_compressed > 0 AND (
+          chunk_rec.status & bit_compressed_unordered > 0 OR
+          chunk_rec.status & bit_compressed_partial > 0
+        )
+      ) AND recompress_enabled IS TRUE THEN
+      BEGIN
+        PERFORM @extschema@.decompress_chunk(chunk_rec.oid, if_compressed => true);
+      EXCEPTION WHEN OTHERS THEN
+        RAISE WARNING 'decompressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
+            USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
+                  ERRCODE = sqlstate;
+      END;
+      -- SET LOCAL is only active until end of transaction.
+      -- While we could use SET at the start of the function we do not
+      -- want to bleed out search_path to caller, so we do SET LOCAL
+      -- again after COMMIT
+      BEGIN
+        PERFORM @extschema@.compress_chunk(chunk_rec.oid);
+      EXCEPTION WHEN OTHERS THEN
+        RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
+            USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
+                  ERRCODE = sqlstate;
+      END;
+    END IF;
+    COMMIT;
+    -- SET LOCAL is only active until end of transaction.
+    -- While we could use SET at the start of the function we do not
+    -- want to bleed out search_path to caller, so we do SET LOCAL
+    -- again after COMMIT
+    SET LOCAL search_path TO pg_catalog, pg_temp;
+    IF verbose_log THEN
+       RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
+    END IF;
+    numchunks := numchunks + 1;
+    IF maxchunks > 0 AND numchunks >= maxchunks THEN
+         EXIT;
+    END IF;
+  END LOOP;
+END;
+$$ LANGUAGE PLPGSQL;
diff --git a/tsl/src/bgw_policy/compression_api.c b/tsl/src/bgw_policy/compression_api.c
index d884614aca4..ae8d856630c 100644
--- a/tsl/src/bgw_policy/compression_api.c
+++ b/tsl/src/bgw_policy/compression_api.c
@@ -96,6 +96,21 @@ policy_compression_get_compress_after_interval(const Jsonb *config)
 	return interval;
 }
 
+Interval *
+policy_compression_get_compress_created_before_interval(const Jsonb *config)
+{
+	Interval *interval =
+		ts_jsonb_get_interval_field(config, POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE);
+
+	if (interval == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INTERNAL_ERROR),
+				 errmsg("could not find %s in config for job",
+						POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE)));
+
+	return interval;
+}
+
 int64
 policy_recompression_get_recompress_after_int(const Jsonb *config)
 {
@@ -184,7 +199,8 @@ policy_compression_check(PG_FUNCTION_ARGS)
 /* compression policies are added to hypertables or continuous aggregates */
 Datum
 policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
-								Oid compress_after_type, Interval *default_schedule_interval,
+								Oid compress_after_type, Interval *created_before,
+								Interval *default_schedule_interval,
 								bool user_defined_schedule_interval, bool if_not_exists,
 								bool fixed_schedule, TimestampTz initial_start,
 								const char *timezone)
@@ -201,6 +217,13 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
 	hcache = ts_hypertable_cache_pin();
 	hypertable = validate_compress_chunks_hypertable(hcache, user_rel_oid, &is_cagg);
 
+	/* creation time usage not supported with caggs yet */
+	if (is_cagg && created_before != NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot use \"compress_created_before\" with continuous aggregate \"%s\" ",
+						get_rel_name(user_rel_oid))));
+
 	owner_id = ts_hypertable_permissions_check(user_rel_oid, GetUserId());
 	ts_bgw_job_validate_job_owner(owner_id);
 
@@ -214,6 +237,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
 
 	if (jobs != NIL)
 	{
+		bool is_equal = false;
+
 		if (!if_not_exists)
 		{
 			ts_cache_release(hcache);
@@ -227,11 +252,25 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
 		Assert(list_length(jobs) == 1);
 		BgwJob *existing = linitial(jobs);
 
-		if (policy_config_check_hypertable_lag_equality(existing->fd.config,
-														POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
-														partitioning_type,
-														compress_after_type,
-														compress_after_datum))
+		if (OidIsValid(compress_after_type))
+			is_equal =
+				policy_config_check_hypertable_lag_equality(existing->fd.config,
+															POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
+															partitioning_type,
+															compress_after_type,
+															compress_after_datum);
+		else
+		{
+			Assert(created_before != NULL);
+			is_equal = policy_config_check_hypertable_lag_equality(
+				existing->fd.config,
+				POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE,
+				partitioning_type,
+				INTERVALOID,
+				IntervalPGetDatum(created_before));
+		}
+
+		if (is_equal)
 		{
 			/* If all arguments are the same, do nothing */
 			ts_cache_release(hcache);
@@ -251,6 +290,22 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
 			PG_RETURN_INT32(-1);
 		}
 	}
+
+	if (created_before)
+	{
+		Assert(!OidIsValid(compress_after_type));
+		compress_after_type = INTERVALOID;
+	}
+
+	if (!is_cagg && IS_INTEGER_TYPE(partitioning_type) && !IS_INTEGER_TYPE(compress_after_type) &&
+		created_before == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid value for parameter %s", POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER),
+				 errhint("Integer duration in \"compress_after\" or interval time duration"
+						 " in \"compress_created_before\" is required for hypertables with integer "
+						 "time dimension.")));
+
 	if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim)) &&
 		!user_defined_schedule_interval)
 	{
@@ -286,9 +341,14 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
 	switch (compress_after_type)
 	{
 		case INTERVALOID:
-			ts_jsonb_add_interval(parse_state,
-								  POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
-								  DatumGetIntervalP(compress_after_datum));
+			if (created_before)
+				ts_jsonb_add_interval(parse_state,
+									  POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE,
+									  DatumGetIntervalP(created_before));
+			else
+				ts_jsonb_add_interval(parse_state,
+									  POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
+									  DatumGetIntervalP(compress_after_datum));
 			break;
 		case INT2OID:
 			ts_jsonb_add_int64(parse_state,
@@ -360,7 +420,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
 	 * The function is not STRICT but we can't allow required args to be NULL
 	 * so we need to act like a strict function in those cases
 	 */
-	if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
+	if (PG_ARGISNULL(0) || PG_ARGISNULL(2))
 	{
 		ts_feature_flag_check(FEATURE_POLICY);
 		PG_RETURN_NULL();
@@ -368,7 +428,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
 
 	Oid user_rel_oid = PG_GETARG_OID(0);
 	Datum compress_after_datum = PG_GETARG_DATUM(1);
-	Oid compress_after_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
+	Oid compress_after_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1);
 	bool if_not_exists = PG_GETARG_BOOL(2);
 	bool user_defined_schedule_interval = !(PG_ARGISNULL(3));
 	Interval *default_schedule_interval =
@@ -378,10 +438,18 @@ policy_compression_add(PG_FUNCTION_ARGS)
 	bool fixed_schedule = !PG_ARGISNULL(4);
 	text *timezone = PG_ARGISNULL(5) ? NULL : PG_GETARG_TEXT_PP(5);
 	char *valid_timezone = NULL;
+	Interval *created_before = PG_GETARG_INTERVAL_P(6);
 
 	ts_feature_flag_check(FEATURE_POLICY);
 	TS_PREVENT_FUNC_IF_READ_ONLY();
 
+	/* compress_after and created_before cannot be specified [or omitted] together */
+	if ((PG_ARGISNULL(1) && PG_ARGISNULL(6)) || (!PG_ARGISNULL(1) && !PG_ARGISNULL(6)))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg(
+					 "need to specify one of \"compress_after\" or \"compress_created_before\"")));
+
 	/* if users pass in -infinity for initial_start, then use the current_timestamp instead */
 	if (fixed_schedule)
 	{
@@ -397,6 +465,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
 	retval = policy_compression_add_internal(user_rel_oid,
 											 compress_after_datum,
 											 compress_after_type,
+											 created_before,
 											 default_schedule_interval,
 											 user_defined_schedule_interval,
 											 if_not_exists,
diff --git a/tsl/src/bgw_policy/compression_api.h b/tsl/src/bgw_policy/compression_api.h
index 7157e9b5fb5..57a8a5c04bc 100644
--- a/tsl/src/bgw_policy/compression_api.h
+++ b/tsl/src/bgw_policy/compression_api.h
@@ -21,12 +21,14 @@ extern Datum policy_compression_check(PG_FUNCTION_ARGS);
 int32 policy_compression_get_hypertable_id(const Jsonb *config);
 int64 policy_compression_get_compress_after_int(const Jsonb *config);
 Interval *policy_compression_get_compress_after_interval(const Jsonb *config);
+Interval *policy_compression_get_compress_created_before_interval(const Jsonb *config);
 int32 policy_compression_get_maxchunks_per_job(const Jsonb *config);
 int64 policy_recompression_get_recompress_after_int(const Jsonb *config);
 Interval *policy_recompression_get_recompress_after_interval(const Jsonb *config);
 
 Datum policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
-									  Oid compress_after_type, Interval *default_schedule_interval,
+									  Oid compress_after_type, Interval *created_before,
+									  Interval *default_schedule_interval,
 									  bool user_defined_schedule_interval, bool if_not_exists,
 									  bool fixed_schedule, TimestampTz initial_start,
 									  const char *timezone);
diff --git a/tsl/src/bgw_policy/job.c b/tsl/src/bgw_policy/job.c
index ea25c6cda39..07c9a5166e0 100644
--- a/tsl/src/bgw_policy/job.c
+++ b/tsl/src/bgw_policy/job.c
@@ -73,9 +73,10 @@ log_retention_boundary(int elevel, PolicyRetentionData *policy_data, const char
 
 	if (OidIsValid(outfuncid))
 		elog(elevel,
-			 "%s \"%s\": dropping data older than %s",
+			 "%s \"%s\": dropping data %s %s",
 			 message,
 			 relname,
+			 policy_data->use_creation_time ? "created before" : "older than",
 			 DatumGetCString(OidFunctionCall1(outfuncid, boundary)));
 }
 
@@ -294,7 +295,8 @@ policy_retention_execute(int32 job_id, Jsonb *config)
 
 	chunk_invoke_drop_chunks(policy_data.object_relid,
 							 policy_data.boundary,
-							 policy_data.boundary_type);
+							 policy_data.boundary_type,
+							 policy_data.use_creation_time);
 
 	return true;
 }
@@ -309,6 +311,9 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po
 	Datum boundary;
 	Datum boundary_type;
 	ContinuousAgg *cagg;
+	Interval *(*interval_getter)(const Jsonb *);
+	interval_getter = policy_retention_get_drop_after_interval;
+	bool use_creation_time = false;
 
 	object_relid = ts_hypertable_id_to_relid(policy_retention_get_hypertable_id(config), false);
 	hypertable = ts_hypertable_cache_get_cache_and_entry(object_relid, CACHE_FLAG_NONE, &hcache);
@@ -329,14 +334,14 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po
 
 		/* if there's no int_now function the boundary is considered as an INTERVAL */
 		boundary_type = INTERVALOID;
+		interval_getter = policy_retention_get_drop_created_before_interval;
+		use_creation_time = true;
 	}
 	else
 		boundary_type = ts_dimension_get_partition_type(open_dim);
 
-	boundary = get_window_boundary(open_dim,
-								   config,
-								   policy_retention_get_drop_after_int,
-								   policy_retention_get_drop_after_interval);
+	boundary =
+		get_window_boundary(open_dim, config, policy_retention_get_drop_after_int, interval_getter);
 
 	/* We need to do a reverse lookup here since the given hypertable might be
 	   a materialized hypertable, and thus need to call drop_chunks on the
@@ -356,6 +361,7 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po
 		policy_data->object_relid = object_relid;
 		policy_data->boundary = boundary;
 		policy_data->boundary_type = boundary_type;
+		policy_data->use_creation_time = use_creation_time;
 	}
 }
 
diff --git a/tsl/src/bgw_policy/job.h b/tsl/src/bgw_policy/job.h
index 90e3e104d60..4e8947aa84c 100644
--- a/tsl/src/bgw_policy/job.h
+++ b/tsl/src/bgw_policy/job.h
@@ -29,6 +29,7 @@ typedef struct PolicyRetentionData
 	Oid object_relid;
 	Datum boundary;
 	Datum boundary_type;
+	bool use_creation_time;
 } PolicyRetentionData;
 
 typedef struct PolicyContinuousAggData
diff --git a/tsl/src/bgw_policy/policies_v2.c b/tsl/src/bgw_policy/policies_v2.c
index 71f0d1585a0..eeec71713e4 100644
--- a/tsl/src/bgw_policy/policies_v2.c
+++ b/tsl/src/bgw_policy/policies_v2.c
@@ -219,6 +219,7 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
 			policy_compression_add_internal(all_policies.rel_oid,
 											all_policies.compress->compress_after,
 											all_policies.compress->compress_after_type,
+											NULL,
 											DEFAULT_COMPRESSION_SCHEDULE_INTERVAL,
 											false,
 											if_exists,
@@ -234,6 +235,7 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
 			policy_retention_add_internal(all_policies.rel_oid,
 										  all_policies.retention->drop_after_type,
 										  all_policies.retention->drop_after,
+										  NULL,
 										  (Interval) DEFAULT_RETENTION_SCHEDULE_INTERVAL,
 										  false,
 										  false,
@@ -702,6 +704,7 @@ policies_show(PG_FUNCTION_ARGS)
 						 job,
 						 POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
 						 SHOW_POLICY_KEY_COMPRESS_AFTER);
+			/* POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE not supported with caggs */
 			ts_jsonb_add_interval(parse_state,
 								  SHOW_POLICY_KEY_COMPRESS_INTERVAL,
 								  &(job->fd.schedule_interval));
@@ -714,6 +717,7 @@ policies_show(PG_FUNCTION_ARGS)
 						 job,
 						 POL_RETENTION_CONF_KEY_DROP_AFTER,
 						 SHOW_POLICY_KEY_DROP_AFTER);
+			/* POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE not supported with caggs */
 			ts_jsonb_add_interval(parse_state,
 								  SHOW_POLICY_KEY_RETENTION_INTERVAL,
 								  &(job->fd.schedule_interval));
diff --git a/tsl/src/bgw_policy/policies_v2.h b/tsl/src/bgw_policy/policies_v2.h
index 7de8734de1a..71e4335e023 100644
--- a/tsl/src/bgw_policy/policies_v2.h
+++ b/tsl/src/bgw_policy/policies_v2.h
@@ -23,6 +23,7 @@
 #define POL_COMPRESSION_CONF_KEY_HYPERTABLE_ID "hypertable_id"
 #define POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER "compress_after"
 #define POL_COMPRESSION_CONF_KEY_MAXCHUNKS_TO_COMPRESS "maxchunks_to_compress"
+#define POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE "compress_created_before"
 
 #define POLICY_RECOMPRESSION_PROC_NAME "policy_recompression"
 #define POL_RECOMPRESSION_CONF_KEY_RECOMPRESS_AFTER "recompress_after"
@@ -31,6 +32,7 @@
 #define POLICY_RETENTION_CHECK_NAME "policy_retention_check"
 #define POL_RETENTION_CONF_KEY_HYPERTABLE_ID "hypertable_id"
 #define POL_RETENTION_CONF_KEY_DROP_AFTER "drop_after"
+#define POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE "drop_created_before"
 
 #define SHOW_POLICY_KEY_HYPERTABLE_ID "hypertable_id"
 #define SHOW_POLICY_KEY_POLICY_NAME "policy_name"
@@ -38,8 +40,10 @@
 #define SHOW_POLICY_KEY_REFRESH_START_OFFSET "refresh_start_offset"
 #define SHOW_POLICY_KEY_REFRESH_END_OFFSET "refresh_end_offset"
 #define SHOW_POLICY_KEY_COMPRESS_AFTER POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER
+#define SHOW_POLICY_KEY_COMPRESS_CREATED_BEFORE POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE
 #define SHOW_POLICY_KEY_COMPRESS_INTERVAL "compress_interval"
 #define SHOW_POLICY_KEY_DROP_AFTER POL_RETENTION_CONF_KEY_DROP_AFTER
+#define SHOW_POLICY_KEY_DROP_CREATED_BEFORE POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE
 #define SHOW_POLICY_KEY_RETENTION_INTERVAL "retention_interval"
 
 #define DEFAULT_RETENTION_SCHEDULE_INTERVAL                                                        \
diff --git a/tsl/src/bgw_policy/policy_utils.c b/tsl/src/bgw_policy/policy_utils.c
index 6d283109faf..1a56ec353d9 100644
--- a/tsl/src/bgw_policy/policy_utils.c
+++ b/tsl/src/bgw_policy/policy_utils.c
@@ -32,7 +32,7 @@ bool
 policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label,
 											Oid partitioning_type, Oid lag_type, Datum lag_datum)
 {
-	if (IS_INTEGER_TYPE(partitioning_type))
+	if (IS_INTEGER_TYPE(partitioning_type) && lag_type != INTERVALOID)
 	{
 		bool found;
 		int64 config_value = ts_jsonb_get_int64_field(config, json_label, &found);
diff --git a/tsl/src/bgw_policy/retention_api.c b/tsl/src/bgw_policy/retention_api.c
index edfcf951ac7..50b17a1e2ee 100644
--- a/tsl/src/bgw_policy/retention_api.c
+++ b/tsl/src/bgw_policy/retention_api.c
@@ -101,6 +101,21 @@ policy_retention_get_drop_after_interval(const Jsonb *config)
 	return interval;
 }
 
+Interval *
+policy_retention_get_drop_created_before_interval(const Jsonb *config)
+{
+	Interval *interval =
+		ts_jsonb_get_interval_field(config, POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE);
+
+	if (interval == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INTERNAL_ERROR),
+				 errmsg("could not find %s in config for job",
+						POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE)));
+
+	return interval;
+}
+
 static Hypertable *
 validate_drop_chunks_hypertable(Cache *hcache, Oid user_htoid)
 {
@@ -151,8 +166,9 @@ validate_drop_chunks_hypertable(Cache *hcache, Oid user_htoid)
 
 Datum
 policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
-							  Interval default_schedule_interval, bool if_not_exists,
-							  bool fixed_schedule, TimestampTz initial_start, const char *timezone)
+							  Interval *created_before, Interval default_schedule_interval,
+							  bool if_not_exists, bool fixed_schedule, TimestampTz initial_start,
+							  const char *timezone)
 {
 	NameData application_name;
 	int32 job_id;
@@ -183,9 +199,10 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
 	List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_RETENTION_PROC_NAME,
 														   FUNCTIONS_SCHEMA_NAME,
 														   hypertable->fd.id);
-
 	if (jobs != NIL)
 	{
+		bool is_equal = false;
+
 		if (!if_not_exists)
 			ereport(ERROR,
 					(errcode(ERRCODE_DUPLICATE_OBJECT),
@@ -195,11 +212,25 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
 		Assert(list_length(jobs) == 1);
 		BgwJob *existing = linitial(jobs);
 
-		if (policy_config_check_hypertable_lag_equality(existing->fd.config,
-														POL_RETENTION_CONF_KEY_DROP_AFTER,
-														partitioning_type,
-														window_type,
-														window_datum))
+		if (OidIsValid(window_type))
+			is_equal =
+				policy_config_check_hypertable_lag_equality(existing->fd.config,
+															POL_RETENTION_CONF_KEY_DROP_AFTER,
+															partitioning_type,
+															window_type,
+															window_datum);
+		else
+		{
+			Assert(created_before != NULL);
+			is_equal = policy_config_check_hypertable_lag_equality(
+				existing->fd.config,
+				POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE,
+				partitioning_type,
+				INTERVALOID,
+				IntervalPGetDatum(created_before));
+		}
+
+		if (is_equal)
 		{
 			/* If all arguments are the same, do nothing */
 			ts_cache_release(hcache);
@@ -220,12 +251,28 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
 		}
 	}
 
-	if (IS_INTEGER_TYPE(partitioning_type) && !IS_INTEGER_TYPE(window_type))
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("invalid value for parameter %s", POL_RETENTION_CONF_KEY_DROP_AFTER),
-				 errhint("Integer time duration is required for hypertables"
-						 " with integer time dimension.")));
+	if (created_before)
+	{
+		Assert(!OidIsValid(window_type));
+		window_type = INTERVALOID;
+	}
+
+	if (IS_INTEGER_TYPE(partitioning_type))
+	{
+		ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(ht_oid);
+
+		if ((IS_INTEGER_TYPE(window_type) && cagg == NULL &&
+			 !OidIsValid(ts_get_integer_now_func(dim, false))) ||
+			(!IS_INTEGER_TYPE(window_type) && created_before == NULL))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("invalid value for parameter %s", POL_RETENTION_CONF_KEY_DROP_AFTER),
+					 errhint(
+						 "Integer duration in \"drop_after\" with valid \"integer_now\" function"
+						 " or interval time duration"
+						 " in \"drop_created_before\" is required for hypertables with integer "
+						 "time dimension.")));
+	}
 
 	if (IS_TIMESTAMP_TYPE(partitioning_type) && window_type != INTERVALOID)
 		ereport(ERROR,
@@ -242,9 +289,14 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
 	switch (window_type)
 	{
 		case INTERVALOID:
-			ts_jsonb_add_interval(parse_state,
-								  POL_RETENTION_CONF_KEY_DROP_AFTER,
-								  DatumGetIntervalP(window_datum));
+			if (created_before)
+				ts_jsonb_add_interval(parse_state,
+									  POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE,
+									  DatumGetIntervalP(created_before));
+			else
+				ts_jsonb_add_interval(parse_state,
+									  POL_RETENTION_CONF_KEY_DROP_AFTER,
+									  DatumGetIntervalP(window_datum));
 			break;
 		case INT2OID:
 			ts_jsonb_add_int64(parse_state,
@@ -306,7 +358,7 @@ Datum
 policy_retention_add(PG_FUNCTION_ARGS)
 {
 	/* behave like a strict function */
-	if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
+	if (PG_ARGISNULL(0) || PG_ARGISNULL(2))
 		PG_RETURN_NULL();
 
 	Oid ht_oid = PG_GETARG_OID(0);
@@ -319,11 +371,20 @@ policy_retention_add(PG_FUNCTION_ARGS)
 	bool fixed_schedule = !PG_ARGISNULL(4);
 	text *timezone = PG_ARGISNULL(5) ? NULL : PG_GETARG_TEXT_PP(5);
 	char *valid_timezone = NULL;
+	// Interval *created_before = PG_ARGISNULL(6) ? NULL: PG_GETARG_INTERVAL_P(6);
+	Interval *created_before = PG_GETARG_INTERVAL_P(6);
 
 	ts_feature_flag_check(FEATURE_POLICY);
 	TS_PREVENT_FUNC_IF_READ_ONLY();
 
 	Datum retval;
+
+	/* drop_after and created_before cannot be specified [or omitted] together */
+	if ((PG_ARGISNULL(1) && PG_ARGISNULL(6)) || (!PG_ARGISNULL(1) && !PG_ARGISNULL(6)))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("need to specify one of \"drop_after\" or \"drop_created_before\"")));
+
 	/* if users pass in -infinity for initial_start, then use the current_timestamp instead */
 	if (fixed_schedule)
 	{
@@ -338,6 +399,7 @@ policy_retention_add(PG_FUNCTION_ARGS)
 	retval = policy_retention_add_internal(ht_oid,
 										   window_type,
 										   window_datum,
+										   created_before,
 										   default_schedule_interval,
 										   if_not_exists,
 										   fixed_schedule,
diff --git a/tsl/src/bgw_policy/retention_api.h b/tsl/src/bgw_policy/retention_api.h
index 38bfa9d3d95..fe658824cd8 100644
--- a/tsl/src/bgw_policy/retention_api.h
+++ b/tsl/src/bgw_policy/retention_api.h
@@ -18,10 +18,11 @@ extern Datum policy_retention_remove(PG_FUNCTION_ARGS);
 int32 policy_retention_get_hypertable_id(const Jsonb *config);
 int64 policy_retention_get_drop_after_int(const Jsonb *config);
 Interval *policy_retention_get_drop_after_interval(const Jsonb *config);
+Interval *policy_retention_get_drop_created_before_interval(const Jsonb *config);
 
 Datum policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
-									Interval default_schedule_interval, bool if_not_exists,
-									bool fixed_schedule, TimestampTz initial_start,
-									const char *timezone);
+									Interval *created_before, Interval default_schedule_interval,
+									bool if_not_exists, bool fixed_schedule,
+									TimestampTz initial_start, const char *timezone);
 Datum policy_retention_remove_internal(Oid table_oid, bool if_exists);
 #endif /* TIMESCALEDB_TSL_BGW_POLICY_RETENTION_API_H */
diff --git a/tsl/src/chunk.c b/tsl/src/chunk.c
index abb3ff409e7..d215a42cb2c 100644
--- a/tsl/src/chunk.c
+++ b/tsl/src/chunk.c
@@ -308,7 +308,7 @@ chunk_set_default_data_node(PG_FUNCTION_ARGS)
  * Returns the number of dropped chunks.
  */
 int
-chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type)
+chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type, bool use_creation_time)
 {
 	EState *estate;
 	ExprContext *econtext;
@@ -318,27 +318,26 @@ chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type)
 	SetExprState *state;
 	Oid restype;
 	Oid func_oid;
-	Const *argarr[DROP_CHUNKS_NARGS] = {
-		makeConst(REGCLASSOID,
-				  -1,
-				  InvalidOid,
-				  sizeof(relid),
-				  ObjectIdGetDatum(relid),
-				  false,
-				  false),
-		makeConst(older_than_type,
-				  -1,
-				  InvalidOid,
-				  get_typlen(older_than_type),
-				  older_than,
-				  false,
-				  get_typbyval(older_than_type)),
-		makeNullConst(older_than_type, -1, InvalidOid),
-		castNode(Const, makeBoolConst(false, true)),
-		/* For now, till we actually support created_before/created_after later */
-		makeNullConst(older_than_type, -1, InvalidOid),
-		makeNullConst(older_than_type, -1, InvalidOid),
-	};
+	Const *TypeNullCons = makeNullConst(older_than_type, -1, InvalidOid);
+	Const *IntervalVal = makeConst(older_than_type,
+								   -1,
+								   InvalidOid,
+								   get_typlen(older_than_type),
+								   older_than,
+								   false,
+								   get_typbyval(older_than_type));
+	Const *argarr[DROP_CHUNKS_NARGS] = { makeConst(REGCLASSOID,
+												   -1,
+												   InvalidOid,
+												   sizeof(relid),
+												   ObjectIdGetDatum(relid),
+												   false,
+												   false),
+										 TypeNullCons,
+										 TypeNullCons,
+										 castNode(Const, makeBoolConst(false, true)),
+										 TypeNullCons,
+										 TypeNullCons };
 	Oid type_id[DROP_CHUNKS_NARGS] = { REGCLASSOID, ANYOID, ANYOID, BOOLOID, ANYOID, ANYOID };
 	char *const schema_name = ts_extension_schema_name();
 	List *const fqn = list_make2(makeString(schema_name), makeString(DROP_CHUNKS_FUNCNAME));
@@ -349,6 +348,12 @@ chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type)
 	func_oid = LookupFuncName(fqn, lengthof(type_id), type_id, false);
 	Assert(func_oid); /* LookupFuncName should not return an invalid OID */
 
+	/* decide whether to use "older_than" or "drop_created_before" */
+	if (use_creation_time)
+		argarr[4] = IntervalVal;
+	else
+		argarr[1] = IntervalVal;
+
 	/* Prepare the function expr with argument list */
 	get_func_result_type(func_oid, &restype, NULL);
 
diff --git a/tsl/src/chunk.h b/tsl/src/chunk.h
index 15d5a942e3c..062c32d7df8 100644
--- a/tsl/src/chunk.h
+++ b/tsl/src/chunk.h
@@ -18,7 +18,8 @@ extern Datum chunk_freeze_chunk(PG_FUNCTION_ARGS);
 extern Datum chunk_unfreeze_chunk(PG_FUNCTION_ARGS);
 extern Datum chunk_drop_stale_chunks(PG_FUNCTION_ARGS);
 extern void ts_chunk_drop_stale_chunks(const char *node_name, ArrayType *chunks_array);
-extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type);
+extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type,
+									bool use_creation_time);
 extern Datum chunk_create_replica_table(PG_FUNCTION_ARGS);
 extern void chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes);
 
diff --git a/tsl/test/expected/bgw_policy.out b/tsl/test/expected/bgw_policy.out
index 48536a5289f..b72cb018988 100644
--- a/tsl/test/expected/bgw_policy.out
+++ b/tsl/test/expected/bgw_policy.out
@@ -600,6 +600,14 @@ select * from  _timescaledb_catalog.dimension;
 
 alter schema new_public rename to public;
 \c  :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2
+-- test that the behavior is strict when providing NULL required arguments
+create table test_strict (time timestamptz not null, a int, b int);
+select create_hypertable('test_strict', 'time');
+    create_hypertable     
+--------------------------
+ (6,public,test_strict,t)
+(1 row)
+
 \set ON_ERROR_STOP 0
 select add_reorder_policy('test_table_perm', 'test_table_perm_pkey');
 ERROR:  must be owner of hypertable "test_table_perm"
@@ -609,6 +617,8 @@ select add_retention_policy('test_table_perm', INTERVAL '4 months', true);
 ERROR:  must be owner of hypertable "test_table_perm"
 select remove_retention_policy('test_table');
 ERROR:  must be owner of hypertable "test_table"
+select add_retention_policy('test_strict', drop_after => NULL);
+ERROR:  need to specify one of "drop_after" or "drop_created_before"
 \set ON_ERROR_STOP 1
 -- Check the number of non-telemetry policies. We check for telemetry
 -- policy in telemetry_community.sql
@@ -623,21 +633,7 @@ GROUP BY proc_name;
  policy_retention           |     2
 (3 rows)
 
--- test that the behavior is strict when providing NULL required arguments
-create table test_strict (time timestamptz not null, a int, b int);
-select create_hypertable('test_strict', 'time');
-    create_hypertable     
---------------------------
- (6,public,test_strict,t)
-(1 row)
-
 -- test retention with null arguments
-select add_retention_policy('test_strict', drop_after => NULL);
- add_retention_policy 
-----------------------
-                     
-(1 row)
-
 select add_retention_policy(NULL, NULL);
  add_retention_policy 
 ----------------------
@@ -665,12 +661,6 @@ select add_retention_policy('test_strict', interval '2 days', schedule_interval
 
 -- test compression with null arguments
 alter table test_strict set (timescaledb.compress);
-select add_compression_policy('test_strict', compress_after => NULL);
- add_compression_policy 
-------------------------
-                       
-(1 row)
-
 select add_compression_policy(NULL, compress_after => NULL);
  add_compression_policy 
 ------------------------
@@ -723,6 +713,8 @@ select * from _timescaledb_config.bgw_job where id in (:retenion_id_missing_sche
 
 -- test policy check functions with NULL args
 \set ON_ERROR_STOP 0
+select add_compression_policy('test_strict', compress_after => NULL);
+ERROR:  need to specify one of "compress_after" or "compress_created_before"
 SELECT _timescaledb_functions.policy_compression_check(NULL);
 ERROR:  config must not be NULL
 SELECT _timescaledb_functions.policy_refresh_continuous_aggregate_check(NULL);
diff --git a/tsl/test/expected/cagg_policy.out b/tsl/test/expected/cagg_policy.out
index c509b5b9881..f7ee678964e 100644
--- a/tsl/test/expected/cagg_policy.out
+++ b/tsl/test/expected/cagg_policy.out
@@ -1145,10 +1145,13 @@ ERROR:  setup a refresh policy for "metrics_cagg" before setting up a compressio
 SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval) as "REFRESH_JOB" \gset
 SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
 ERROR:  compression not enabled on continuous aggregate "metrics_cagg"
-\set ON_ERROR_STOP 1
 ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress);
 NOTICE:  defaulting compress_segmentby to device_id
 NOTICE:  defaulting compress_orderby to dayb
+--cannot use compress_created_before with cagg
+SELECT add_compression_policy('metrics_cagg', compress_created_before => '8 day'::interval) AS "COMP_JOB" ;
+ERROR:  cannot use "compress_created_before" with continuous aggregate "metrics_cagg" 
+\set ON_ERROR_STOP 1
 SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
  COMP_JOB 
 ----------
diff --git a/tsl/test/expected/compression_bgw-13.out b/tsl/test/expected/compression_bgw-13.out
index fb242b7f10b..74818f05ac4 100644
--- a/tsl/test/expected/compression_bgw-13.out
+++ b/tsl/test/expected/compression_bgw-13.out
@@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10;
 ALTER TABLE test_table_smallint SET (timescaledb.compress);
 \set ON_ERROR_STOP 0
 select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval );
-ERROR:  unsupported compress_after argument type, expected type : smallint
+ERROR:  invalid value for parameter compress_after
 \set ON_ERROR_STOP 1
 SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset
 SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
diff --git a/tsl/test/expected/compression_bgw-14.out b/tsl/test/expected/compression_bgw-14.out
index 6470ec0e451..ceea22edd46 100644
--- a/tsl/test/expected/compression_bgw-14.out
+++ b/tsl/test/expected/compression_bgw-14.out
@@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10;
 ALTER TABLE test_table_smallint SET (timescaledb.compress);
 \set ON_ERROR_STOP 0
 select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval );
-ERROR:  unsupported compress_after argument type, expected type : smallint
+ERROR:  invalid value for parameter compress_after
 \set ON_ERROR_STOP 1
 SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset
 SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
diff --git a/tsl/test/expected/compression_bgw-15.out b/tsl/test/expected/compression_bgw-15.out
index 6470ec0e451..ceea22edd46 100644
--- a/tsl/test/expected/compression_bgw-15.out
+++ b/tsl/test/expected/compression_bgw-15.out
@@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10;
 ALTER TABLE test_table_smallint SET (timescaledb.compress);
 \set ON_ERROR_STOP 0
 select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval );
-ERROR:  unsupported compress_after argument type, expected type : smallint
+ERROR:  invalid value for parameter compress_after
 \set ON_ERROR_STOP 1
 SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset
 SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
diff --git a/tsl/test/expected/compression_errors-13.out b/tsl/test/expected/compression_errors-13.out
index 457d6151781..04c9be58265 100644
--- a/tsl/test/expected/compression_errors-13.out
+++ b/tsl/test/expected/compression_errors-13.out
@@ -481,8 +481,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
 
 --should fail
 CALL run_job(:compressjob_id);
-ERROR:  job 1000 config must have compress_after
-CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE
+ERROR:  job 1000 config must have compress_after or compress_created_before
+CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE
 SELECT remove_compression_policy('test_table_int');
  remove_compression_policy 
 ---------------------------
@@ -504,7 +504,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
 --should fail
 CALL run_job(:compressjob_id);
 ERROR:  job 1001 config must have hypertable_id
-CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE
+CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE
 UPDATE _timescaledb_config.bgw_job
 SET config = NULL
 WHERE id = :compressjob_id;
@@ -517,7 +517,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
 --should fail
 CALL run_job(:compressjob_id);
 ERROR:  job 1001 has null config
-CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE
+CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE
 -- test ADD COLUMN IF NOT EXISTS
 CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL);
 SELECT create_hypertable('metric', 'time', 'dev_id', 10);
diff --git a/tsl/test/expected/compression_errors-14.out b/tsl/test/expected/compression_errors-14.out
index 457d6151781..04c9be58265 100644
--- a/tsl/test/expected/compression_errors-14.out
+++ b/tsl/test/expected/compression_errors-14.out
@@ -481,8 +481,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
 
 --should fail
 CALL run_job(:compressjob_id);
-ERROR:  job 1000 config must have compress_after
-CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE
+ERROR:  job 1000 config must have compress_after or compress_created_before
+CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE
 SELECT remove_compression_policy('test_table_int');
  remove_compression_policy 
 ---------------------------
@@ -504,7 +504,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
 --should fail
 CALL run_job(:compressjob_id);
 ERROR:  job 1001 config must have hypertable_id
-CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE
+CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE
 UPDATE _timescaledb_config.bgw_job
 SET config = NULL
 WHERE id = :compressjob_id;
@@ -517,7 +517,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
 --should fail
 CALL run_job(:compressjob_id);
 ERROR:  job 1001 has null config
-CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE
+CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE
 -- test ADD COLUMN IF NOT EXISTS
 CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL);
 SELECT create_hypertable('metric', 'time', 'dev_id', 10);
diff --git a/tsl/test/expected/compression_errors-15.out b/tsl/test/expected/compression_errors-15.out
index 457d6151781..04c9be58265 100644
--- a/tsl/test/expected/compression_errors-15.out
+++ b/tsl/test/expected/compression_errors-15.out
@@ -481,8 +481,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
 
 --should fail
 CALL run_job(:compressjob_id);
-ERROR:  job 1000 config must have compress_after
-CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE
+ERROR:  job 1000 config must have compress_after or compress_created_before
+CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE
 SELECT remove_compression_policy('test_table_int');
  remove_compression_policy 
 ---------------------------
@@ -504,7 +504,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
 --should fail
 CALL run_job(:compressjob_id);
 ERROR:  job 1001 config must have hypertable_id
-CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE
+CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE
 UPDATE _timescaledb_config.bgw_job
 SET config = NULL
 WHERE id = :compressjob_id;
@@ -517,7 +517,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
 --should fail
 CALL run_job(:compressjob_id);
 ERROR:  job 1001 has null config
-CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE
+CONTEXT:  PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE
 -- test ADD COLUMN IF NOT EXISTS
 CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL);
 SELECT create_hypertable('metric', 'time', 'dev_id', 10);
diff --git a/tsl/test/expected/policy_generalization.out b/tsl/test/expected/policy_generalization.out
index 1a83734f46f..cbb24e7ce86 100644
--- a/tsl/test/expected/policy_generalization.out
+++ b/tsl/test/expected/policy_generalization.out
@@ -73,4 +73,75 @@ select count(*) from timescaledb_information.chunks where hypertable_name='test'
      0
 (1 row)
 
+-- retention policy
+INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i;
+\set ON_ERROR_STOP 0
+-- interval input for "drop_after" for INTEGER partitioning errors out
+SELECT add_retention_policy('test', INTERVAL '5 seconds', true);
+ERROR:  invalid value for parameter drop_after
+-- integer input for "drop_after" for INTEGER partitioning without valid
+-- integer_now function errors out
+SELECT add_retention_policy('test', 2000, true);
+ERROR:  invalid value for parameter drop_after
+\set ON_ERROR_STOP 1
+SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds',
+    if_not_exists => true) as drop_chunks_job_id \gset
+CALL run_job(:drop_chunks_job_id);
+select count(*) from timescaledb_information.chunks where hypertable_name='test';
+ count 
+-------
+    11
+(1 row)
+
+SELECT pg_sleep(3);
+ pg_sleep 
+----------
+ 
+(1 row)
+
+CALL run_job(:drop_chunks_job_id);
+select count(*) from timescaledb_information.chunks where hypertable_name='test';
+ count 
+-------
+     0
+(1 row)
+
+SELECT remove_retention_policy('test');
+ remove_retention_policy 
+-------------------------
+ 
+(1 row)
+
+-- compression policy
+ALTER TABLE test SET (timescaledb.compress);
+INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i;
+-- Chunk compression status
+SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats;
+ compression_status 
+--------------------
+ Uncompressed
+(1 row)
+
+-- Compression policy
+SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds') AS compress_chunks_job_id \gset
+SELECT pg_sleep(3);
+ pg_sleep 
+----------
+ 
+(1 row)
+
+CALL run_job(:compress_chunks_job_id);
+-- Chunk compression status
+SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats;
+ compression_status 
+--------------------
+ Compressed
+(1 row)
+
+SELECT remove_compression_policy('test');
+ remove_compression_policy 
+---------------------------
+ t
+(1 row)
+
 DROP TABLE test;
diff --git a/tsl/test/expected/tsl_tables.out b/tsl/test/expected/tsl_tables.out
index 484c0c45611..15b66dd99fe 100644
--- a/tsl/test/expected/tsl_tables.out
+++ b/tsl/test/expected/tsl_tables.out
@@ -115,7 +115,7 @@ SELECT * FROM _timescaledb_config.bgw_job WHERE id >= 1000 ORDER BY id;
 select add_retention_policy();
 ERROR:  function add_retention_policy() does not exist at character 8
 select add_retention_policy('test_table');
-ERROR:  function add_retention_policy(unknown) does not exist at character 8
+ERROR:  need to specify one of "drop_after" or "drop_created_before"
 select add_retention_policy(INTERVAL '3 hours');
 ERROR:  function add_retention_policy(interval) does not exist at character 8
 select add_retention_policy('test_table', INTERVAL 'haha');
@@ -198,6 +198,7 @@ select * from _timescaledb_catalog.dimension;
 \c :TEST_DBNAME :ROLE_SUPERUSER
 CREATE SCHEMA IF NOT EXISTS my_new_schema;
 create or replace function my_new_schema.dummy_now2() returns BIGINT LANGUAGE SQL IMMUTABLE as  'SELECT 1::BIGINT';
+grant usage on SCHEMA my_new_schema to public;
 grant execute on ALL FUNCTIONS IN SCHEMA my_new_schema to public;
 select set_integer_now_func('test_table_int', 'my_new_schema.dummy_now2');
  set_integer_now_func 
diff --git a/tsl/test/shared/expected/compat.out b/tsl/test/shared/expected/compat.out
index 54419936385..9667ae1dc0a 100644
--- a/tsl/test/shared/expected/compat.out
+++ b/tsl/test/shared/expected/compat.out
@@ -407,8 +407,8 @@ ERROR:  null values cannot be formatted as an SQL identifier
 CALL _timescaledb_internal.policy_compression(0,NULL);
 WARNING:  procedure _timescaledb_internal.policy_compression(integer,jsonb) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.
 ERROR:  job 0 has null config
-CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true);
-WARNING:  procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.
+CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true,true);
+WARNING:  procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.
 ERROR:  invalid hypertable or continuous aggregate
 CALL _timescaledb_internal.policy_recompression(0,NULL);
 WARNING:  procedure _timescaledb_internal.policy_recompression(integer,jsonb) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.
diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out
index edc9142d60f..cc5097dca12 100644
--- a/tsl/test/shared/expected/extension.out
+++ b/tsl/test/shared/expected/extension.out
@@ -112,7 +112,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
  _timescaledb_functions.ping_data_node(name,interval)
  _timescaledb_functions.policy_compression(integer,jsonb)
  _timescaledb_functions.policy_compression_check(jsonb)
- _timescaledb_functions.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean)
+ _timescaledb_functions.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean)
  _timescaledb_functions.policy_job_error_retention(integer,jsonb)
  _timescaledb_functions.policy_job_error_retention_check(jsonb)
  _timescaledb_functions.policy_recompression(integer,jsonb)
@@ -222,7 +222,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
  _timescaledb_internal.ping_data_node(name,interval)
  _timescaledb_internal.policy_compression(integer,jsonb)
  _timescaledb_internal.policy_compression_check(jsonb)
- _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean)
+ _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean)
  _timescaledb_internal.policy_job_error_retention(integer,jsonb)
  _timescaledb_internal.policy_job_error_retention_check(jsonb)
  _timescaledb_internal.policy_recompression(integer,jsonb)
@@ -259,14 +259,14 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
  debug_waitpoint_enable(text)
  debug_waitpoint_id(text)
  debug_waitpoint_release(text)
- add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text)
+ add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval)
  add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text)
  add_data_node(name,text,name,integer,boolean,boolean,text)
  add_dimension(regclass,_timescaledb_internal.dimension_info,boolean)
  add_dimension(regclass,name,integer,anyelement,regproc,boolean)
  add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc,boolean,text)
  add_reorder_policy(regclass,name,boolean,timestamp with time zone,text)
- add_retention_policy(regclass,"any",boolean,interval,timestamp with time zone,text)
+ add_retention_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval)
  alter_data_node(name,text,name,integer,boolean)
  alter_job(integer,interval,interval,integer,interval,boolean,jsonb,timestamp with time zone,boolean,regproc,boolean,timestamp with time zone,text)
  approximate_row_count(regclass)
diff --git a/tsl/test/shared/sql/compat.sql b/tsl/test/shared/sql/compat.sql
index b45081c7685..cb6b7cda9ec 100644
--- a/tsl/test/shared/sql/compat.sql
+++ b/tsl/test/shared/sql/compat.sql
@@ -100,7 +100,7 @@ CALL _timescaledb_internal.cagg_migrate_execute_override_cagg(NULL,NULL);
 CALL _timescaledb_internal.cagg_migrate_execute_plan(NULL);
 CALL _timescaledb_internal.cagg_migrate_execute_refresh_new_cagg(NULL,NULL);
 CALL _timescaledb_internal.policy_compression(0,NULL);
-CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true);
+CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true,true);
 CALL _timescaledb_internal.policy_recompression(0,NULL);
 CALL _timescaledb_internal.policy_refresh_continuous_aggregate(0,NULL);
 CALL _timescaledb_internal.policy_reorder(0,NULL);
diff --git a/tsl/test/sql/bgw_policy.sql b/tsl/test/sql/bgw_policy.sql
index 4666138669e..e8844cc3c3b 100644
--- a/tsl/test/sql/bgw_policy.sql
+++ b/tsl/test/sql/bgw_policy.sql
@@ -313,13 +313,17 @@ select * from  _timescaledb_catalog.dimension;
 alter schema new_public rename to public;
 
 \c  :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2
+-- test that the behavior is strict when providing NULL required arguments
+create table test_strict (time timestamptz not null, a int, b int);
+select create_hypertable('test_strict', 'time');
+
 \set ON_ERROR_STOP 0
 select add_reorder_policy('test_table_perm', 'test_table_perm_pkey');
 select remove_reorder_policy('test_table');
 
 select add_retention_policy('test_table_perm', INTERVAL '4 months', true);
 select remove_retention_policy('test_table');
-
+select add_retention_policy('test_strict', drop_after => NULL);
 \set ON_ERROR_STOP 1
 
 -- Check the number of non-telemetry policies. We check for telemetry
@@ -330,11 +334,7 @@ WHERE proc_name NOT LIKE '%telemetry%'
 GROUP BY proc_name;
 
 
--- test that the behavior is strict when providing NULL required arguments
-create table test_strict (time timestamptz not null, a int, b int);
-select create_hypertable('test_strict', 'time');
 -- test retention with null arguments
-select add_retention_policy('test_strict', drop_after => NULL);
 select add_retention_policy(NULL, NULL);
 select add_retention_policy(NULL, drop_after => interval '2 days');
 -- this is an optional argument
@@ -342,7 +342,6 @@ select add_retention_policy('test_strict', drop_after => interval '2 days', if_n
 select add_retention_policy('test_strict', interval '2 days', schedule_interval => NULL);
 -- test compression with null arguments
 alter table test_strict set (timescaledb.compress);
-select add_compression_policy('test_strict', compress_after => NULL);
 select add_compression_policy(NULL, compress_after => NULL);
 select add_compression_policy('test_strict', INTERVAL '2 weeks', if_not_exists => NULL);
 select add_compression_policy('test_strict', INTERVAL '2 weeks', schedule_interval => NULL);
@@ -366,6 +365,7 @@ select * from _timescaledb_config.bgw_job where id in (:retenion_id_missing_sche
 
 -- test policy check functions with NULL args
 \set ON_ERROR_STOP 0
+select add_compression_policy('test_strict', compress_after => NULL);
 SELECT _timescaledb_functions.policy_compression_check(NULL);
 SELECT _timescaledb_functions.policy_refresh_continuous_aggregate_check(NULL);
 SELECT _timescaledb_functions.policy_reorder_check(NULL);
diff --git a/tsl/test/sql/cagg_policy.sql b/tsl/test/sql/cagg_policy.sql
index 18935a5d5c9..c752c422723 100644
--- a/tsl/test/sql/cagg_policy.sql
+++ b/tsl/test/sql/cagg_policy.sql
@@ -540,9 +540,12 @@ SELECT add_compression_policy('metrics_cagg', '1 day'::interval);
 --can set compression policy only after enabling compression --
 SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval) as "REFRESH_JOB" \gset
 SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
+ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress);
+
+--cannot use compress_created_before with cagg
+SELECT add_compression_policy('metrics_cagg', compress_created_before => '8 day'::interval) AS "COMP_JOB" ;
 \set ON_ERROR_STOP 1
 
-ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress);
 
 SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
 SELECT remove_compression_policy('metrics_cagg');
diff --git a/tsl/test/sql/policy_generalization.sql b/tsl/test/sql/policy_generalization.sql
index 7162b706f62..47582ad7771 100644
--- a/tsl/test/sql/policy_generalization.sql
+++ b/tsl/test/sql/policy_generalization.sql
@@ -28,4 +28,38 @@ select count(*) from timescaledb_information.chunks where hypertable_name='test'
 SELECT count(*) from drop_chunks('test', created_after => INTERVAL '1 hour');
 select count(*) from timescaledb_information.chunks where hypertable_name='test';
 
+-- retention policy
+INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i;
+\set ON_ERROR_STOP 0
+-- interval input for "drop_after" for INTEGER partitioning errors out
+SELECT add_retention_policy('test', INTERVAL '5 seconds', true);
+-- integer input for "drop_after" for INTEGER partitioning without valid
+-- integer_now function errors out
+SELECT add_retention_policy('test', 2000, true);
+\set ON_ERROR_STOP 1
+SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds',
+    if_not_exists => true) as drop_chunks_job_id \gset
+CALL run_job(:drop_chunks_job_id);
+select count(*) from timescaledb_information.chunks where hypertable_name='test';
+SELECT pg_sleep(3);
+CALL run_job(:drop_chunks_job_id);
+select count(*) from timescaledb_information.chunks where hypertable_name='test';
+SELECT remove_retention_policy('test');
+
+-- compression policy
+ALTER TABLE test SET (timescaledb.compress);
+INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i;
+
+-- Chunk compression status
+SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats;
+
+-- Compression policy
+SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds') AS compress_chunks_job_id \gset
+SELECT pg_sleep(3);
+CALL run_job(:compress_chunks_job_id);
+
+-- Chunk compression status
+SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats;
+SELECT remove_compression_policy('test');
+
 DROP TABLE test;
diff --git a/tsl/test/sql/tsl_tables.sql b/tsl/test/sql/tsl_tables.sql
index 73cd5c47234..b2ff2053d74 100644
--- a/tsl/test/sql/tsl_tables.sql
+++ b/tsl/test/sql/tsl_tables.sql
@@ -90,6 +90,7 @@ select * from _timescaledb_catalog.dimension;
 
 CREATE SCHEMA IF NOT EXISTS my_new_schema;
 create or replace function my_new_schema.dummy_now2() returns BIGINT LANGUAGE SQL IMMUTABLE as  'SELECT 1::BIGINT';
+grant usage on SCHEMA my_new_schema to public;
 grant execute on ALL FUNCTIONS IN SCHEMA my_new_schema to public;
 select set_integer_now_func('test_table_int', 'my_new_schema.dummy_now2');