Fix concurrent locking with chunk_data_node table
Concurrent inserts into a distributed hypertable after a data node has
been marked as unavailable would produce a `tuple concurrently deleted`
error.

The problem occurs because the chunk_data_node table is scanned and the
matching tuples are then deleted without tuple-level locking, letting
two backends race on the same tuple; the scan should instead be treated
as a `SELECT … FOR UPDATE` case.

Based on the fix by @erimatnor.

Fix timescale#5153
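
In essence, the change threads a ScanTupLock through the scanner so
each matching chunk_data_node tuple is locked before the delete
callback runs. Below is a minimal sketch of that pattern, assuming the
scanner API from src/scanner.h (ScannerCtx, ts_scanner_scan,
catalog_get_table_id) and omitting scan keys; only the field names and
lock values that appear in the diff below are taken from the actual
change, the surrounding scaffolding is illustrative:

    /* Sketch only: tuple-level locking for a scan-then-delete,
     * the C-level equivalent of SELECT ... FOR UPDATE. */
    ScanTupLock tuplock = {
        .lockmode = LockTupleExclusive, /* exclusive row-level lock */
        .waitpolicy = LockWaitBlock,    /* block until a concurrent holder finishes */
    };
    ScannerCtx scanctx = {
        .table = catalog_get_table_id(ts_catalog_get(), CHUNK_DATA_NODE),
        .tuple_found = chunk_data_node_tuple_delete,
        .tuplock = &tuplock,          /* lock each tuple before the callback */
        .lockmode = RowExclusiveLock, /* table-level lock for the scan */
        .scandirection = ForwardScanDirection,
    };
    ts_scanner_scan(&scanctx);

With the tuple lock in place, the callback branches on ti->lockresult:
TM_Ok means this backend holds the lock and may delete the tuple, while
TM_Deleted means a concurrent transaction already removed it after we
blocked, so the row is skipped instead of raising the error.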
pmwkaa committed Mar 6, 2023
1 parent 4c00750 commit 830c37b
Showing 11 changed files with 237 additions and 159 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ accidentally triggering the load of a previous DB version.**
* #5317 Fix some incorrect memory handling
* #5367 Rename columns in old-style continuous aggregates
* #5384 Fix Hierarchical Continuous Aggregates chunk_interval_size
* #5153 Fix concurrent locking with chunk_data_node table

**Thanks**
* @Medvecrab for discovering an issue with copying NameData when forming heap tuples.
71 changes: 55 additions & 16 deletions src/ts_catalog/chunk_data_node.c
@@ -4,6 +4,7 @@
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include <access/tableam.h>
#include <catalog/pg_foreign_table.h>
#include <catalog/pg_foreign_server.h>
#include <catalog/dependency.h>
@@ -19,6 +20,7 @@
#include "hypertable_cache.h"
#include "scanner.h"
#include "chunk.h"
#include "debug_point.h"

static void
chunk_data_node_insert_relation(const Relation rel, int32 chunk_id, int32 node_chunk_id,
@@ -83,7 +85,7 @@ ts_chunk_data_node_insert_multi(List *chunk_data_nodes)
static int
chunk_data_node_scan_limit_internal(ScanKeyData *scankey, int num_scankeys, int indexid,
tuple_found_func on_tuple_found, void *scandata, int limit,
LOCKMODE lock, MemoryContext mctx)
LOCKMODE lock, ScanTupLock *tuplock, MemoryContext mctx)
{
Catalog *catalog = ts_catalog_get();
ScannerCtx scanctx = {
@@ -94,6 +96,7 @@ chunk_data_node_scan_limit_internal(ScanKeyData *scankey, int num_scankeys, int
.data = scandata,
.limit = limit,
.tuple_found = on_tuple_found,
.tuplock = tuplock,
.lockmode = lock,
.scandirection = ForwardScanDirection,
.result_mctx = mctx,
@@ -162,7 +165,8 @@ static int
ts_chunk_data_node_scan_by_chunk_id_and_node_internal(int32 chunk_id, const char *node_name,
bool scan_by_remote_chunk_id,
tuple_found_func tuple_found, void *data,
LOCKMODE lockmode, MemoryContext mctx)
LOCKMODE lockmode, ScanTupLock *tuplock,
MemoryContext mctx)
{
ScanKeyData scankey[2];
int nkeys = 0;
@@ -203,12 +207,14 @@ ts_chunk_data_node_scan_by_chunk_id_and_node_internal(int32 chunk_id, const char
data,
0,
lockmode,
tuplock,
mctx);
}

static int
ts_chunk_data_node_scan_by_node_internal(const char *node_name, tuple_found_func tuple_found,
void *data, LOCKMODE lockmode, MemoryContext mctx)
void *data, LOCKMODE lockmode, ScanTupLock *tuplock,
MemoryContext mctx)
{
ScanKeyData scankey[1];

@@ -225,6 +231,7 @@ ts_chunk_data_node_scan_by_node_internal(const char *node_name, tuple_found_func
data,
0,
lockmode,
tuplock,
mctx);
}

@@ -233,13 +240,13 @@ List *
ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx)
{
List *chunk_data_nodes = NIL;

ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
NULL,
false,
chunk_data_node_tuple_found,
&chunk_data_nodes,
AccessShareLock,
NULL,
mctx);
return chunk_data_nodes;
}
@@ -249,13 +256,13 @@ List *
ts_chunk_data_node_scan_by_chunk_id_filter(int32 chunk_id, MemoryContext mctx)
{
List *chunk_data_nodes = NIL;

ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
NULL,
false,
chunk_data_node_tuple_found_filter,
&chunk_data_nodes,
AccessShareLock,
NULL,
mctx);
return chunk_data_nodes;
}
@@ -266,13 +273,13 @@ chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_

{
List *chunk_data_nodes = NIL;

ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
node_name,
scan_by_remote_chunk_id,
chunk_data_node_tuple_found,
&chunk_data_nodes,
AccessShareLock,
NULL,
mctx);
Assert(list_length(chunk_data_nodes) <= 1);

@@ -302,44 +309,76 @@ chunk_data_node_tuple_delete(TupleInfo *ti, void *data)
{
CatalogSecurityContext sec_ctx;

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
ts_catalog_restore_user(&sec_ctx);
switch (ti->lockresult)
{
case TM_Ok:
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
ts_catalog_restore_user(&sec_ctx);
break;
case TM_Deleted:
/* Already deleted, do nothing. */
break;
default:
Assert(false);
break;
}

return SCAN_CONTINUE;
}

int
ts_chunk_data_node_delete_by_chunk_id(int32 chunk_id)
{
ScanTupLock tuplock = {
.lockmode = LockTupleExclusive,
.waitpolicy = LockWaitBlock,
};

return ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
NULL,
false,
chunk_data_node_tuple_delete,
NULL,
RowExclusiveLock,
&tuplock,
CurrentMemoryContext);
}

int
ts_chunk_data_node_delete_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name)
{
return ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
node_name,
false,
chunk_data_node_tuple_delete,
NULL,
RowExclusiveLock,
CurrentMemoryContext);
int count;

ScanTupLock tuplock = {
.lockmode = LockTupleExclusive,
.waitpolicy = LockWaitBlock,
};

count = ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
node_name,
false,
chunk_data_node_tuple_delete,
NULL,
RowExclusiveLock,
&tuplock,
CurrentMemoryContext);
DEBUG_WAITPOINT("chunk_data_node_delete");
return count;
}

int
ts_chunk_data_node_delete_by_node_name(const char *node_name)
{
ScanTupLock tuplock = {
.lockmode = LockTupleExclusive,
.waitpolicy = LockWaitBlock,
};
return ts_chunk_data_node_scan_by_node_internal(node_name,
chunk_data_node_tuple_delete,
NULL,
RowExclusiveLock,
&tuplock,
CurrentMemoryContext);
}

8 changes: 8 additions & 0 deletions tsl/src/chunk.c
@@ -773,6 +773,7 @@ void
chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes)
{
List *serveroids = NIL, *removeoids = NIL;
bool locked = false;
ChunkDataNode *cdn;
ListCell *lc;

@@ -807,6 +808,13 @@ chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes)
*/
if (!list_member_oid(serveroids, cdn->foreign_server_oid))
{
if (!locked)
{
LockRelationOid(ts_catalog_get()->tables[CHUNK_DATA_NODE].id,
ShareUpdateExclusiveLock);
locked = true;
}

chunk_update_foreign_server_if_needed(new_chunk, cdn->foreign_server_oid, false);
ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id,
NameStr(cdn->fd.node_name));
3 changes: 3 additions & 0 deletions tsl/src/chunk_api.c
@@ -19,6 +19,8 @@
#include <fmgr.h>
#include <funcapi.h>
#include <miscadmin.h>
#include <storage/lmgr.h>
#include <storage/lockdefs.h>
#include <utils/array.h>
#include <utils/builtins.h>
#include <utils/jsonb.h>
@@ -1775,6 +1777,7 @@ chunk_api_call_chunk_drop_replica(const Chunk *chunk, const char *node_name, Oid
* This chunk might have this data node as primary, change that association
* if so. Then delete the chunk_id and node_name association.
*/
LockRelationOid(chunk->table_id, ShareUpdateExclusiveLock);
chunk_update_foreign_server_if_needed(chunk, serverid, false);
ts_chunk_data_node_delete_by_chunk_id_and_node_name(chunk->fd.id, node_name);
}
4 changes: 4 additions & 0 deletions tsl/src/data_node.c
@@ -25,6 +25,8 @@
#include <nodes/parsenodes.h>
#include <nodes/nodes.h>
#include <nodes/value.h>
#include <storage/lockdefs.h>
#include <storage/lmgr.h>
#include <utils/acl.h>
#include <utils/builtins.h>
#include <utils/array.h>
@@ -1157,6 +1159,8 @@ data_node_modify_hypertable_data_nodes(const char *node_name, List *hypertable_d
{
ChunkDataNode *cdn = lfirst(cs_lc);
const Chunk *chunk = ts_chunk_get_by_id(cdn->fd.chunk_id, true);
LockRelationOid(chunk->table_id, ShareUpdateExclusiveLock);

chunk_update_foreign_server_if_needed(chunk, cdn->foreign_server_oid, false);
ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id,
NameStr(cdn->fd.node_name));
10 changes: 10 additions & 0 deletions tsl/src/fdw/modify_exec.c
@@ -9,6 +9,8 @@
#include <nodes/plannodes.h>
#include <commands/explain.h>
#include <foreign/fdwapi.h>
#include <storage/lmgr.h>
#include <storage/lockdefs.h>
#include <utils/rel.h>
#include <fmgr.h>
#include <miscadmin.h>
@@ -452,6 +454,8 @@ fdw_chunk_update_stale_metadata(TsFdwModifyState *fmstate)
/* get filtered list */
List *serveroids = get_chunk_data_nodes(rel->rd_id);
ListCell *lc;
bool chunk_is_locked = false;

Assert(list_length(serveroids) == fmstate->num_data_nodes);

all_data_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk->fd.id, CurrentMemoryContext);
@@ -471,6 +475,12 @@ if (!list_member_oid(serveroids, cdn->foreign_server_oid) &&
if (!list_member_oid(serveroids, cdn->foreign_server_oid) &&
!list_member_oid(fmstate->stale_data_nodes, cdn->foreign_server_oid))
{
if (!chunk_is_locked)
{
LockRelationOid(chunk->table_id, ShareUpdateExclusiveLock);
chunk_is_locked = true;
}

chunk_update_foreign_server_if_needed(chunk, cdn->foreign_server_oid, false);
ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id,
NameStr(cdn->fd.node_name));
57 changes: 57 additions & 0 deletions tsl/test/isolation/expected/dist_ha_chunk_drop.out
@@ -0,0 +1,57 @@
Parsed test spec with 3 sessions

starting permutation: s1_init s1_set_unavailable s3_lock_enable s1_insert s2_insert s3_lock_release s1_set_available
node_name
-----------
data_node_1
(1 row)

node_name
-----------
data_node_2
(1 row)

node_name
-----------
data_node_3
(1 row)

node_name
-----------
data_node_4
(1 row)

created
-------
t
(1 row)

step s1_init: INSERT INTO metric1(ts, val, dev_id) SELECT s.*, 3.14, d.* FROM generate_series('2021-08-17 00:00:00'::timestamp, '2021-08-17 00:00:59'::timestamp, '1 s'::interval) s CROSS JOIN generate_series(1, 500) d;
step s1_set_unavailable: SELECT alter_data_node('data_node_4', available=>false);
alter_data_node
--------------------------------------
(data_node_4,localhost,55432,cdha_4,f)
(1 row)

step s3_lock_enable: SELECT debug_waitpoint_enable('chunk_data_node_delete');
debug_waitpoint_enable
----------------------

(1 row)

step s1_insert: INSERT INTO metric1(ts, val, dev_id) SELECT s.*, 3.14, d.* FROM generate_series('2021-08-17 00:01:00'::timestamp, '2021-08-17 00:01:59'::timestamp, '1 s'::interval) s CROSS JOIN generate_series(1, 249) d; <waiting ...>
step s2_insert: INSERT INTO metric1(ts, val, dev_id) SELECT s.*, 3.14, d.* FROM generate_series('2021-08-17 00:01:00'::timestamp, '2021-08-17 00:01:59'::timestamp, '1 s'::interval) s CROSS JOIN generate_series(250, 499) d; <waiting ...>
step s3_lock_release: SELECT debug_waitpoint_release('chunk_data_node_delete');
debug_waitpoint_release
-----------------------

(1 row)

step s1_insert: <... completed>
step s2_insert: <... completed>
step s1_set_available: SELECT alter_data_node('data_node_4', available=>true);
alter_data_node
--------------------------------------
(data_node_4,localhost,55432,cdha_4,t)
(1 row)
