Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change parameter name to enable Hypercore TAM #7411

Merged
merged 1 commit into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pgspot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
--proc-without-search-path
'_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)'
--proc-without-search-path
'_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean,amname name)'
'_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean,useam boolean)'
--proc-without-search-path
'_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean)'
--proc-without-search-path
Expand Down
1 change: 1 addition & 0 deletions .unreleased/pr_7411
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7411 Change parameter name to enable Hypercore TAM
2 changes: 1 addition & 1 deletion sql/maintenance_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ CREATE OR REPLACE FUNCTION @[email protected]_chunk(
uncompressed_chunk REGCLASS,
if_not_compressed BOOLEAN = true,
recompress BOOLEAN = false,
compress_using NAME = NULL
hypercore_use_access_method BOOL = NULL
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_compress_chunk' LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION @[email protected]_chunk(
Expand Down
4 changes: 2 additions & 2 deletions sql/policy_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ CREATE OR REPLACE FUNCTION @[email protected]_compression_policy(
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
compress_created_before INTERVAL = NULL,
compress_using NAME = NULL
hypercore_use_access_method BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
Expand Down Expand Up @@ -95,7 +95,7 @@ CREATE OR REPLACE FUNCTION timescaledb_experimental.add_policies(
refresh_end_offset "any" = NULL,
compress_after "any" = NULL,
drop_after "any" = NULL,
compress_using NAME = NULL)
hypercore_use_access_method BOOL = NULL)
RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_policies_add'
LANGUAGE C VOLATILE;
Expand Down
18 changes: 9 additions & 9 deletions sql/policy_internal.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ _timescaledb_functions.policy_compression_execute(
verbose_log BOOLEAN,
recompress_enabled BOOLEAN,
use_creation_time BOOLEAN,
amname NAME = NULL)
useam BOOLEAN = NULL)
AS $$
DECLARE
htoid REGCLASS;
Expand Down Expand Up @@ -109,7 +109,7 @@ BEGIN
LOOP
IF chunk_rec.status = 0 THEN
BEGIN
PERFORM @[email protected]_chunk(chunk_rec.oid, compress_using => amname);
PERFORM @[email protected]_chunk(chunk_rec.oid, hypercore_use_access_method => useam);
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_message = MESSAGE_TEXT,
Expand All @@ -134,7 +134,7 @@ BEGIN
PERFORM _timescaledb_functions.recompress_chunk_segmentwise(chunk_rec.oid);
ELSE
PERFORM @[email protected]_chunk(chunk_rec.oid, if_compressed => true);
PERFORM @[email protected]_chunk(chunk_rec.oid, compress_using => amname);
PERFORM @[email protected]_chunk(chunk_rec.oid, hypercore_use_access_method => useam);
END IF;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
Expand Down Expand Up @@ -187,7 +187,7 @@ DECLARE
numchunks INTEGER := 1;
recompress_enabled BOOL;
use_creation_time BOOL := FALSE;
compress_using TEXT;
hypercore_use_access_method BOOL;
BEGIN

-- procedures with SET clause cannot execute transaction
Expand Down Expand Up @@ -228,29 +228,29 @@ BEGIN
lag_value := compress_after;
END IF;

compress_using := jsonb_object_field_text(config, 'compress_using')::name;
hypercore_use_access_method := jsonb_object_field_text(config, 'hypercore_use_access_method')::bool;

