From 50349f62f14ba9ac7a9ddda18a2f507cca248308 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 8 Mar 2022 16:20:33 +0000 Subject: [PATCH] Fix NPE caused by txn commit race condition (#5940) Fixed a race condition where if a transaction was just committed by another thread, we may return null values from TransactionService.get. --- ...ntCommitTimestampPutUnlessExistsTable.java | 8 +++--- ...mmitTimestampPutUnlessExistsTableTest.java | 25 +++++++++++++++++++ changelog/@unreleased/pr-5940.v2.yml | 8 ++++++ 3 files changed, 38 insertions(+), 3 deletions(-) create mode 100644 changelog/@unreleased/pr-5940.v2.yml diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java index 19fa327aba1..2af6c67c3e9 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java @@ -91,6 +91,7 @@ private Map processReads(Map reads, Map st // if we reach here, actual is guaranteed to be a staging value try { store.checkAndTouch(cell, actual); + checkAndTouch.put(cell, actual); } catch (CheckAndSetException e) { PutUnlessExistsValue kvsValue = encodingStrategy.decodeValueAsCommitTimestamp( startTs, Iterables.getOnlyElement(e.getActualValues())); @@ -100,10 +101,11 @@ private Map processReads(Map reads, Map st + "was found in the KVS", SafeArg.of("kvsValue", kvsValue), SafeArg.of("stagingValue", currentValue)); - continue; + } finally { + // If we got here after catching CheckAndSetException, then some other thread committed this + // transaction, and we must therefore return the commit timestamp. + resultBuilder.put(startTs, commitTs); } - checkAndTouch.put(cell, actual); - resultBuilder.put(startTs, commitTs); } store.put(KeyedStream.stream(checkAndTouch) .map(encodingStrategy::transformStagingToCommitted) diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTableTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTableTest.java index 4add06e79f5..6ef61137ec7 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTableTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTableTest.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -28,6 +29,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.CheckAndSetException; import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService; @@ -86,6 +88,29 @@ public void pueThatThrowsIsCorrectedOnGet() throws ExecutionException, Interrupt verify(spiedStore).put(anyMap()); } + @Test + public void getReturnsStagingValuesThatWereCommittedBySomeoneElse() + throws ExecutionException, InterruptedException { + TwoPhaseEncodingStrategy strategy = TwoPhaseEncodingStrategy.INSTANCE; + + long startTimestamp = 1L; + long commitTimestamp = 2L; + Cell timestampAsCell = strategy.encodeStartTimestampAsCell(startTimestamp); + byte[] stagingValue = + strategy.encodeCommitTimestampAsValue(startTimestamp, PutUnlessExistsValue.staging(commitTimestamp)); + byte[] committedValue = + strategy.encodeCommitTimestampAsValue(startTimestamp, PutUnlessExistsValue.committed(commitTimestamp)); + spiedStore.putUnlessExists(timestampAsCell, stagingValue); + + List actualValues = ImmutableList.of(committedValue); + + doThrow(new CheckAndSetException("done elsewhere", timestampAsCell, stagingValue, actualValues)) + .when(spiedStore) + .checkAndTouch(timestampAsCell, stagingValue); + + assertThat(pueTable.get(startTimestamp).get()).isEqualTo(commitTimestamp); + } + @Test public void onceNonNullValueIsReturnedItIsAlwaysReturned() { PutUnlessExistsTable putUnlessExistsTable = new ResilientCommitTimestampPutUnlessExistsTable( diff --git a/changelog/@unreleased/pr-5940.v2.yml b/changelog/@unreleased/pr-5940.v2.yml new file mode 100644 index 00000000000..18acfc22c08 --- /dev/null +++ b/changelog/@unreleased/pr-5940.v2.yml @@ -0,0 +1,8 @@ +type: fix +fix: + description: 'Fixed a race condition where if a transaction was just committed by + another thread, users would receive null values from TransactionService.get. This + caused users to observe that a completed transaction was still running, but did + not lead to any correctness issue. ' + links: + - https://github.com/palantir/atlasdb/pull/5940