Skip to content

Commit

Permalink
Fix race condition when creating CAgg
Browse files Browse the repository at this point in the history
Creating one or more Continuous Aggregates over the same hypertable
concurrently in different sessions lead to a race condition on checking
for the existance of the trigger `ts_cagg_invalidation_trigger`.

Fixed it by using the `OR REPLACE` option for the `CREATE TRIGGER`
statement introduced in PG14 and for prior versions taking an
`ShareUpdateExclusiveLock` and holding it until the end of the
transaction when checking the trigger existance in the
`check_trigger_exists_hypertable` function.
  • Loading branch information
fabriziomello committed Sep 15, 2023
1 parent c802c31 commit 0a4b43c
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 12 deletions.
4 changes: 4 additions & 0 deletions src/trigger.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ ts_trigger_create_on_chunk(Oid trigger_oid, const char *chunk_schema_name,
Assert(IsA(stmt, CreateTrigStmt));
stmt->relation->relname = (char *) chunk_table_name;
stmt->relation->schemaname = (char *) chunk_schema_name;
#if PG14_GE
/* Using OR REPLACE option introduced on Postgres 14 */
stmt->replace = true;
#endif

CreateTrigger(stmt,
def,
Expand Down
17 changes: 15 additions & 2 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ static void create_bucket_function_catalog_entry(int32 matht_id, bool experiment
const char *origin, const char *timezone);
static void cagg_create_hypertable(int32 hypertable_id, Oid mat_tbloid, const char *matpartcolname,
int64 mat_tbltimecol_interval);
#if PG14_LT
static bool check_trigger_exists_hypertable(Oid relid, char *trigname);
#endif
static void cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id);
static void mattablecolumninfo_add_mattable_index(MatTableColumnInfo *matcolinfo, Hypertable *ht);
static int32 mattablecolumninfo_create_materialization_table(
Expand Down Expand Up @@ -272,6 +274,7 @@ cagg_create_hypertable(int32 hypertable_id, Oid mat_tbloid, const char *matpartc
errmsg("could not create materialization hypertable")));
}

#if PG14_LT
static bool
check_trigger_exists_hypertable(Oid relid, char *trigname)
{
Expand All @@ -281,7 +284,7 @@ check_trigger_exists_hypertable(Oid relid, char *trigname)
HeapTuple tuple;
bool trg_found = false;

tgrel = table_open(TriggerRelationId, AccessShareLock);
tgrel = table_open(TriggerRelationId, ShareUpdateExclusiveLock);
ScanKeyInit(&skey[0],
Anum_pg_trigger_tgrelid,
BTEqualStrategyNumber,
Expand All @@ -300,9 +303,10 @@ check_trigger_exists_hypertable(Oid relid, char *trigname)
}
}
systable_endscan(tgscan);
table_close(tgrel, AccessShareLock);
table_close(tgrel, NoLock);
return trg_found;
}
#endif

/*
* Add continuous agg invalidation trigger to hypertable
Expand All @@ -324,6 +328,10 @@ cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id)
CreateTrigStmt stmt_template = {
.type = T_CreateTrigStmt,
.row = true,
#if PG14_GE
/* Using OR REPLACE option introduced on Postgres 14 */
.replace = true,
#endif
.timing = TRIGGER_TYPE_AFTER,
.trigname = CAGGINVAL_TRIGGER_NAME,
.relation = makeRangeVar(schema, relname, -1),
Expand All @@ -332,8 +340,13 @@ cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id)
.args = NIL, /* to be filled in later */
.events = TRIGGER_TYPE_INSERT | TRIGGER_TYPE_UPDATE | TRIGGER_TYPE_DELETE,
};

#if PG14_LT
/* OR REPLACE was introduced in Postgres 14 so this check make no sense */
if (check_trigger_exists_hypertable(relid, CAGGINVAL_TRIGGER_NAME))
return;
#endif