-- execute the properly type casts for the lag value
CASE dimtype
WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype, 'INTERVAL' ::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::INTERVAL,
maxchunks, verbose_log, recompress_enabled, use_creation_time, compress_using
maxchunks, verbose_log, recompress_enabled, use_creation_time, hypercore_use_access_method
);
WHEN 'BIGINT'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::BIGINT,
maxchunks, verbose_log, recompress_enabled, use_creation_time, compress_using
maxchunks, verbose_log, recompress_enabled, use_creation_time, hypercore_use_access_method
);
WHEN 'INTEGER'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::INTEGER,
maxchunks, verbose_log, recompress_enabled, use_creation_time, compress_using
maxchunks, verbose_log, recompress_enabled, use_creation_time, hypercore_use_access_method
);
WHEN 'SMALLINT'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::SMALLINT,
maxchunks, verbose_log, recompress_enabled, use_creation_time, compress_using
maxchunks, verbose_log, recompress_enabled, use_creation_time, hypercore_use_access_method
);
END CASE;
END;
Expand Down
6 changes: 3 additions & 3 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ CREATE FUNCTION @[email protected]_chunk(
uncompressed_chunk REGCLASS,
if_not_compressed BOOLEAN = true,
recompress BOOLEAN = false,
compress_using NAME = NULL
hypercore_use_access_method BOOL = NULL
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_update_placeholder' LANGUAGE C VOLATILE;

DROP FUNCTION IF EXISTS @[email protected]_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL, schedule_interval INTERVAL, initial_start TIMESTAMPTZ, timezone TEXT, compress_created_before INTERVAL);
Expand All @@ -24,7 +24,7 @@ CREATE FUNCTION @[email protected]_compression_policy(
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
compress_created_before INTERVAL = NULL,
compress_using NAME = NULL
hypercore_use_access_method BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
Expand All @@ -39,7 +39,7 @@ CREATE FUNCTION timescaledb_experimental.add_policies(
refresh_end_offset "any" = NULL,
compress_after "any" = NULL,
drop_after "any" = NULL,
compress_using NAME = NULL)
hypercore_use_access_method BOOL = NULL)
RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C VOLATILE;
Expand Down
8 changes: 4 additions & 4 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ DROP ACCESS METHOD IF EXISTS hypercore;
DROP FUNCTION IF EXISTS ts_hypercore_handler;
DROP FUNCTION IF EXISTS _timescaledb_debug.is_compressed_tid;

DROP FUNCTION IF EXISTS @[email protected]_chunk(uncompressed_chunk REGCLASS, if_not_compressed BOOLEAN, recompress BOOLEAN, compress_using NAME);
DROP FUNCTION IF EXISTS @[email protected]_chunk(uncompressed_chunk REGCLASS, if_not_compressed BOOLEAN, recompress BOOLEAN, hypercore_use_access_method BOOL);

CREATE FUNCTION @[email protected]_chunk(
uncompressed_chunk REGCLASS,
if_not_compressed BOOLEAN = true,
recompress BOOLEAN = false
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_compress_chunk' LANGUAGE C STRICT VOLATILE;

DROP FUNCTION IF EXISTS @[email protected]_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL, schedule_interval INTERVAL, initial_start TIMESTAMPTZ, timezone TEXT, compress_created_before INTERVAL, compress_using NAME);
DROP FUNCTION IF EXISTS @[email protected]_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL, schedule_interval INTERVAL, initial_start TIMESTAMPTZ, timezone TEXT, compress_created_before INTERVAL, hypercore_use_access_method BOOL);

CREATE FUNCTION @[email protected]_compression_policy(
hypertable REGCLASS,
Expand All @@ -28,7 +28,7 @@ RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
LANGUAGE C VOLATILE;

DROP FUNCTION IF EXISTS timescaledb_experimental.add_policies(relation REGCLASS, if_not_exists BOOL, refresh_start_offset "any", refresh_end_offset "any", compress_after "any", drop_after "any", compress_using NAME);
DROP FUNCTION IF EXISTS timescaledb_experimental.add_policies(relation REGCLASS, if_not_exists BOOL, refresh_start_offset "any", refresh_end_offset "any", compress_after "any", drop_after "any", hypercore_use_access_method BOOL);

CREATE FUNCTION timescaledb_experimental.add_policies(
relation REGCLASS,
Expand All @@ -41,6 +41,6 @@ RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_policies_add'
LANGUAGE C VOLATILE;

DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(job_id INTEGER, htid INTEGER, lag ANYELEMENT, maxchunks INTEGER, verbose_log BOOLEAN, recompress_enabled BOOLEAN, use_creation_time BOOLEAN, amname NAME);
DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(job_id INTEGER, htid INTEGER, lag ANYELEMENT, maxchunks INTEGER, verbose_log BOOLEAN, recompress_enabled BOOLEAN, use_creation_time BOOLEAN, useam BOOLEAN);

DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression(job_id INTEGER, config JSONB);
20 changes: 9 additions & 11 deletions tsl/src/bgw_policy/compression_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <postgres.h>
#include <access/xact.h>
#include <fmgr.h>
#include <miscadmin.h>
#include <utils/builtins.h>

Expand All @@ -18,6 +19,7 @@
#include "bgw_policy/job.h"
#include "bgw_policy/job_api.h"
#include "bgw_policy/policies_v2.h"
#include "compression/api.h"
#include "errors.h"
#include "guc.h"
#include "hypertable.h"
Expand Down Expand Up @@ -158,7 +160,7 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
Interval *default_schedule_interval,
bool user_defined_schedule_interval, bool if_not_exists,
bool fixed_schedule, TimestampTz initial_start,
const char *timezone, const char *compress_using)
const char *timezone, UseAccessMethod use_access_method)
{
NameData application_name;
NameData proc_name, proc_schema, check_schema, check_name, owner;
Expand Down Expand Up @@ -282,12 +284,6 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
}
}

