Skip to content

Commit

Permalink
Change parameter name to enable Hypercore TAM
Browse files Browse the repository at this point in the history
Changing from using the `compress_using` parameter with a table access
method name to use the boolean parameter `hypercore_use_access_method`
instead to avoid having to provide a name when using the table access
method for compression.
  • Loading branch information
mkindahl committed Nov 4, 2024
1 parent 76e3b27 commit 159ea80
Show file tree
Hide file tree
Showing 41 changed files with 182 additions and 211 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pgspot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
--proc-without-search-path
'_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)'
--proc-without-search-path
'_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean,amname name)'
'_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean,useam boolean)'
--proc-without-search-path
'_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean)'
--proc-without-search-path
Expand Down
1 change: 1 addition & 0 deletions .unreleased/pr_7411
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7411 Change parameter name to enable Hypercore TAM
2 changes: 1 addition & 1 deletion sql/maintenance_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ CREATE OR REPLACE FUNCTION @[email protected]_chunk(
uncompressed_chunk REGCLASS,
if_not_compressed BOOLEAN = true,
recompress BOOLEAN = false,
compress_using NAME = NULL
hypercore_use_access_method BOOL = NULL
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_compress_chunk' LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION @[email protected]_chunk(
Expand Down
4 changes: 2 additions & 2 deletions sql/policy_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ CREATE OR REPLACE FUNCTION @[email protected]_compression_policy(
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
compress_created_before INTERVAL = NULL,
compress_using NAME = NULL
hypercore_use_access_method BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
Expand Down Expand Up @@ -95,7 +95,7 @@ CREATE OR REPLACE FUNCTION timescaledb_experimental.add_policies(
refresh_end_offset "any" = NULL,
compress_after "any" = NULL,
drop_after "any" = NULL,
compress_using NAME = NULL)
hypercore_use_access_method BOOL = NULL)
RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_policies_add'
LANGUAGE C VOLATILE;
Expand Down
18 changes: 9 additions & 9 deletions sql/policy_internal.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ _timescaledb_functions.policy_compression_execute(
verbose_log BOOLEAN,
recompress_enabled BOOLEAN,
use_creation_time BOOLEAN,
amname NAME = NULL)
useam BOOLEAN = NULL)
AS $$
DECLARE
htoid REGCLASS;
Expand Down Expand Up @@ -109,7 +109,7 @@ BEGIN
LOOP
IF chunk_rec.status = 0 THEN
BEGIN
PERFORM @[email protected]_chunk(chunk_rec.oid, compress_using => amname);
PERFORM @[email protected]_chunk(chunk_rec.oid, hypercore_use_access_method => useam);
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_message = MESSAGE_TEXT,
Expand All @@ -134,7 +134,7 @@ BEGIN
PERFORM _timescaledb_functions.recompress_chunk_segmentwise(chunk_rec.oid);
ELSE
PERFORM @[email protected]_chunk(chunk_rec.oid, if_compressed => true);
PERFORM @[email protected]_chunk(chunk_rec.oid, compress_using => amname);
PERFORM @[email protected]_chunk(chunk_rec.oid, hypercore_use_access_method => useam);
END IF;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
Expand Down Expand Up @@ -187,7 +187,7 @@ DECLARE
numchunks INTEGER := 1;
recompress_enabled BOOL;
use_creation_time BOOL := FALSE;
compress_using TEXT;
hypercore_use_access_method BOOL;
BEGIN

-- procedures with SET clause cannot execute transaction
Expand Down Expand Up @@ -228,29 +228,29 @@ BEGIN
lag_value := compress_after;
END IF;

compress_using := jsonb_object_field_text(config, 'compress_using')::name;
hypercore_use_access_method := jsonb_object_field_text(config, 'hypercore_use_access_method')::bool;

