Skip to content

Commit

Permalink
Fix errors and add isolation tests for OSM API
Browse files Browse the repository at this point in the history
This commit fixes two issues with the osm range update API:

1. Deadlock between two concurrent range updates.
Two transactions concurrently attempting to update the range
of the OSM chunk by calling hypertable_osm_range_update would
deadlock because the dimension slice tuple was first locked with
a FOR KEY SHARE lock, then a FOR UPDATE lock was requested before
proceeding with the dimension slice tuple udpate.
This commit fixes the deadlock by taking FOR UPDATE lock on the
tuple from the start, before proceeding to update it.

2. Tuple concurrently updated error for hypertable tuple.
When one session tries to update the range of the OSM chunk and another
enables compression on the hypertable, the update failed with tuple
concurrently updated error. This commit fixes this by first locking the
hypertable tuple with a FOR UPDATE lock before proceeding to UPDATE it.

Isolation tests for OSM range API are also added.
  • Loading branch information
konskov committed Oct 5, 2023
1 parent 0d6f5f2 commit 7a5cecf
Show file tree
Hide file tree
Showing 7 changed files with 395 additions and 16 deletions.
3 changes: 2 additions & 1 deletion src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -2988,7 +2988,8 @@ chunk_tuple_delete(TupleInfo *ti, DropBehavior behavior, bool preserve_chunk_cat
DimensionSlice *slice =
ts_dimension_slice_scan_by_id_and_lock(cc->fd.dimension_slice_id,
&tuplock,
CurrentMemoryContext);
CurrentMemoryContext,
AccessShareLock);
/* If the slice is not found in the scan above, the table is
* broken so we do not delete the slice. We proceed
* anyway since users need to be able to drop broken tables or
Expand Down
4 changes: 2 additions & 2 deletions src/dimension_slice.c
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ dimension_slice_tuple_found(TupleInfo *ti, void *data)
* it to not change nor disappear. */
DimensionSlice *
ts_dimension_slice_scan_by_id_and_lock(int32 dimension_slice_id, const ScanTupLock *tuplock,
MemoryContext mctx)
MemoryContext mctx, LOCKMODE lockmode)
{
DimensionSlice *slice = NULL;
ScanKeyData scankey[1];
Expand All @@ -775,7 +775,7 @@ ts_dimension_slice_scan_by_id_and_lock(int32 dimension_slice_id, const ScanTupLo
dimension_slice_tuple_found,
&slice,
1,
AccessShareLock,
lockmode,
tuplock,
mctx);

Expand Down
3 changes: 2 additions & 1 deletion src/dimension_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ extern bool ts_dimension_slice_scan_for_existing(const DimensionSlice *slice,
const ScanTupLock *tuplock);
extern DimensionSlice *ts_dimension_slice_scan_by_id_and_lock(int32 dimension_slice_id,
const ScanTupLock *tuplock,
MemoryContext mctx);
MemoryContext mctx,
LOCKMODE lockmode);
extern DimensionVec *ts_dimension_slice_scan_by_dimension(int32 dimension_id, int limit);
extern DimensionVec *ts_dimension_slice_scan_by_dimension_before_point(int32 dimension_id,
int64 point, int limit,
Expand Down
89 changes: 78 additions & 11 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -3142,6 +3142,78 @@ ts_hypertable_update_dimension_partitions(const Hypertable *ht)
return false;
}

/*
* hypertable status update is done in two steps, similar to
* chunk_update_status
* This is again equivalent to a SELECT FOR UPDATE, followed by UPDATE
* 1. RowShareLock to SELECT for UPDATE
* 2. UPDATE status using RowExclusiveLock
*/
static bool
hypertable_update_status_osm(Hypertable *ht)
{
bool success = false;
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
};
ScanIterator iterator = ts_scan_iterator_create(HYPERTABLE, RowShareLock, CurrentMemoryContext);
iterator.ctx.index = catalog_get_index(ts_catalog_get(), HYPERTABLE, HYPERTABLE_ID_INDEX);
iterator.ctx.tuplock = &scantuplock;
ts_scan_iterator_scan_key_init(&iterator,
Anum_hypertable_pkey_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(ht->fd.id));

ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
int status;
bool status_isnull;