if (compress_using != NULL && strcmp(compress_using, "heap") != 0 &&
strcmp(compress_using, TS_HYPERCORE_TAM_NAME) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("can only compress using \"heap\" or \"%s\"", TS_HYPERCORE_TAM_NAME)));

/* insert a new job into jobs table */
namestrcpy(&application_name, "Compression Policy");
namestrcpy(&proc_name, POLICY_COMPRESSION_PROC_NAME);
Expand All @@ -302,8 +298,10 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
ts_jsonb_add_int32(parse_state, POL_COMPRESSION_CONF_KEY_HYPERTABLE_ID, hypertable->fd.id);
validate_compress_after_type(dim, partitioning_type, compress_after_type);

if (NULL != compress_using)
ts_jsonb_add_str(parse_state, POL_COMPRESSION_CONF_KEY_COMPRESS_USING, compress_using);
if (use_access_method != USE_AM_NULL)
ts_jsonb_add_bool(parse_state,
POL_COMPRESSION_CONF_KEY_USE_ACCESS_METHOD,
use_access_method);

switch (compress_after_type)
{
Expand Down Expand Up @@ -406,7 +404,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
text *timezone = PG_ARGISNULL(5) ? NULL : PG_GETARG_TEXT_PP(5);
char *valid_timezone = NULL;
Interval *created_before = PG_GETARG_INTERVAL_P(6);
Name compress_using = PG_ARGISNULL(7) ? NULL : PG_GETARG_NAME(7);
UseAccessMethod use_access_method = PG_ARGISNULL(7) ? USE_AM_NULL : PG_GETARG_BOOL(7);

ts_feature_flag_check(FEATURE_POLICY);
TS_PREVENT_FUNC_IF_READ_ONLY();
Expand Down Expand Up @@ -440,7 +438,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
fixed_schedule,
initial_start,
valid_timezone,
compress_using ? NameStr(*compress_using) : NULL);
use_access_method);

if (!TIMESTAMP_NOT_FINITE(initial_start))
{
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/compression_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#pragma once

#include <postgres.h>
#include "compression/api.h"
#include <utils/jsonb.h>
#include <utils/timestamp.h>

Expand All @@ -26,5 +27,5 @@ Datum policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_dat
Interval *default_schedule_interval,
bool user_defined_schedule_interval, bool if_not_exists,
bool fixed_schedule, TimestampTz initial_start,
const char *timezone, const char *compress_using);
const char *timezone, UseAccessMethod use_access_method);
bool policy_compression_remove_internal(Oid user_rel_oid, bool if_exists);
5 changes: 3 additions & 2 deletions tsl/src/bgw_policy/policies_v2.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <postgres.h>
#include <access/xact.h>
#include <fmgr.h>
#include <miscadmin.h>
#include <parser/parse_coerce.h>
#include <utils/builtins.h>
Expand Down Expand Up @@ -233,7 +234,7 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
false,
DT_NOBEGIN,
NULL,
all_policies.compress->compress_using);
all_policies.compress->use_access_method);
}

if (all_policies.retention && all_policies.retention->create_policy)
Expand Down Expand Up @@ -310,7 +311,7 @@ policies_add(PG_FUNCTION_ARGS)
.create_policy = true,
.compress_after = PG_GETARG_DATUM(4),
.compress_after_type = get_fn_expr_argtype(fcinfo->flinfo, 4),
.compress_using = PG_ARGISNULL(6) ? NULL : NameStr(*PG_GETARG_NAME(6)),
.use_access_method = PG_ARGISNULL(6) ? USE_AM_NULL : PG_GETARG_BOOL(6),
};
comp = tmp;
all_policies.compress = &comp;
Expand Down
5 changes: 3 additions & 2 deletions tsl/src/bgw_policy/policies_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#pragma once