-- execute the properly type casts for the lag value
CASE dimtype
WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype, 'INTERVAL' ::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::INTERVAL,
maxchunks, verbose_log, recompress_enabled, use_creation_time, compress_using
maxchunks, verbose_log, recompress_enabled, use_creation_time, hypercore_use_access_method
);
WHEN 'BIGINT'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::BIGINT,
maxchunks, verbose_log, recompress_enabled, use_creation_time, compress_using
maxchunks, verbose_log, recompress_enabled, use_creation_time, hypercore_use_access_method
);
WHEN 'INTEGER'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::INTEGER,
maxchunks, verbose_log, recompress_enabled, use_creation_time, compress_using
maxchunks, verbose_log, recompress_enabled, use_creation_time, hypercore_use_access_method
);
WHEN 'SMALLINT'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::SMALLINT,
maxchunks, verbose_log, recompress_enabled, use_creation_time, compress_using
maxchunks, verbose_log, recompress_enabled, use_creation_time, hypercore_use_access_method
);
END CASE;
END;
Expand Down
6 changes: 3 additions & 3 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ CREATE FUNCTION @[email protected]_chunk(
uncompressed_chunk REGCLASS,
if_not_compressed BOOLEAN = true,
recompress BOOLEAN = false,
compress_using NAME = NULL
hypercore_use_access_method BOOL = NULL
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_update_placeholder' LANGUAGE C VOLATILE;

DROP FUNCTION IF EXISTS @[email protected]_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL, schedule_interval INTERVAL, initial_start TIMESTAMPTZ, timezone TEXT, compress_created_before INTERVAL);
Expand All @@ -24,7 +24,7 @@ CREATE FUNCTION @[email protected]_compression_policy(
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
compress_created_before INTERVAL = NULL,
compress_using NAME = NULL
hypercore_use_access_method BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
Expand All @@ -39,7 +39,7 @@ CREATE FUNCTION timescaledb_experimental.add_policies(
refresh_end_offset "any" = NULL,
compress_after "any" = NULL,
drop_after "any" = NULL,
compress_using NAME = NULL)
hypercore_use_access_method BOOL = NULL)
RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C VOLATILE;
Expand Down
8 changes: 4 additions & 4 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ DROP ACCESS METHOD IF EXISTS hypercore;
DROP FUNCTION IF EXISTS ts_hypercore_handler;
DROP FUNCTION IF EXISTS _timescaledb_debug.is_compressed_tid;

DROP FUNCTION IF EXISTS @[email protected]_chunk(uncompressed_chunk REGCLASS, if_not_compressed BOOLEAN, recompress BOOLEAN, compress_using NAME);
DROP FUNCTION IF EXISTS @[email protected]_chunk(uncompressed_chunk REGCLASS, if_not_compressed BOOLEAN, recompress BOOLEAN, hypercore_use_access_method BOOL);

CREATE FUNCTION @[email protected]_chunk(
uncompressed_chunk REGCLASS,
if_not_compressed BOOLEAN = true,
recompress BOOLEAN = false
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_compress_chunk' LANGUAGE C STRICT VOLATILE;

DROP FUNCTION IF EXISTS @[email protected]_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL, schedule_interval INTERVAL, initial_start TIMESTAMPTZ, timezone TEXT, compress_created_before INTERVAL, compress_using NAME);
DROP FUNCTION IF EXISTS @[email protected]_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL, schedule_interval INTERVAL, initial_start TIMESTAMPTZ, timezone TEXT, compress_created_before INTERVAL, hypercore_use_access_method BOOL);

CREATE FUNCTION @[email protected]_compression_policy(
hypertable REGCLASS,
Expand All @@ -28,7 +28,7 @@ RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
LANGUAGE C VOLATILE;

DROP FUNCTION IF EXISTS timescaledb_experimental.add_policies(relation REGCLASS, if_not_exists BOOL, refresh_start_offset "any", refresh_end_offset "any", compress_after "any", drop_after "any", compress_using NAME);
DROP FUNCTION IF EXISTS timescaledb_experimental.add_policies(relation REGCLASS, if_not_exists BOOL, refresh_start_offset "any", refresh_end_offset "any", compress_after "any", drop_after "any", hypercore_use_access_method BOOL);

CREATE FUNCTION timescaledb_experimental.add_policies(
relation REGCLASS,
Expand All @@ -41,6 +41,6 @@ RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_policies_add'
LANGUAGE C VOLATILE;

DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(job_id INTEGER, htid INTEGER, lag ANYELEMENT, maxchunks INTEGER, verbose_log BOOLEAN, recompress_enabled BOOLEAN, use_creation_time BOOLEAN, amname NAME);
DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(job_id INTEGER, htid INTEGER, lag ANYELEMENT, maxchunks INTEGER, verbose_log BOOLEAN, recompress_enabled BOOLEAN, use_creation_time BOOLEAN, useam BOOLEAN);

DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression(job_id INTEGER, config JSONB);
20 changes: 9 additions & 11 deletions tsl/src/bgw_policy/compression_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <postgres.h>
#include <access/xact.h>
#include <fmgr.h>
#include <miscadmin.h>
#include <utils/builtins.h>

Expand All @@ -18,6 +19,7 @@
#include "bgw_policy/job.h"
#include "bgw_policy/job_api.h"
#include "bgw_policy/policies_v2.h"
#include "compression/api.h"
#include "errors.h"
#include "guc.h"
#include "hypertable.h"
Expand Down Expand Up @@ -158,7 +160,7 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
Interval *default_schedule_interval,
bool user_defined_schedule_interval, bool if_not_exists,
bool fixed_schedule, TimestampTz initial_start,
const char *timezone, const char *compress_using)
const char *timezone, UseAccessMethod use_access_method)
{
NameData application_name;
NameData proc_name, proc_schema, check_schema, check_name, owner;
Expand Down Expand Up @@ -282,12 +284,6 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
}
}