ht = ts_hypertable_cache_get_cache_and_entry(relid, CACHE_FLAG_NONE, &hcache);
if (hypertable_is_distributed(ht))
{
Expand Down
27 changes: 18 additions & 9 deletions tsl/src/deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -1029,20 +1029,29 @@ deparse_create_trigger(CreateTrigStmt *stmt)
bool found_first_arg = false;

/*
CREATE [ CONSTRAINT ] TRIGGER name { BEFORE | AFTER | INSTEAD OF } { event [ OR ... ] }
ON table_name
[ FROM referenced_table_name ]
[ NOT DEFERRABLE | [ DEFERRABLE ] [ INITIALLY IMMEDIATE | INITIALLY DEFERRED ] ]
[ REFERENCING { { OLD | NEW } TABLE [ AS ] transition_relation_name } [ ... ] ]
[ FOR [ EACH ] { ROW | STATEMENT } ]
[ WHEN ( condition ) ]
EXECUTE { FUNCTION | PROCEDURE } function_name ( arguments )
*/
* CREATE [ OR REPLACE ] [ CONSTRAINT ] TRIGGER name
* { BEFORE | AFTER | INSTEAD OF } { event [ OR ... ] }
* ON table_name
* [ FROM referenced_table_name ]
* [ NOT DEFERRABLE | [ DEFERRABLE ] [ INITIALLY IMMEDIATE | INITIALLY DEFERRED ] ]
* [ REFERENCING { { OLD | NEW } TABLE [ AS ] transition_relation_name } [ ... ] ]
* [ FOR [ EACH ] { ROW | STATEMENT } ]
* [ WHEN ( condition ) ]
* EXECUTE { FUNCTION | PROCEDURE } function_name ( arguments )
*/
if (stmt->isconstraint)
elog(ERROR, "deparsing constraint triggers is not supported");

StringInfo command = makeStringInfo();
#if PG14_LT
appendStringInfo(command, "CREATE TRIGGER %s ", quote_identifier(stmt->trigname));
#else
/* Postgres 14 introduced OR REPLACE option */
appendStringInfo(command,
"CREATE %sTRIGGER %s ",
stmt->replace ? "OR REPLACE " : "",
quote_identifier(stmt->trigname));
#endif

if (TRIGGER_FOR_BEFORE(stmt->timing))
appendStringInfoString(command, "BEFORE");
Expand Down
25 changes: 24 additions & 1 deletion tsl/test/isolation/expected/cagg_insert.out
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Parsed test spec with 15 sessions
Parsed test spec with 17 sessions

starting permutation: LockInvalThrEx Refresh Refresh2 Refresh3 UnlockInvalThrEx
step LockInvalThrEx: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_invalidation_threshold ;
Expand Down Expand Up @@ -237,3 +237,26 @@ step Refresh2: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15);
step Refresh3: CALL refresh_continuous_aggregate('continuous_view_2', NULL, 15);
step I2c: COMMIT;
step Refresh3: CALL refresh_continuous_aggregate('continuous_view_2', NULL, 15);

starting permutation: CreateMatView1_Begin CreateMatView2_Begin CreateMatView1_Commit CreateMatView2_Commit
step CreateMatView1_Begin:
BEGIN;
CREATE MATERIALIZED VIEW cagg1
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT time_bucket('5', time), COUNT(location)
FROM ts_continuous_test_1
GROUP BY 1
WITH NO DATA;

step CreateMatView2_Begin:
BEGIN;
CREATE MATERIALIZED VIEW cagg2
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT time_bucket('5', time), COUNT(location)
FROM ts_continuous_test_1
GROUP BY 1
WITH NO DATA;
<waiting ...>
step CreateMatView1_Commit: COMMIT;
step CreateMatView2_Begin: <... completed>
step CreateMatView2_Commit: COMMIT;
28 changes: 28 additions & 0 deletions tsl/test/isolation/specs/cagg_insert.spec
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,31 @@ session "LM"
step "LockMatInval" { BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log; }
step "UnlockMatInval" { ROLLBACK; }

# check for race condition creating triggers on the original hypertable
session "CM1"
step "CreateMatView1_Begin" {
BEGIN;
CREATE MATERIALIZED VIEW cagg1
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT time_bucket('5', time), COUNT(location)
FROM ts_continuous_test_1
GROUP BY 1
WITH NO DATA;
}
step "CreateMatView1_Commit" { COMMIT; }

session "CM2"
step "CreateMatView2_Begin" {
BEGIN;
CREATE MATERIALIZED VIEW cagg2
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT time_bucket('5', time), COUNT(location)
FROM ts_continuous_test_1
GROUP BY 1
WITH NO DATA;
}
step "CreateMatView2_Commit" { COMMIT; }

#only one refresh
permutation "LockInvalThrEx" "Refresh" "Refresh2" (Refresh) "Refresh3" (Refresh, Refresh2) "UnlockInvalThrEx"

Expand Down Expand Up @@ -161,3 +186,6 @@ permutation "I1" "Refresh" "LockInval" "Sb" "S1" "Refresh" "Sc" "UnlockInval"

permutation "I1" "I21" "Refresh1" "Refresh2" "Refresh3"
permutation "I1" "I2b" "I21" "Refresh2" "Refresh3" "I2c" "Refresh3"

# check for race condition creating triggers on the original hypertable by concurrent create matviews
permutation "CreateMatView1_Begin" "CreateMatView2_Begin" "CreateMatView1_Commit" "CreateMatView2_Commit"

0 comments on commit 0a4b43c

Please sign in to comment.