status = DatumGetInt32(slot_getattr(ti->slot, Anum_hypertable_status, &status_isnull));
Assert(!status_isnull);
if (status != ht->fd.status)
{
success = ts_hypertable_update(ht); // get RowExclusiveLock and update here
}
}
ts_scan_iterator_close(&iterator);
return success;
}

static DimensionSlice *
ts_chunk_get_osm_slice_and_lock(int32 osm_chunk_id, int32 time_dim_id)
{
ChunkConstraints *constraints =
ts_chunk_constraint_scan_by_chunk_id(osm_chunk_id, 1, CurrentMemoryContext);

for (int i = 0; i < constraints->num_constraints; i++)
{
ChunkConstraint *cc = chunk_constraints_get(constraints, i);
if (is_dimension_constraint(cc))
{
ScanTupLock tuplock = {
.lockmode = LockTupleExclusive,
.waitpolicy = LockWaitBlock,
};
if (!IsolationUsesXactSnapshot())
{
/* in read committed mode, we follow all updates to this tuple */
tuplock.lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
}
DimensionSlice *dimslice =
ts_dimension_slice_scan_by_id_and_lock(cc->fd.dimension_slice_id,
&tuplock,
CurrentMemoryContext,
RowShareLock);
if (dimslice->fd.dimension_id == time_dim_id)
return dimslice;
}
}
return NULL;
}
/*
* hypertable_osm_range_update
* 0 hypertable REGCLASS,
Expand Down Expand Up @@ -3183,9 +3255,6 @@ ts_hypertable_osm_range_update(PG_FUNCTION_ARGS)
"no OSM chunk found for hypertable %s.%s",
quote_identifier(NameStr(ht->fd.schema_name)),
quote_identifier(NameStr(ht->fd.table_name)));

int32 dimension_slice_id = ts_chunk_get_osm_slice_id(osm_chunk_id, time_dim->fd.id);

/*
* range_start, range_end arguments must be converted to internal representation
* a NULL start value is interpreted as INT64_MAX - 1 and a NULL end value is
Expand Down Expand Up @@ -3227,15 +3296,12 @@ ts_hypertable_osm_range_update(PG_FUNCTION_ARGS)

bool overlap = false, range_invalid = false;

ScanTupLock tuplock = {
.lockmode = LockTupleExclusive,
.waitpolicy = LockWaitBlock,
};
DimensionSlice *slice =
ts_dimension_slice_scan_by_id_and_lock(dimension_slice_id, &tuplock, CurrentMemoryContext);
DimensionSlice *slice = ts_chunk_get_osm_slice_and_lock(osm_chunk_id, time_dim->fd.id);

if (!slice)
ereport(ERROR, errmsg("could not find slice with id %d", dimension_slice_id));
ereport(ERROR, errmsg("could not find time dimension slice for chunk %d", osm_chunk_id));

int32 dimension_slice_id = slice->fd.id;
overlap = ts_osm_chunk_range_overlaps(dimension_slice_id,
slice->fd.dimension_id,
range_start_internal,
Expand Down Expand Up @@ -3269,7 +3335,8 @@ ts_hypertable_osm_range_update(PG_FUNCTION_ARGS)
}
else
ht->fd.status = ts_clear_flags_32(ht->fd.status, HYPERTABLE_STATUS_OSM_CHUNK_NONCONTIGUOUS);
ts_hypertable_update(ht);

hypertable_update_status_osm(ht);
ts_cache_release(hcache);

slice->fd.range_start = range_start_internal;
Expand Down
203 changes: 203 additions & 0 deletions tsl/test/isolation/expected/osm_range_updates_iso.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
Parsed test spec with 8 sessions

starting permutation: LockDimSliceTuple UR1b UR1u UR2b UR2u UnlockDimSliceTuple UR1c UR2c
step LockDimSliceTuple:
BEGIN;
SELECT range_start, range_end FROM _timescaledb_catalog.dimension_slice
WHERE id IN ( SELECT ds.id FROM
_timescaledb_catalog.chunk ch, _timescaledb_catalog.chunk_constraint cc,
_timescaledb_catalog.dimension_slice ds, _timescaledb_catalog.hypertable ht
WHERE ht.table_name like 'osm_test' AND cc.chunk_id = ch.id AND ht.id = ch.hypertable_id
AND ds.id = cc.dimension_slice_id AND ch.osm_chunk = true
) FOR UPDATE;

range_start| range_end
-------------------+-------------------
9223372036854775806|9223372036854775807
(1 row)

step UR1b: BEGIN;
step UR1u: SELECT _timescaledb_functions.hypertable_osm_range_update('osm_test', 0, 10); <waiting ...>
step UR2b: BEGIN;
step UR2u: SELECT _timescaledb_functions.hypertable_osm_range_update('osm_test', 0, 10); <waiting ...>
step UnlockDimSliceTuple: ROLLBACK;
step UR1u: <... completed>
hypertable_osm_range_update
---------------------------
f
(1 row)

step UR1c: COMMIT;
step UR2u: <... completed>
hypertable_osm_range_update
---------------------------
f
(1 row)

step UR2c: COMMIT;

starting permutation: LockDimSliceTuple DTb UR1b DropOsmChunk UR1u UnlockDimSliceTuple DTc UR1c
step LockDimSliceTuple:
BEGIN;
SELECT range_start, range_end FROM _timescaledb_catalog.dimension_slice
WHERE id IN ( SELECT ds.id FROM
_timescaledb_catalog.chunk ch, _timescaledb_catalog.chunk_constraint cc,
_timescaledb_catalog.dimension_slice ds, _timescaledb_catalog.hypertable ht
WHERE ht.table_name like 'osm_test' AND cc.chunk_id = ch.id AND ht.id = ch.hypertable_id
AND ds.id = cc.dimension_slice_id AND ch.osm_chunk = true
) FOR UPDATE;

range_start| range_end
-------------------+-------------------
9223372036854775806|9223372036854775807
(1 row)

step DTb: BEGIN;
step UR1b: BEGIN;
step DropOsmChunk:
SELECT _timescaledb_functions.drop_chunk(chunk_table::regclass)
FROM (
SELECT format('%I.%I', c.schema_name, c.table_name) as chunk_table
FROM _timescaledb_catalog.chunk c, _timescaledb_catalog.hypertable ht
WHERE ht.id = c.hypertable_id AND ht.table_name = 'osm_test'
) sq;
<waiting ...>
step UR1u: SELECT _timescaledb_functions.hypertable_osm_range_update('osm_test', 0, 10); <waiting ...>
step UnlockDimSliceTuple: ROLLBACK;
step DropOsmChunk: <... completed>
drop_chunk
----------
t
(1 row)

step DTc: COMMIT;
step UR1u: <... completed>
ERROR: chunk deleted by other transaction
step UR1c: COMMIT;

starting permutation: LockDimSliceTuple DTb UR1b UR1u DropOsmChunk UnlockDimSliceTuple UR1c DTc
step LockDimSliceTuple:
BEGIN;
SELECT range_start, range_end FROM _timescaledb_catalog.dimension_slice
WHERE id IN ( SELECT ds.id FROM
_timescaledb_catalog.chunk ch, _timescaledb_catalog.chunk_constraint cc,
_timescaledb_catalog.dimension_slice ds, _timescaledb_catalog.hypertable ht
WHERE ht.table_name like 'osm_test' AND cc.chunk_id = ch.id AND ht.id = ch.hypertable_id
AND ds.id = cc.dimension_slice_id AND ch.osm_chunk = true
) FOR UPDATE;

range_start| range_end
-------------------+-------------------
9223372036854775806|9223372036854775807
(1 row)

step DTb: BEGIN;
step UR1b: BEGIN;
step UR1u: SELECT _timescaledb_functions.hypertable_osm_range_update('osm_test', 0, 10); <waiting ...>
step DropOsmChunk:
SELECT _timescaledb_functions.drop_chunk(chunk_table::regclass)
FROM (
SELECT format('%I.%I', c.schema_name, c.table_name) as chunk_table
FROM _timescaledb_catalog.chunk c, _timescaledb_catalog.hypertable ht
WHERE ht.id = c.hypertable_id AND ht.table_name = 'osm_test'
) sq;
<waiting ...>
step UnlockDimSliceTuple: ROLLBACK;
step UR1u: <... completed>
hypertable_osm_range_update
---------------------------
f
(1 row)

step UR1c: COMMIT;
step DropOsmChunk: <... completed>
drop_chunk
----------
t
(1 row)

step DTc: COMMIT;

starting permutation: LHTb LockHypertableTuple Cb UR1b Cenable UR1u UnlockHypertableTuple Ccommit UR1c
step LHTb: BEGIN;
step LockHypertableTuple:
SELECT table_name, compression_state, compressed_hypertable_id, status
FROM _timescaledb_catalog.hypertable WHERE table_name = 'osm_test' FOR UPDATE;

table_name|compression_state|compressed_hypertable_id|status
----------+-----------------+------------------------+------
osm_test | 0| | 3
(1 row)

step Cb: BEGIN;
step UR1b: BEGIN;
step Cenable:
ALTER TABLE osm_test set (timescaledb.compress);
<waiting ...>
step UR1u: SELECT _timescaledb_functions.hypertable_osm_range_update('osm_test', 0, 10); <waiting ...>
step UnlockHypertableTuple: ROLLBACK;
step Cenable: <... completed>
step Ccommit: COMMIT;
step UR1u: <... completed>
hypertable_osm_range_update
---------------------------
f
(1 row)

step UR1c: COMMIT;

starting permutation: Ab UR1b UR1u Aadd UR1c Ac
step Ab: BEGIN;
step UR1b: BEGIN;
step UR1u: SELECT _timescaledb_functions.hypertable_osm_range_update('osm_test', 0, 10);
hypertable_osm_range_update
---------------------------
f
(1 row)

step Aadd: ALTER TABLE osm_test ADD COLUMN b INTEGER;
step UR1c: COMMIT;
step Ac: COMMIT;

starting permutation: Ab UR1b Aadd UR1u UR1c Ac
step Ab: BEGIN;
step UR1b: BEGIN;
step Aadd: ALTER TABLE osm_test ADD COLUMN b INTEGER;
step UR1u: SELECT _timescaledb_functions.hypertable_osm_range_update('osm_test', 0, 10);
hypertable_osm_range_update
---------------------------
f
(1 row)

step UR1c: COMMIT;
step Ac: COMMIT;

starting permutation: LHTb Utest2b UR1b LockHypertableTuple UR1u Utest2u Utest2c UnlockHypertableTuple UR1c
step LHTb: BEGIN;
step Utest2b: BEGIN;
step UR1b: BEGIN;
step LockHypertableTuple:
SELECT table_name, compression_state, compressed_hypertable_id, status
FROM _timescaledb_catalog.hypertable WHERE table_name = 'osm_test' FOR UPDATE;

table_name|compression_state|compressed_hypertable_id|status
----------+-----------------+------------------------+------
osm_test | 0| | 3
(1 row)

step UR1u: SELECT _timescaledb_functions.hypertable_osm_range_update('osm_test', 0, 10); <waiting ...>
step Utest2u: SELECT _timescaledb_functions.hypertable_osm_range_update('osm_test2', 0, 20);
hypertable_osm_range_update
---------------------------
f
(1 row)

step Utest2c: COMMIT;
step UnlockHypertableTuple: ROLLBACK;
step UR1u: <... completed>
hypertable_osm_range_update
---------------------------
f
(1 row)

step UR1c: COMMIT;
3 changes: 2 additions & 1 deletion tsl/test/isolation/specs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ list(
cagg_insert.spec
cagg_multi_iso.spec
cagg_concurrent_refresh.spec
deadlock_drop_chunks_compress.spec)
deadlock_drop_chunks_compress.spec
osm_range_updates_iso.spec)

if(ENABLE_MULTINODE_TESTS)
list(APPEND TEST_FILES cagg_concurrent_refresh_dist_ht.spec)
Expand Down
Loading

0 comments on commit 7a5cecf

Please sign in to comment.