if (compress_using != NULL && strcmp(compress_using, "heap") != 0 &&
strcmp(compress_using, TS_HYPERCORE_TAM_NAME) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("can only compress using \"heap\" or \"%s\"", TS_HYPERCORE_TAM_NAME)));

/* insert a new job into jobs table */
namestrcpy(&application_name, "Compression Policy");
namestrcpy(&proc_name, POLICY_COMPRESSION_PROC_NAME);
Expand All @@ -302,8 +298,10 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
ts_jsonb_add_int32(parse_state, POL_COMPRESSION_CONF_KEY_HYPERTABLE_ID, hypertable->fd.id);
validate_compress_after_type(dim, partitioning_type, compress_after_type);

if (NULL != compress_using)
ts_jsonb_add_str(parse_state, POL_COMPRESSION_CONF_KEY_COMPRESS_USING, compress_using);
if (use_access_method != USE_AM_NULL)
ts_jsonb_add_bool(parse_state,
POL_COMPRESSION_CONF_KEY_USE_ACCESS_METHOD,
use_access_method);

switch (compress_after_type)
{
Expand Down Expand Up @@ -406,7 +404,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
text *timezone = PG_ARGISNULL(5) ? NULL : PG_GETARG_TEXT_PP(5);
char *valid_timezone = NULL;
Interval *created_before = PG_GETARG_INTERVAL_P(6);
Name compress_using = PG_ARGISNULL(7) ? NULL : PG_GETARG_NAME(7);
UseAccessMethod use_access_method = PG_ARGISNULL(7) ? USE_AM_NULL : PG_GETARG_BOOL(7);

ts_feature_flag_check(FEATURE_POLICY);
TS_PREVENT_FUNC_IF_READ_ONLY();
Expand Down Expand Up @@ -440,7 +438,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
fixed_schedule,
initial_start,
valid_timezone,
compress_using ? NameStr(*compress_using) : NULL);
use_access_method);

if (!TIMESTAMP_NOT_FINITE(initial_start))
{
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/compression_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#pragma once

#include <postgres.h>
#include "compression/api.h"
#include <utils/jsonb.h>
#include <utils/timestamp.h>

Expand All @@ -26,5 +27,5 @@ Datum policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_dat
Interval *default_schedule_interval,
bool user_defined_schedule_interval, bool if_not_exists,
bool fixed_schedule, TimestampTz initial_start,
const char *timezone, const char *compress_using);
const char *timezone, UseAccessMethod use_access_method);
bool policy_compression_remove_internal(Oid user_rel_oid, bool if_exists);
5 changes: 3 additions & 2 deletions tsl/src/bgw_policy/policies_v2.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <postgres.h>
#include <access/xact.h>
#include <fmgr.h>
#include <miscadmin.h>
#include <parser/parse_coerce.h>
#include <utils/builtins.h>
Expand Down Expand Up @@ -233,7 +234,7 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
false,
DT_NOBEGIN,
NULL,
all_policies.compress->compress_using);
all_policies.compress->use_access_method);
}

if (all_policies.retention && all_policies.retention->create_policy)
Expand Down Expand Up @@ -310,7 +311,7 @@ policies_add(PG_FUNCTION_ARGS)
.create_policy = true,
.compress_after = PG_GETARG_DATUM(4),
.compress_after_type = get_fn_expr_argtype(fcinfo->flinfo, 4),
.compress_using = PG_ARGISNULL(6) ? NULL : NameStr(*PG_GETARG_NAME(6)),
.use_access_method = PG_ARGISNULL(6) ? USE_AM_NULL : PG_GETARG_BOOL(6),
};
comp = tmp;
all_policies.compress = &comp;
Expand Down
5 changes: 3 additions & 2 deletions tsl/src/bgw_policy/policies_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#pragma once