#include <postgres.h>
#include "compression/api.h"
#include "dimension.h"
#include <bgw_policy/compression_api.h>
#include <bgw_policy/continuous_aggregate_api.h>
Expand All @@ -25,7 +26,7 @@
#define POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER "compress_after"
#define POL_COMPRESSION_CONF_KEY_MAXCHUNKS_TO_COMPRESS "maxchunks_to_compress"
#define POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE "compress_created_before"
#define POL_COMPRESSION_CONF_KEY_COMPRESS_USING "compress_using"
#define POL_COMPRESSION_CONF_KEY_USE_ACCESS_METHOD "hypercore_use_access_method"

#define POLICY_RECOMPRESSION_PROC_NAME "policy_recompression"
#define POL_RECOMPRESSION_CONF_KEY_RECOMPRESS_AFTER "recompress_after"
Expand Down Expand Up @@ -89,7 +90,7 @@ typedef struct compression_policy
Datum compress_after;
Oid compress_after_type;
bool create_policy;
const char *compress_using;
UseAccessMethod use_access_method;
} compression_policy;

typedef struct retention_policy
Expand Down
30 changes: 2 additions & 28 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -779,31 +779,6 @@ set_access_method(Oid relid, const char *amname)
return relid;
}

enum UseAccessMethod
{
USE_AM_FALSE,
USE_AM_TRUE,
USE_AM_NULL,
};

static enum UseAccessMethod
parse_use_access_method(const char *compress_using)
{
if (compress_using == NULL)
return USE_AM_NULL;

if (strcmp(compress_using, "heap") == 0)
return USE_AM_FALSE;
else if (strcmp(compress_using, TS_HYPERCORE_TAM_NAME) == 0)
return USE_AM_TRUE;

ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("can only compress using \"heap\" or \"%s\"", TS_HYPERCORE_TAM_NAME)));

pg_unreachable();
}

/*
* When using compress_chunk() with hypercore, there are three cases to
* handle:
Expand All @@ -815,7 +790,7 @@ parse_use_access_method(const char *compress_using)
* 3. Recompress a hypercore
*/
static Oid
compress_hypercore(Chunk *chunk, bool rel_is_hypercore, enum UseAccessMethod useam,
compress_hypercore(Chunk *chunk, bool rel_is_hypercore, UseAccessMethod useam,
bool if_not_compressed, bool recompress)
{
Oid relid = InvalidOid;
Expand Down Expand Up @@ -869,14 +844,13 @@ tsl_compress_chunk(PG_FUNCTION_ARGS)
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1);
bool recompress = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
const char *compress_using = PG_ARGISNULL(3) ? NULL : NameStr(*PG_GETARG_NAME(3));
UseAccessMethod useam = PG_ARGISNULL(3) ? USE_AM_NULL : PG_GETARG_BOOL(3);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);

TS_PREVENT_FUNC_IF_READ_ONLY();
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);
bool rel_is_hypercore = get_table_am_oid(TS_HYPERCORE_TAM_NAME, false) == chunk->amoid;
enum UseAccessMethod useam = parse_use_access_method(compress_using);

if (rel_is_hypercore || useam == USE_AM_TRUE)
uncompressed_chunk_id =
Expand Down
15 changes: 15 additions & 0 deletions tsl/src/compression/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@

#include "chunk.h"

/*
* Decide if the access method should be used for compression, or if it is
* undefined. Used for parameter values to PostgreSQL functions and is a
* nullable boolean.
*
* Using explicit values of TRUE = 1 and FALSE = 0 since this enum is cast to
* boolean value in the code.
*/
typedef enum UseAccessMethod
{
USE_AM_FALSE = 0,
USE_AM_TRUE = 1,
USE_AM_NULL = 2,
} UseAccessMethod;

extern Datum tsl_create_compressed_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_decompress_chunk(PG_FUNCTION_ARGS);
Expand Down
2 changes: 1 addition & 1 deletion tsl/test/expected/hypercore_copy.out
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ select cl.oid::regclass as rel, am.amname, inh.inhparent::regclass as relparent
left join pg_inherits inh on (inh.inhrelid = cl.oid);
-- Compress the chunks and check that the counts are the same
select location_id, count(*) into orig from :hypertable GROUP BY location_id;
select compress_chunk(show_chunks(:'hypertable'), compress_using => 'hypercore');
select compress_chunk(show_chunks(:'hypertable'), hypercore_use_access_method => true);
compress_chunk
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
Expand Down
Loading
Loading