Skip to content

Commit

Permalink
Add drop_chunks hook for OSM
Browse files Browse the repository at this point in the history
Introduce version number for OsmCallbacks struct

Add a callback for cascading drop chunks to OSM and
integrate with drop_chunks

Add backward compatibility for OsmCallbacks
  • Loading branch information
gayyappan committed Sep 14, 2023
1 parent 7fbaf2a commit 79022f0
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 18 deletions.
1 change: 1 addition & 0 deletions .unreleased/PR_6067
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #6067 Adds drop_chunks hook for OSM
31 changes: 27 additions & 4 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1159,9 +1159,9 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
const char *prefix)
{
#if PG14_GE
OsmCallbacks *callbacks = ts_get_osm_callbacks();
chunk_insert_check_hook_type osm_chunk_insert_hook = ts_get_osm_chunk_insert_hook();

if (callbacks)
if (osm_chunk_insert_hook)
{
/* OSM only uses first dimension . doesn't work with multinode tables yet*/
Dimension *dim = &ht->space->dimensions[0];
Expand All @@ -1171,8 +1171,7 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
int64 range_end =
ts_internal_to_time_int64(cube->slices[0]->fd.range_end, dim->fd.column_type);

int chunk_exists =
callbacks->chunk_insert_check_hook(ht->main_table_relid, range_start, range_end);
int chunk_exists = osm_chunk_insert_hook(ht->main_table_relid, range_start, range_end);

if (chunk_exists)
{
Expand Down Expand Up @@ -3890,6 +3889,7 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3

{
uint64 num_chunks = 0;
int32 osm_chunk_id = 0;
Chunk *chunks;
const char *schema_name, *table_name;
const int32 hypertable_id = ht->fd.id;
Expand Down Expand Up @@ -4031,6 +4031,29 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
data_nodes = list_append_unique_oid(data_nodes, cdn->foreign_server_oid);
}
}
// if we have tiered chunks cascade drop to tiering layer as well
#if PG14_GE
osm_chunk_id = ts_chunk_get_osm_chunk_id(ht->fd.id);

if (osm_chunk_id != INVALID_CHUNK_ID)
{
hypertable_drop_chunks_hook_type osm_drop_chunks_hook =
ts_get_osm_hypertable_drop_chunks_hook();
if (osm_drop_chunks_hook)
{
Dimension *dim = &ht->space->dimensions[0];
/* convert to PG timestamp from timescaledb internal format */
int64 range_start = ts_internal_to_time_int64(newer_than, dim->fd.column_type);
int64 range_end = ts_internal_to_time_int64(older_than, dim->fd.column_type);
Chunk *osm_chunk = ts_chunk_get_by_id(osm_chunk_id, true);
osm_drop_chunks_hook(osm_chunk->table_id,
NameStr(ht->fd.schema_name),
NameStr(ht->fd.table_name),
range_start,
range_end);
}
}
#endif

/* When dropping chunks for a given CAgg then force set the watermark */
if (is_materialization_hypertable)
Expand Down
7 changes: 3 additions & 4 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -681,16 +681,15 @@ hypertable_tuple_delete(TupleInfo *ti, void *data)
}

#if PG14_GE
OsmCallbacks *callbacks = ts_get_osm_callbacks();

hypertable_drop_hook_type osm_htdrop_hook = ts_get_osm_hypertable_drop_hook();
/* Invoke the OSM callback if set */
if (callbacks)
if (osm_htdrop_hook)
{
Name schema_name =
DatumGetName(slot_getattr(ti->slot, Anum_hypertable_schema_name, &isnull));
Name table_name = DatumGetName(slot_getattr(ti->slot, Anum_hypertable_table_name, &isnull));

callbacks->hypertable_drop_hook(NameStr(*schema_name), NameStr(*table_name));
osm_htdrop_hook(NameStr(*schema_name), NameStr(*table_name));
}
#endif

Expand Down
64 changes: 61 additions & 3 deletions src/osm_callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,70 @@

#include <fmgr.h>

#define OSM_CALLBACKS_VAR_NAME "osm_callbacks"
#define OSM_CALLBACKS "osm_callbacks"
#define OSM_CALLBACKS_VAR_NAME "osm_callbacks_versioned"

OsmCallbacks *
static OsmCallbacks_Versioned *
ts_get_osm_callbacks(void)
{
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable(OSM_CALLBACKS_VAR_NAME);
OsmCallbacks_Versioned **ptr =
(OsmCallbacks_Versioned **) find_rendezvous_variable(OSM_CALLBACKS_VAR_NAME);

return *ptr;
}

/* This interface and version of the struct will be remove donce we have a new version of OSM on all
* instances
*/
static OsmCallbacks *
ts_get_osm_callbacks_old(void)
{
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable(OSM_CALLBACKS);

return *ptr;
}

chunk_insert_check_hook_type
ts_get_osm_chunk_insert_hook()
{
OsmCallbacks_Versioned *ptr = ts_get_osm_callbacks();
if (ptr)
{
if (ptr->version_num == 1)
return ptr->chunk_insert_check_hook;
}
else
{
OsmCallbacks *ptr_old = ts_get_osm_callbacks_old();
if (ptr_old)
return ptr_old->chunk_insert_check_hook;

Check warning on line 46 in src/osm_callbacks.c

View check run for this annotation

Codecov / codecov/patch

src/osm_callbacks.c#L46

Added line #L46 was not covered by tests
}
return NULL;
}

hypertable_drop_hook_type
ts_get_osm_hypertable_drop_hook()
{
OsmCallbacks_Versioned *ptr = ts_get_osm_callbacks();
if (ptr)
{
if (ptr->version_num == 1)
return ptr->hypertable_drop_hook;
}
else
{
OsmCallbacks *ptr_old = ts_get_osm_callbacks_old();
if (ptr_old)
return ptr->hypertable_drop_hook;

Check warning on line 64 in src/osm_callbacks.c

View check run for this annotation

Codecov / codecov/patch

src/osm_callbacks.c#L64

Added line #L64 was not covered by tests
}
return NULL;
}

hypertable_drop_chunks_hook_type
ts_get_osm_hypertable_drop_chunks_hook()
{
OsmCallbacks_Versioned *ptr = ts_get_osm_callbacks();
if (ptr && ptr->version_num == 1)
return ptr->hypertable_drop_chunks_hook;
return NULL;

Check warning on line 75 in src/osm_callbacks.c

View check run for this annotation

Codecov / codecov/patch

src/osm_callbacks.c#L75

Added line #L75 was not covered by tests
}
20 changes: 19 additions & 1 deletion src/osm_callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,39 @@
#include <postgres.h>
#include <catalog/objectaddress.h>

/* range_start and range_end are in PG internal timestamp format. */
typedef int (*chunk_insert_check_hook_type)(Oid ht_oid, int64 range_start, int64 range_end);
typedef void (*hypertable_drop_hook_type)(const char *schema_name, const char *table_name);
typedef void (*hypertable_drop_chunks_hook_type)(Oid osm_chunk_oid,
const char *hypertable_schema_name,
const char *hypertable_name, int64 range_start,
int64 range_end);

/*
* Object Storage Manager callbacks.
*
* chunk_insert_check_hook - checks whether the specified range is managed by OSM
* hypertable_drop_hook - used for OSM catalog cleanups
*/
/* This struct is retained for backward compatibility. We'll remove this in one
* of the upcoming releases
*/
typedef struct
{
chunk_insert_check_hook_type chunk_insert_check_hook;
hypertable_drop_hook_type hypertable_drop_hook;
} OsmCallbacks;

extern OsmCallbacks *ts_get_osm_callbacks(void);
typedef struct
{
int64 version_num;
chunk_insert_check_hook_type chunk_insert_check_hook;
hypertable_drop_hook_type hypertable_drop_hook;
hypertable_drop_chunks_hook_type hypertable_drop_chunks_hook;
} OsmCallbacks_Versioned;

extern chunk_insert_check_hook_type ts_get_osm_chunk_insert_hook(void);
extern hypertable_drop_hook_type ts_get_osm_hypertable_drop_hook(void);
extern hypertable_drop_chunks_hook_type ts_get_osm_hypertable_drop_chunks_hook(void);

#endif /* TIMESCALEDB_OSM_CALLBACKS_H */
23 changes: 22 additions & 1 deletion tsl/test/expected/chunk_utils_internal.out
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ where conrelid = 'child_hyper_constr'::regclass ORDER BY 1;
hyper_constr_temp_check
(1 row)

--TEST policy is not applied on OSM chunk
--TEST retention policy is applied on OSM chunk by calling registered callback
CREATE OR REPLACE FUNCTION dummy_now_smallint() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 500::bigint' ;
SELECT set_integer_now_func('hyper_constr', 'dummy_now_smallint');
set_integer_now_func
Expand All @@ -796,8 +796,23 @@ SELECT set_integer_now_func('hyper_constr', 'dummy_now_smallint');
(1 row)

SELECT add_retention_policy('hyper_constr', 100::int) AS deljob_id \gset
--add hooks for osm callbacks that are triggered when drop_chunks is invoked---
SELECT ts_setup_osm_hook();
ts_setup_osm_hook
-------------------

(1 row)

SELECT drop_chunks('hyper_constr', 10::int);
NOTICE: hypertable_drop_chunks_hook (-9223372036854775808 10)
drop_chunks
-------------
(0 rows)

CALL run_job(:deljob_id);
NOTICE: hypertable_drop_chunks_hook (-9223372036854775808 400)
CALL run_job(:deljob_id);
NOTICE: hypertable_drop_chunks_hook (-9223372036854775808 400)
SELECT chunk_name, range_start, range_end
FROM chunk_view
WHERE hypertable_name = 'hyper_constr'
Expand All @@ -807,6 +822,12 @@ ORDER BY chunk_name;
child_hyper_constr | Sat Jan 09 20:00:54.775806 294247 PST | infinity
(1 row)

SELECT ts_undo_osm_hook();
ts_undo_osm_hook
------------------

(1 row)

----- TESTS for copy into frozen chunk ------------
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
CREATE TABLE test1.copy_test (
Expand Down
6 changes: 5 additions & 1 deletion tsl/test/sql/chunk_utils_internal.sql
Original file line number Diff line number Diff line change
Expand Up @@ -465,18 +465,22 @@ SELECT * FROM hyper_constr order by time;
SELECT conname FROM pg_constraint
where conrelid = 'child_hyper_constr'::regclass ORDER BY 1;

--TEST policy is not applied on OSM chunk
--TEST retention policy is applied on OSM chunk by calling registered callback
CREATE OR REPLACE FUNCTION dummy_now_smallint() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 500::bigint' ;

SELECT set_integer_now_func('hyper_constr', 'dummy_now_smallint');
SELECT add_retention_policy('hyper_constr', 100::int) AS deljob_id \gset

--add hooks for osm callbacks that are triggered when drop_chunks is invoked---
SELECT ts_setup_osm_hook();
SELECT drop_chunks('hyper_constr', 10::int);
CALL run_job(:deljob_id);
CALL run_job(:deljob_id);
SELECT chunk_name, range_start, range_end
FROM chunk_view
WHERE hypertable_name = 'hyper_constr'
ORDER BY chunk_name;
SELECT ts_undo_osm_hook();

----- TESTS for copy into frozen chunk ------------
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
Expand Down
22 changes: 18 additions & 4 deletions tsl/test/src/test_chunk_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ts_test_chunk_stats_insert(PG_FUNCTION_ARGS)

typedef int (*chunk_insert_check_hook_type)(Oid, int64, int64);
typedef void (*hypertable_drop_hook_type)(const char *, const char *);
typedef void (*hypertable_drop_chunks_hook_type)(Oid, const char *, const char *, int64, int64);

static int
osm_insert_hook_mock(Oid ht_oid, int64 range_start, int64 range_end)
Expand All @@ -53,8 +54,19 @@ osm_ht_drop_hook_mock(const char *schema_name, const char *table_name)
elog(NOTICE, "hypertable_drop_hook");
}

OsmCallbacks fake_osm_callbacks = { .chunk_insert_check_hook = osm_insert_hook_mock,
.hypertable_drop_hook = osm_ht_drop_hook_mock };
static void
osm_ht_drop_chunks_hook_mock(Oid osm_chunk_oid, const char *schema_name, const char *table_name,
int64 range_start, int64 range_end)
{
elog(NOTICE, "hypertable_drop_chunks_hook (%ld %ld)", range_start, range_end);
}

OsmCallbacks_Versioned fake_osm_callbacks = {
.version_num = 1,
.chunk_insert_check_hook = osm_insert_hook_mock,
.hypertable_drop_hook = osm_ht_drop_hook_mock,
.hypertable_drop_chunks_hook = osm_ht_drop_chunks_hook_mock,
};

/*
* Dummy function to mock OSM_INSERT hook called at chunk creation for tiered data
Expand All @@ -63,7 +75,8 @@ TS_FUNCTION_INFO_V1(ts_setup_osm_hook);
Datum
ts_setup_osm_hook(PG_FUNCTION_ARGS)
{
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable("osm_callbacks");
OsmCallbacks_Versioned **ptr =
(OsmCallbacks_Versioned **) find_rendezvous_variable("osm_callbacks_versioned");
*ptr = &fake_osm_callbacks;

PG_RETURN_NULL();
Expand All @@ -73,7 +86,8 @@ TS_FUNCTION_INFO_V1(ts_undo_osm_hook);
Datum
ts_undo_osm_hook(PG_FUNCTION_ARGS)
{
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable("osm_callbacks");
OsmCallbacks_Versioned **ptr =
(OsmCallbacks_Versioned **) find_rendezvous_variable("osm_callbacks_versioned");
*ptr = NULL;

PG_RETURN_NULL();
Expand Down

0 comments on commit 79022f0

Please sign in to comment.