#include <postgres.h>
#include "compression/api.h"
#include "dimension.h"
#include <bgw_policy/compression_api.h>
#include <bgw_policy/continuous_aggregate_api.h>
Expand All @@ -25,7 +26,7 @@
#define POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER "compress_after"
#define POL_COMPRESSION_CONF_KEY_MAXCHUNKS_TO_COMPRESS "maxchunks_to_compress"
#define POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE "compress_created_before"
#define POL_COMPRESSION_CONF_KEY_COMPRESS_USING "compress_using"
#define POL_COMPRESSION_CONF_KEY_USE_ACCESS_METHOD "hypercore_use_access_method"

#define POLICY_RECOMPRESSION_PROC_NAME "policy_recompression"
#define POL_RECOMPRESSION_CONF_KEY_RECOMPRESS_AFTER "recompress_after"
Expand Down Expand Up @@ -89,7 +90,7 @@ typedef struct compression_policy
Datum compress_after;
Oid compress_after_type;
bool create_policy;
const char *compress_using;
UseAccessMethod use_access_method;
} compression_policy;

typedef struct retention_policy
Expand Down
31 changes: 3 additions & 28 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* compress and decompress chunks
*/
#include <postgres.h>
#include "guc.h"
#include <access/tableam.h>
#include <access/xact.h>
#include <catalog/dependency.h>
Expand Down Expand Up @@ -778,31 +779,6 @@ set_access_method(Oid relid, const char *amname)
return relid;
}

enum UseAccessMethod
{
USE_AM_FALSE,
USE_AM_TRUE,
USE_AM_NULL,
};

static enum UseAccessMethod
parse_use_access_method(const char *compress_using)
{
if (compress_using == NULL)
return USE_AM_NULL;

if (strcmp(compress_using, "heap") == 0)
return USE_AM_FALSE;
else if (strcmp(compress_using, TS_HYPERCORE_TAM_NAME) == 0)
return USE_AM_TRUE;

ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("can only compress using \"heap\" or \"%s\"", TS_HYPERCORE_TAM_NAME)));

pg_unreachable();
}

/*
* When using compress_chunk() with hypercore, there are three cases to
* handle:
Expand All @@ -814,7 +790,7 @@ parse_use_access_method(const char *compress_using)
* 3. Recompress a hypercore
*/
static Oid
compress_hypercore(Chunk *chunk, bool rel_is_hypercore, enum UseAccessMethod useam,
compress_hypercore(Chunk *chunk, bool rel_is_hypercore, UseAccessMethod useam,
bool if_not_compressed, bool recompress)
{
Oid relid = InvalidOid;
Expand Down Expand Up @@ -868,14 +844,13 @@ tsl_compress_chunk(PG_FUNCTION_ARGS)
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1);
bool recompress = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
const char *compress_using = PG_ARGISNULL(3) ? NULL : NameStr(*PG_GETARG_NAME(3));
UseAccessMethod useam = PG_ARGISNULL(3) ? USE_AM_NULL : PG_GETARG_BOOL(3);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);

TS_PREVENT_FUNC_IF_READ_ONLY();
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);
bool rel_is_hypercore = get_table_am_oid(TS_HYPERCORE_TAM_NAME, false) == chunk->amoid;
enum UseAccessMethod useam = parse_use_access_method(compress_using);

if (rel_is_hypercore || useam == USE_AM_TRUE)
uncompressed_chunk_id =
Expand Down
15 changes: 15 additions & 0 deletions tsl/src/compression/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@

#include "chunk.h"

/*
* Decide if the access method should be used for compression, or if it is
* undefined. Used for parameter values to PostgreSQL functions and is a
* nullable boolean.
*
* Using explicit values of TRUE = 1 and FALSE = 0 since this enum is cast to
* boolean value in the code.
*/
typedef enum UseAccessMethod
{
USE_AM_FALSE = 0,
USE_AM_TRUE = 1,
USE_AM_NULL = 2,
} UseAccessMethod;

extern Datum tsl_create_compressed_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_decompress_chunk(PG_FUNCTION_ARGS);
Expand Down
Loading

0 comments on commit 159ea80

Please sign in to comment.