From 89b5644900f215c41900d2542e20a9ebba30be1b Mon Sep 17 00:00:00 2001 From: Jakub Kozlowski Date: Wed, 31 Jul 2024 14:16:10 +0000 Subject: [PATCH] Immutable commit timestamp and delayed writes. --- .palantir/revapi.yml | 28 +++ .../atlasdb/transaction/api/DelayedWrite.java | 34 +++ .../atlasdb/transaction/api/Transaction.java | 4 + .../transaction/api/TransactionManager.java | 2 + .../PreCommitRequirementValidator.java | 6 +- atlasdb-autobatch/build.gradle | 1 + .../atlasdb/autobatch/Autobatchers.java | 23 +- .../impl/ForwardingTransaction.java | 7 + ...DefaultLockAndTimestampServiceFactory.java | 12 +- .../atlasdb/factory/TransactionManagers.java | 1 + .../cache/LockWatchValueScopingCacheImpl.java | 56 +++-- .../api/watch/LockWatchEventCacheImpl.java | 41 ++-- .../keyvalue/api/watch/LockWatchEventLog.java | 10 +- .../impl/DefaultCommitTimestampLoader.java | 47 ++-- .../impl/ImmutableTimestampLockManager.java | 16 +- .../impl/InstrumentedTimelockService.java | 8 +- .../transaction/impl/LocalWriteBuffer.java | 12 ++ .../transaction/impl/SnapshotTransaction.java | 108 ++++++++-- .../impl/SnapshotTransactionManager.java | 5 + .../DefaultPreCommitRequirementValidator.java | 9 +- .../ImmutableTimestampLockManagerTest.java | 25 ++- ...aultPreCommitRequirementValidatorTest.java | 12 +- .../InMemoryNamespacedTimelockRpcClient.java | 5 + .../impl/AbstractSnapshotTransactionTest.java | 5 +- .../store/UnreliableTimestampManager.java | 3 +- .../lock/v2/GetCommitTimestampResponse.java | 32 +++ .../com/palantir/lock/v2/TimelockService.java | 4 +- .../client/BatchingCommitTimestampGetter.java | 67 ++++-- .../lock/client/CommitTimestampGetter.java | 3 +- .../lock/client/LockLeaseService.java | 16 +- .../MultiClientCommitTimestampGetter.java | 63 ++++-- .../NamespacedCommitTimestampGetter.java | 3 +- .../lock/client/ProfilingTimelockService.java | 12 +- .../client/RandomizedTimestampManager.java | 3 +- .../client/RemoteTimelockServiceAdapter.java | 8 +- .../palantir/lock/client/TimeLockClient.java | 8 +- .../client/UnreliableTimeLockService.java | 9 +- .../DefaultNamespacedTimelockRpcClient.java | 5 + .../lock/v2/NamespacedTimelockRpcClient.java | 2 + .../palantir/lock/v2/TimelockRpcClient.java | 4 + .../BatchingCommitTimestampGetterTest.java | 43 +++- .../MultiClientCommitTimestampGetterTest.java | 57 +++-- ...stampCorroboratingTimelockServiceTest.java | 8 +- .../lock/impl/LegacyTimelockService.java | 90 ++++---- .../lock/impl/LegacyTimelockServiceTest.java | 3 +- .../src/main/conjure/timelock-api.yml | 2 + timelock-impl/build.gradle | 1 + .../disruptor/RingBufferLockEventStore.java | 202 ++++++++++++++++++ .../timelock/AsyncTimelockService.java | 2 + .../timelock/AsyncTimelockServiceImpl.java | 35 ++- .../UndertowAsyncTimelockResource.java | 10 + .../timelock/lock/AsyncLockService.java | 21 ++ .../watch/ArrayLockEventSlidingWindow.java | 21 +- .../timelock/lock/watch/LockEventLogImpl.java | 60 +++++- .../timelock/lock/watch/LockEventStore.java | 40 ++++ .../lock/watch/LockWatchingServiceImpl.java | 5 + .../timelock/lock/watch/NextEvents.java | 38 ++++ ...ultiClientConjureTimelockResourceTest.java | 15 +- ...indowTest.java => LockEventStoreTest.java} | 39 ++-- .../watch/LockWatchingServiceImplTest.java | 13 ++ .../RingBufferLockEventStoreTest.java | 95 ++++++++ .../lock/AsyncLockServiceEteTest.java | 1 + .../timelock/lock/AsyncLockServiceTest.java | 2 + 63 files changed, 1270 insertions(+), 252 deletions(-) create mode 100644 atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/DelayedWrite.java create mode 100644 lock-api-objects/src/main/java/com/palantir/lock/v2/GetCommitTimestampResponse.java create mode 100644 timelock-impl/src/main/java/com/lmax/disruptor/RingBufferLockEventStore.java create mode 100644 timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockEventStore.java create mode 100644 timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/NextEvents.java rename timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/{ArrayLockEventSlidingWindowTest.java => LockEventStoreTest.java} (85%) create mode 100644 timelock-impl/src/test/java/lmax/disruptor/RingBufferLockEventStoreTest.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 4379ad5e21b..46b5ef7c6db 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -183,6 +183,34 @@ acceptedBreaks: - code: "java.method.removed" old: "method void com.palantir.atlasdb.keyvalue.api.watch.NoOpLockWatchManager::removeTransactionStateFromCache(long)" justification: "Internal API" + "0.1142.0": + com.palantir.atlasdb:atlasdb-api: + - code: "java.annotation.removed" + old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)" + new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set===)" + justification: "Prototyping" + - code: "java.annotation.removed" + old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\ + \ long)" + new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set===,\ + \ long)" + justification: "Prototyping" + - code: "java.method.addedToInterface" + new: "method long com.palantir.atlasdb.transaction.api.TransactionManager::getCommitImmutableTimestamp()" + justification: "Prototyping" + - code: "java.method.addedToInterface" + new: "method void com.palantir.atlasdb.transaction.api.Transaction::putDelayed(java.util.List)" + justification: "Prototyping" + - code: "java.method.parameterTypeChanged" + old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)" + new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set===)" + justification: "Prototyping" + - code: "java.method.parameterTypeChanged" + old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\ + \ long)" + new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set===,\ + \ long)" + justification: "Prototyping" "0.770.0": com.palantir.atlasdb:atlasdb-api: - code: "java.class.removed" diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/DelayedWrite.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/DelayedWrite.java new file mode 100644 index 00000000000..74700e990f9 --- /dev/null +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/DelayedWrite.java @@ -0,0 +1,34 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.transaction.api; + +import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.TableReference; +import java.util.Map; +import java.util.function.LongFunction; + +/** + * Curious creature, but it allows the user to delay deciding the particular cell to write to until the actual commit, + * at which point the user will be provided with a unique long value that can be used to generate the cell. + * + * This is a very specific feature that not many people probably require, or should try to use. + * + * Right now the interface allows you to do TOO MUCH, you don't even have to specify the tables you write to. + * This can be improved and locked down, but this allows for maximum experimentation. + */ +@FunctionalInterface +public interface DelayedWrite extends LongFunction>> {} diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java index 4285083b3a6..abf7a2136fa 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java @@ -33,6 +33,7 @@ import com.palantir.lock.watch.ChangeMetadata; import com.palantir.util.result.Result; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Set; @@ -323,6 +324,9 @@ Stream>> getRangesLazy( @Idempotent void putWithMetadata(TableReference tableRef, Map valuesAndMetadata); + @Idempotent + void putDelayed(List values); + /** * Deletes values from the key-value store. * @param tableRef the table from which to delete the values diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java index e407ac46827..fd420044ae3 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java @@ -309,6 +309,8 @@ T runTaskWithConditionRea */ long getImmutableTimestamp(); + long getCommitImmutableTimestamp(); + /** * Returns the lock service used by this transaction manager. * diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/precommit/PreCommitRequirementValidator.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/precommit/PreCommitRequirementValidator.java index 7b6ff0a459f..66e034b5d31 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/precommit/PreCommitRequirementValidator.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/precommit/PreCommitRequirementValidator.java @@ -21,7 +21,7 @@ import com.palantir.atlasdb.transaction.api.TransactionFailedException; import com.palantir.lock.v2.LockToken; import java.util.Map; -import javax.annotation.Nullable; +import java.util.Set; public interface PreCommitRequirementValidator { /** @@ -41,10 +41,10 @@ void throwIfPreCommitConditionInvalidAtCommitOnWriteTransaction( * user pre-commit condition is no longer valid, or possibly because of other internal state such as commit * locks having expired. */ - void throwIfPreCommitRequirementsNotMet(@Nullable LockToken commitLocksToken, long timestamp); + void throwIfPreCommitRequirementsNotMet(Set commitLocksToken, long timestamp); /** * Throws a {@link TransactionFailedException} if the immutable timestamp lock or commit locks have expired. */ - void throwIfImmutableTsOrCommitLocksExpired(@Nullable LockToken commitLocksToken); + void throwIfImmutableTsOrCommitLocksExpired(Set commitLocksToken); } diff --git a/atlasdb-autobatch/build.gradle b/atlasdb-autobatch/build.gradle index 71b0ef4e2ae..43b47cbb0b4 100644 --- a/atlasdb-autobatch/build.gradle +++ b/atlasdb-autobatch/build.gradle @@ -18,6 +18,7 @@ dependencies { implementation 'com.palantir.conjure.java.api:errors' implementation 'com.palantir.safe-logging:preconditions' implementation 'com.palantir.tracing:tracing' + implementation 'com.palantir.tracing:tracing-api' implementation 'io.dropwizard.metrics:metrics-core' implementation project(':commons-executors') diff --git a/atlasdb-autobatch/src/main/java/com/palantir/atlasdb/autobatch/Autobatchers.java b/atlasdb-autobatch/src/main/java/com/palantir/atlasdb/autobatch/Autobatchers.java index bf713b70f2d..db49d94ba7b 100644 --- a/atlasdb-autobatch/src/main/java/com/palantir/atlasdb/autobatch/Autobatchers.java +++ b/atlasdb-autobatch/src/main/java/com/palantir/atlasdb/autobatch/Autobatchers.java @@ -29,6 +29,9 @@ import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; import com.palantir.tracing.Observability; +import com.palantir.tracing.Tracer; +import com.palantir.tracing.Tracers; +import com.palantir.tracing.api.SpanType; import java.time.Duration; import java.util.List; import java.util.Map; @@ -97,7 +100,25 @@ public static AutobatcherBuilder coalescing(Supplier supp */ public static AutobatcherBuilder independent(Consumer>> batchFunction) { return new AutobatcherBuilder<>(parameters -> new IndependentBatchingEventHandler<>( - maybeWrapWithTimeout(batchFunction, parameters), parameters.batchSize())); + maybeWrapWithTimeout( + input -> { + long start = System.nanoTime(); + try { + Tracer.initTraceWithSpan( + Observability.SAMPLE, + Tracers.randomId(), + parameters.safeLoggablePurpose(), + SpanType.LOCAL); + batchFunction.accept(input); + } finally { + log.info( + "Finished batch", + SafeArg.of("duration", Duration.ofNanos(System.nanoTime() - start))); + Tracer.fastCompleteSpan(); + } + }, + parameters), + parameters.batchSize())); } private static Consumer>> maybeWrapWithTimeout( diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java index 6caba9c9fcb..04cdc2eb4da 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java @@ -25,6 +25,7 @@ import com.palantir.atlasdb.keyvalue.api.RowResult; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.transaction.api.ConstraintCheckable; +import com.palantir.atlasdb.transaction.api.DelayedWrite; import com.palantir.atlasdb.transaction.api.GetRangesQuery; import com.palantir.atlasdb.transaction.api.Transaction; import com.palantir.atlasdb.transaction.api.TransactionFailedException; @@ -37,6 +38,7 @@ import com.palantir.lock.watch.ChangeMetadata; import com.palantir.util.result.Result; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Set; @@ -144,6 +146,11 @@ public void putWithMetadata(TableReference tableRef, Map values) { + delegate().putDelayed(values); + } + @Override public void delete(TableReference tableRef, Set keys) { delegate().delete(tableRef, keys); diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/DefaultLockAndTimestampServiceFactory.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/DefaultLockAndTimestampServiceFactory.java index df7d0609086..b3172b21dd4 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/DefaultLockAndTimestampServiceFactory.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/DefaultLockAndTimestampServiceFactory.java @@ -441,7 +441,11 @@ private static LockAndTimestampServices createRawRemoteServices( .lock(lockService) .timestamp(timeService) .timestampManagement(timestampManagementService) - .timelock(new LegacyTimelockService(timeService, lockService, TransactionManagers.LOCK_CLIENT)) + .timelock(new LegacyTimelockService( + timeService, + lockService, + TransactionManagers.LOCK_CLIENT, + TransactionManagers.COMMIT_TIMESTAMP_LOCK_CLIENT)) .build(); } @@ -463,7 +467,11 @@ private static LockAndTimestampServices createRawEmbeddedServices( .lock(lockService) .timestamp(timeService) .timestampManagement(timestampManagementService) - .timelock(new LegacyTimelockService(timeService, lockService, TransactionManagers.LOCK_CLIENT)) + .timelock(new LegacyTimelockService( + timeService, + lockService, + TransactionManagers.LOCK_CLIENT, + TransactionManagers.COMMIT_TIMESTAMP_LOCK_CLIENT)) .build(); } } diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index 9df0b53032a..bfe5325b8bc 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -176,6 +176,7 @@ public abstract class TransactionManagers { private static final SafeLogger log = SafeLoggerFactory.get(TransactionManagers.class); public static final LockClient LOCK_CLIENT = LockClient.of("atlas instance"); + public static final LockClient COMMIT_TIMESTAMP_LOCK_CLIENT = LockClient.of("commitTimestamp"); abstract AtlasDbConfig config(); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java index f2264774e9b..8f9606f3895 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java @@ -40,6 +40,7 @@ import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; +import com.palantir.tracing.CloseableTracer; import com.palantir.util.RateLimitedLogger; import java.util.Comparator; import java.util.List; @@ -112,21 +113,28 @@ public static LockWatchValueScopingCache create( @Override public synchronized void processStartTransactions(Set startTimestamps) { - TransactionsLockWatchUpdate updateForTransactions = - eventCache.getUpdateForTransactions(startTimestamps, currentVersion); + try (CloseableTracer _trace = CloseableTracer.startSpan("processStartTransactions")) { + TransactionsLockWatchUpdate updateForTransactions = + eventCache.getUpdateForTransactions(startTimestamps, currentVersion); - Optional latestVersionFromUpdate = computeMaxUpdateVersion(updateForTransactions); + Optional latestVersionFromUpdate = computeMaxUpdateVersion(updateForTransactions); - if (updateForTransactions.clearCache()) { - clearCache(updateForTransactions, latestVersionFromUpdate); - } + if (updateForTransactions.clearCache()) { + clearCache(updateForTransactions, latestVersionFromUpdate); + } - updateStores(updateForTransactions); - updateCurrentVersion(latestVersionFromUpdate); + try (CloseableTracer tracer = CloseableTracer.startSpan("LockWatchValueScopingCacheImpl#updateStores")) { + updateStores(updateForTransactions); + } + try (CloseableTracer tracer = + CloseableTracer.startSpan("LockWatchValueScopingCacheImpl#updateCurrentVersion")) { + updateCurrentVersion(latestVersionFromUpdate); + } + } } @Override - public synchronized void updateCacheWithCommitTimestampsInformation(Set startTimestamps) { + public void updateCacheWithCommitTimestampsInformation(Set startTimestamps) { startTimestamps.forEach(this::processCommitUpdate); } @@ -139,8 +147,27 @@ public void requestStateRemoved(long startTs) { @Override public synchronized void ensureStateRemoved(long startTimestamp) { + ensureStateRemovedImpl(startTimestamp); + } + + private synchronized void ensureStateRemoved(Set elements) { + try (CloseableTracer tracer = CloseableTracer.startSpan("ensureStateRemoved(Set)")) { + for (long startTimestamp : elements) { + ensureStateRemovedImpl(startTimestamp); + } + } + } + + private synchronized void ensureStateRemovedImpl(long startTimestamp) { + // Not very concurrency safe, but should be fast, just removal from + // 2 maps. Once a second it runs some cleanup on internal datastructures, + // which will block, but should also be fairly quick. + // I think the requirement here is that sometimes we need to + // do computation on the whole datastrure, but we should figure out + // a way to not have to do that. StartTimestamp startTs = StartTimestamp.of(startTimestamp); snapshotStore.removeTimestamp(startTs); + // Concurrency safe cacheStore.removeCache(startTs); } @@ -214,11 +241,16 @@ public TransactionScopedCache getReadOnlyTransactionScopedCacheForCommit(long st return cacheStore.getReadOnlyCache(StartTimestamp.of(startTs)); } - private synchronized void processCommitUpdate(long startTimestamp) { + private void processCommitUpdate(long startTimestamp) { StartTimestamp startTs = StartTimestamp.of(startTimestamp); + // Just a lookup to a map: threadsafe and doesn't really need to be synchronized. TransactionScopedCache cache = cacheStore.getCache(startTs); cache.finalise(); + + // This is already threadsafe and called directly from other places? CommitUpdate commitUpdate = eventCache.getCommitUpdate(startTimestamp); + + // Just #computeIfPresent on a threadsafe map cacheStore.createReadOnlyCache(startTs, commitUpdate); } @@ -350,9 +382,7 @@ public void accept(Set elements) { // TODO (jkong): Implement a nicer version of multi-removal. // The current version for instance may, in between timestamps, spam the rate limiter or clean up snapshots // multiple times when it could just do the check once at the end. - for (long element : elements) { - ensureStateRemoved(element); - } + ensureStateRemoved(elements); } } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java index c1b8d44f4e0..ff7458f97e6 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java @@ -34,6 +34,7 @@ import com.palantir.logsafe.UnsafeArg; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; +import com.palantir.tracing.CloseableTracer; import java.util.Collection; import java.util.Optional; import java.util.Set; @@ -129,17 +130,19 @@ public CommitUpdate getEventUpdate(long startTs) { @Override public synchronized TransactionsLockWatchUpdate getUpdateForTransactions( Set startTimestamps, Optional lastKnownVersion) { - Preconditions.checkArgument(!startTimestamps.isEmpty(), "Cannot get update for empty set of transactions"); - TimestampMapping timestampMapping = getTimestampMappings(startTimestamps); - - VersionBounds versionBounds = VersionBounds.builder() - .startVersion(lastKnownVersion) - .endVersion(timestampMapping.lastVersion()) - .earliestSnapshotVersion(timestampMapping.versionRange().lowerEndpoint()) - .build(); - - return eventLog.getEventsBetweenVersions(versionBounds) - .toTransactionsLockWatchUpdate(timestampMapping, lastKnownVersion); + try (CloseableTracer tracer = CloseableTracer.startSpan("LockWatchEventCacheImpl#getUpdateForTransactions")) { + Preconditions.checkArgument(!startTimestamps.isEmpty(), "Cannot get update for empty set of transactions"); + TimestampMapping timestampMapping = getTimestampMappings(startTimestamps); + + VersionBounds versionBounds = VersionBounds.builder() + .startVersion(lastKnownVersion) + .endVersion(timestampMapping.lastVersion()) + .earliestSnapshotVersion(timestampMapping.versionRange().lowerEndpoint()) + .build(); + + return eventLog.getEventsBetweenVersions(versionBounds) + .toTransactionsLockWatchUpdate(timestampMapping, lastKnownVersion); + } } @Override @@ -165,13 +168,15 @@ synchronized LockWatchEventCacheState getStateForDiagnostics() { } private synchronized TimestampMapping getTimestampMappings(Set startTimestamps) { - ImmutableTimestampMapping.Builder mappingBuilder = TimestampMapping.builder(); - startTimestamps.forEach(timestamp -> { - Optional entry = timestampStateStore.getStartVersion(timestamp); - assertTrue(entry.isPresent(), "start timestamp missing from map"); - mappingBuilder.putTimestampMapping(timestamp, entry.get()); - }); - return mappingBuilder.build(); + try (CloseableTracer _trace = CloseableTracer.startSpan("LockWatchEventCacheImpl#getTimestampMappings")) { + ImmutableTimestampMapping.Builder mappingBuilder = TimestampMapping.builder(); + startTimestamps.forEach(timestamp -> { + Optional entry = timestampStateStore.getStartVersion(timestamp); + assertTrue(entry.isPresent(), "start timestamp missing from map"); + mappingBuilder.putTimestampMapping(timestamp, entry.get()); + }); + return mappingBuilder.build(); + } } private synchronized Optional processEventLogUpdate(LockWatchStateUpdate update) { diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventLog.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventLog.java index cabaaf7388e..de83f3b80c2 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventLog.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventLog.java @@ -25,6 +25,7 @@ import com.palantir.lock.watch.LockWatchVersion; import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.Unsafe; +import com.palantir.tracing.CloseableTracer; import java.util.Collection; import java.util.Optional; import java.util.concurrent.locks.ReadWriteLock; @@ -80,7 +81,14 @@ CacheUpdate processUpdate(LockWatchStateUpdate update) { * condensed. */ ClientLogEvents getEventsBetweenVersions(VersionBounds versionBounds) { - return runWithReadLock(() -> getEventsBetweenVersionsInternal(versionBounds)); + try (CloseableTracer tracer = CloseableTracer.startSpan("LockWatchEventLog#getEventsBetweenVersions")) { + return runWithReadLock(() -> { + try (CloseableTracer tracer1 = + CloseableTracer.startSpan("LockWatchEventLog#getEventsBetweenVersionsInternal")) { + return getEventsBetweenVersionsInternal(versionBounds); + } + }); + } } /** diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/DefaultCommitTimestampLoader.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/DefaultCommitTimestampLoader.java index 14d79f3e6b4..ae6808a36e4 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/DefaultCommitTimestampLoader.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/DefaultCommitTimestampLoader.java @@ -44,6 +44,7 @@ import com.palantir.logsafe.UnsafeArg; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; +import com.palantir.tracing.CloseableTracer; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -197,20 +198,22 @@ private void waitForCommitToComplete(LongIterable startTimestamps) { } private void waitFor(Set lockDescriptors) { - TransactionConfig currentTransactionConfig = transactionConfig.get(); + try (CloseableTracer tracer = CloseableTracer.startSpan("waitFor")) { + TransactionConfig currentTransactionConfig = transactionConfig.get(); - // TODO(fdesouza): Revert this once PDS-95791 is resolved. - long lockAcquireTimeoutMillis = currentTransactionConfig.getLockAcquireTimeoutMillis(); - WaitForLocksRequest request = WaitForLocksRequest.of(lockDescriptors, lockAcquireTimeoutMillis); - WaitForLocksResponse response = timelockService.waitForLocks(request); - if (!response.wasSuccessful()) { - log.error( - "Timed out waiting for commits to complete. Timeout was {} ms. First ten locks were {}.", - SafeArg.of("requestId", request.getRequestId()), - SafeArg.of("acquireTimeoutMs", lockAcquireTimeoutMillis), - SafeArg.of("numberOfDescriptors", lockDescriptors.size()), - UnsafeArg.of("firstTenLockDescriptors", Iterables.limit(lockDescriptors, 10))); - throw new TransactionLockAcquisitionTimeoutException("Timed out waiting for commits to complete."); + // TODO(fdesouza): Revert this once PDS-95791 is resolved. + long lockAcquireTimeoutMillis = currentTransactionConfig.getLockAcquireTimeoutMillis(); + WaitForLocksRequest request = WaitForLocksRequest.of(lockDescriptors, lockAcquireTimeoutMillis); + WaitForLocksResponse response = timelockService.waitForLocks(request); + if (!response.wasSuccessful()) { + log.error( + "Timed out waiting for commits to complete. Timeout was {} ms. First ten locks were {}.", + SafeArg.of("requestId", request.getRequestId()), + SafeArg.of("acquireTimeoutMs", lockAcquireTimeoutMillis), + SafeArg.of("numberOfDescriptors", lockDescriptors.size()), + UnsafeArg.of("firstTenLockDescriptors", Iterables.limit(lockDescriptors, 10))); + throw new TransactionLockAcquisitionTimeoutException("Timed out waiting for commits to complete."); + } } } @@ -236,14 +239,16 @@ private Timer getTimer(String name) { private ListenableFuture> loadCommitTimestamps(LongSet startTimestamps) { // distinguish between a single timestamp and a batch, for more granular metrics - if (startTimestamps.size() == 1) { - long singleTs = startTimestamps.longIterator().next(); - return Futures.transform( - transactionService.getAsyncV2(singleTs), - commitState -> ImmutableMap.of(singleTs, commitState), - MoreExecutors.directExecutor()); - } else { - return transactionService.getAsyncV2(startTimestamps.collect(Long::valueOf)); + try (CloseableTracer tracer = CloseableTracer.startSpan("loadCommitTimestamps")) { + if (startTimestamps.size() == 1) { + long singleTs = startTimestamps.longIterator().next(); + return Futures.transform( + transactionService.getAsyncV2(singleTs), + commitState -> ImmutableMap.of(singleTs, commitState), + MoreExecutors.directExecutor()); + } else { + return transactionService.getAsyncV2(startTimestamps.collect(Long::valueOf)); + } } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManager.java index 0052e555e79..d14c555a456 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManager.java @@ -36,9 +36,13 @@ public ImmutableTimestampLockManager( this.lockValidityChecker = lockValidityChecker; } - public Optional getExpiredImmutableTimestampAndCommitLocks(Optional commitLocksToken) { + public Optional getExpiredImmutableTimestampAndCommitLocks(Set commitLocksToken) { Set toRefresh = new HashSet<>(); - commitLocksToken.ifPresent(toRefresh::add); + + // TODO(jakubk): Handle this upstream + if (commitLocksToken != null) { + toRefresh.addAll(commitLocksToken); + } immutableTimestampLock.ifPresent(toRefresh::add); if (toRefresh.isEmpty()) { @@ -55,13 +59,13 @@ public Optional getExpiredImmutableTimestampAndCommitLocks(Optiona } public SummarizedLockCheckResult getExpiredImmutableTimestampAndCommitLocksWithFullSummary( - LockToken commitLocksToken) { + Set commitLocksToken) { Preconditions.checkNotNull( commitLocksToken, "commitLocksToken was null, not expected to be in a call to" + " getExpiredImmutableTimestampAndCommitLocksWithFullSummary", SafeArg.of("immutableTimestampLock", immutableTimestampLock)); - Optional expiredLocks = getExpiredImmutableTimestampAndCommitLocks(Optional.of(commitLocksToken)); + Optional expiredLocks = getExpiredImmutableTimestampAndCommitLocks(commitLocksToken); return SummarizedLockCheckResult.builder() .expiredLocks(expiredLocks) .immutableTimestampLock(immutableTimestampLock) @@ -69,7 +73,7 @@ public SummarizedLockCheckResult getExpiredImmutableTimestampAndCommitLocksWithF .build(); } - private String getExpiredLocksErrorString(Optional commitLocksToken, Set expiredLocks) { + private String getExpiredLocksErrorString(Set commitLocksToken, Set expiredLocks) { return "The following immutable timestamp lock was required: " + immutableTimestampLock + "; the following commit locks were required: " + commitLocksToken + "; the following locks are no longer valid: " + expiredLocks; @@ -95,7 +99,7 @@ public interface SummarizedLockCheckResult { Optional immutableTimestampLock(); - LockToken userProvidedLock(); + Set userProvidedLock(); static ImmutableSummarizedLockCheckResult.Builder builder() { return ImmutableSummarizedLockCheckResult.builder(); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InstrumentedTimelockService.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InstrumentedTimelockService.java index 352c7d0b1f7..a7cd9940f6d 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InstrumentedTimelockService.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InstrumentedTimelockService.java @@ -19,6 +19,7 @@ import com.palantir.atlasdb.AtlasDbMetricNames; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -69,7 +70,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return executeWithRecord(() -> timelockService.getCommitTimestamp(startTs, commitLocksToken)); } @@ -93,6 +94,11 @@ public long getImmutableTimestamp() { return executeWithRecord(timelockService::getImmutableTimestamp); } + @Override + public long getCommitImmutableTimestamp() { + return executeWithRecord(timelockService::getCommitImmutableTimestamp); + } + @Override public LockResponse lock(LockRequest request) { return executeWithRecord(() -> timelockService.lock(request)); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LocalWriteBuffer.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LocalWriteBuffer.java index 103bfaf35b6..d5861956267 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LocalWriteBuffer.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LocalWriteBuffer.java @@ -23,14 +23,17 @@ import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.impl.Cells; import com.palantir.atlasdb.logging.LoggingArgs; +import com.palantir.atlasdb.transaction.api.DelayedWrite; import com.palantir.lock.watch.ChangeMetadata; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.UnsafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -44,6 +47,7 @@ class LocalWriteBuffer { private final ConcurrentMap> writesByTable = new ConcurrentHashMap<>(); + private final List delayedWritesByTable = Collections.synchronizedList(new ArrayList<>()); private final ConcurrentMap> metadataByTable = new ConcurrentHashMap<>(); private final ConcurrentMap locksByTable = new ConcurrentHashMap<>(); private final AtomicLong valuesByteCount = new AtomicLong(); @@ -92,6 +96,10 @@ public void putLocalWritesAndMetadata( } } + public void putDelayed(List values) { + delayedWritesByTable.addAll(values); + } + /** * Returns all local writes that have been buffered. */ @@ -99,6 +107,10 @@ public ConcurrentMap> getLo return writesByTable; } + public List getDelayedWrites() { + return delayedWritesByTable; + } + /** * Returns the local writes for cells of the given table. */ diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index a7ba1c983f3..2c43a34d652 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -82,6 +82,7 @@ import com.palantir.atlasdb.transaction.api.ConflictHandler; import com.palantir.atlasdb.transaction.api.ConstraintCheckable; import com.palantir.atlasdb.transaction.api.ConstraintCheckingTransaction; +import com.palantir.atlasdb.transaction.api.DelayedWrite; import com.palantir.atlasdb.transaction.api.DeleteExecutor; import com.palantir.atlasdb.transaction.api.GetRangesQuery; import com.palantir.atlasdb.transaction.api.ImmutableGetRangesQuery; @@ -147,6 +148,7 @@ import com.palantir.lock.AtlasRowLockDescriptor; import com.palantir.lock.LockDescriptor; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; import com.palantir.lock.v2.LockToken; @@ -162,6 +164,7 @@ import com.palantir.logsafe.exceptions.SafeRuntimeException; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; +import com.palantir.timestamp.TimestampRange; import com.palantir.tracing.CloseableTracer; import com.palantir.util.AssertUtils; import com.palantir.util.RateLimitedLogger; @@ -200,6 +203,7 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -1750,6 +1754,26 @@ public void putWithMetadata(TableReference tableRef, Map values) { + // Preconditions.checkArgument(!AtlasDbConstants.HIDDEN_TABLES.contains(tableRef)); + // markTableAsInvolvedInThisTransaction(tableRef); + + if (values.isEmpty()) { + return; + } + + numWriters.incrementAndGet(); + try { + // We need to check the status after incrementing writers to ensure that we fail if we are committing. + ensureUncommitted(); + + localWriteBuffer.putDelayed(values); + } finally { + numWriters.decrementAndGet(); + } + } + private void putWithMetadataInternal( TableReference tableRef, Map values, Map metadata) { ensureNoEmptyValues(values); @@ -1991,7 +2015,13 @@ private void commitWrites(TransactionService transactionService) { // This must happen before conflict checking, otherwise we could complete the checks and then have someone // else write underneath us before we proceed (thus missing a write/write conflict). // Timing still useful to distinguish bad lock percentiles from user-generated lock requests. - LockToken commitLocksToken = timedAndTraced("commitAcquireLocks", this::acquireLocksForCommit); + Set commitLocksToken = new HashSet<>(); + commitLocksToken.add(timedAndTraced("commitAcquireLocks", this::acquireLocksForCommit)); + + // We could technically grab the commit timestamp as part of grabbing the locks and we would probably + // call it something slightly different then. + // The idea is the same though: have an idea of "beyond this point, writes are unsettled and therefore + // should not be read" try { // Conflict checking. We can actually do this later without compromising correctness, but there is no // reason to postpone this check - we waste resources writing unnecessarily if these are going to fail. @@ -2030,9 +2060,14 @@ private void commitWrites(TransactionService transactionService) { // We must do this before we check that our locks are still valid to ensure that other transactions that // will hold these locks are sure to have start timestamps after our commit timestamp. // Timing is still useful, as this may perform operations pertaining to lock watches. - long commitTimestamp = timedAndTraced( + // TODO(jakubk): Lalala, trying to code this fast. We know commitLocksToken is empty at this point! + GetCommitTimestampResponse commitTimestampResponse = timedAndTraced( "getCommitTimestamp", - () -> timelockService.getCommitTimestamp(getStartTimestamp(), commitLocksToken)); + () -> timelockService.getCommitTimestamp( + getStartTimestamp(), Iterables.getOnlyElement(commitLocksToken))); + commitLocksToken.add( + commitTimestampResponse.immutableTimestamp().getLock()); + long commitTimestamp = commitTimestampResponse.timestamp(); commitTsForScrubbing = commitTimestamp; // Punch on commit so that if hard delete is the only thing happening on a system, @@ -2052,7 +2087,11 @@ private void commitWrites(TransactionService transactionService) { "transactionKvsValidityCheck", () -> throwIfTransactionKeyValueServiceNoLongerValid(commitTimestamp)); - // Serializable transactions need to check their reads haven't changed, by reading again at + // TODO(jakubk): Add the special writes here. This can probably run in parallel with everything else. + // TODO(jakubk): Eventually we'd fold this into some other RPC, but for now just grab some fresh + // timestamps. + timedAndTraced("writeDelayedWrites", this::writeDelayedWrites); + // commitTs + 1. This must happen before the lock check for thorough tables, because the lock check // verifies the immutable timestamp hasn't moved forward - thorough sweep might sweep a conflict out // from underneath us. @@ -2088,11 +2127,46 @@ private void commitWrites(TransactionService transactionService) { .update(localWriteBuffer.getValuesByteCount()); } finally { // Not timed because tryUnlock() is an asynchronous operation. - traced("postCommitUnlock", () -> timelockService.tryUnlock(ImmutableSet.of(commitLocksToken))); + traced("postCommitUnlock", () -> timelockService.tryUnlock(commitLocksToken)); } }); } + private void writeDelayedWrites() { + List delayedWrites = localWriteBuffer.getDelayedWrites(); + if (delayedWrites.isEmpty()) { + return; + } + TimestampRange freshTimestamps = timelockService.getFreshTimestamps(delayedWrites.size()); + AtomicLong curFreshTimestamp = new AtomicLong(freshTimestamps.getLowerBound()); + LongSupplier freshTimestampsSupplier = () -> { + long freshTimestamp = curFreshTimestamp.getAndIncrement(); + Preconditions.checkState(freshTimestamp <= freshTimestamps.getUpperBound(), "Oops"); + return freshTimestamp; + }; + Map> materializedWrites = new HashMap<>(); + for (DelayedWrite writeAction : delayedWrites) { + long freshTimestamp = freshTimestampsSupplier.getAsLong(); + writeAction.apply(freshTimestamp).forEach((tableReference, writes) -> { + Map writesForTable = + materializedWrites.computeIfAbsent(tableReference, k -> new HashMap<>()); + writesForTable.putAll(writes); + }); + } + + // This is likely not 100% reliable and will require work to actually prove out/figure out + // safe way to not drop these writes on failed transactions. + // However, this is the art of the possible and I'm sure we can figure something out. + // UPDATE: this will not work, because, at least in the default configuration, the sweepable timestamps + // value will be written twice. + // timedAndTraced( + // "writeDelayedWritesWritingToSweepQueue", + // () -> sweepQueue.enqueue(materializedWrites, getStartTimestamp())); + timedAndTraced( + "writeDelayedWritesCommitWrite", + () -> transactionKeyValueService.multiPut(materializedWrites, getStartTimestamp())); + } + private void throwIfTransactionKeyValueServiceNoLongerValid(long commitTimestamp) { if (!transactionKeyValueService.isValid(commitTimestamp)) { throw new SafeTransactionFailedRetriableException( @@ -2128,9 +2202,10 @@ protected void throwIfReadWriteConflictForSerializable(long commitTimestamp) { } private boolean hasWrites() { - return !localWriteBuffer.getLocalWrites().isEmpty() - && localWriteBuffer.getLocalWrites().values().stream() - .anyMatch(writesForTable -> !writesForTable.isEmpty()); + return !localWriteBuffer.getDelayedWrites().isEmpty() + || (!localWriteBuffer.getLocalWrites().isEmpty() + && localWriteBuffer.getLocalWrites().values().stream() + .anyMatch(writesForTable -> !writesForTable.isEmpty())); } protected boolean hasReads() { @@ -2149,7 +2224,7 @@ protected ConflictHandler getConflictHandlerForTable(TableReference tableRef) { /** * Make sure we have all the rows we are checking already locked before calling this. */ - protected void throwIfConflictOnCommit(LockToken commitLocksToken, TransactionService transactionService) + protected void throwIfConflictOnCommit(Set commitLocksToken, TransactionService transactionService) throws TransactionConflictException { for (Map.Entry> write : localWriteBuffer.getLocalWrites().entrySet()) { @@ -2163,7 +2238,7 @@ protected void throwIfWriteAlreadyCommitted( TableReference tableRef, Map writes, ConflictHandler conflictHandler, - LockToken commitLocksToken, + Set commitLocksToken, TransactionService transactionService) throws TransactionConflictException { if (writes.isEmpty() || !conflictHandler.checkWriteWriteConflicts()) { @@ -2203,7 +2278,7 @@ private void throwIfValueChangedConflict( Map writes, Set spanningWrites, Set dominatingWrites, - LockToken commitLocksToken) { + Set commitLocksToken) { Map cellToConflict = new HashMap<>(); Map cellToTs = new HashMap<>(); for (CellConflict c : Sets.union(spanningWrites, dominatingWrites)) { @@ -2548,7 +2623,8 @@ private LongSet getStartTimestampsForValues(Iterable values) { * @throws TransactionLockTimeoutException If our locks timed out while trying to commit. * @throws TransactionCommitFailedException failed when committing in a way that isn't retriable */ - private void putCommitTimestamp(long commitTimestamp, LockToken locksToken, TransactionService transactionService) + private void putCommitTimestamp( + long commitTimestamp, Set locksToken, TransactionService transactionService) throws TransactionFailedException { Preconditions.checkArgument(commitTimestamp > getStartTimestamp(), "commitTs must be greater than startTs"); try { @@ -2567,7 +2643,7 @@ private void putCommitTimestamp(long commitTimestamp, LockToken locksToken, Tran } private void handleKeyAlreadyExistsException( - long commitTs, KeyAlreadyExistsException ex, LockToken commitLocksToken) { + long commitTs, KeyAlreadyExistsException ex, Set commitLocksToken) { try { if (wasCommitSuccessful(commitTs)) { // We did actually commit successfully. This case could happen if the impl @@ -2593,7 +2669,11 @@ private void handleKeyAlreadyExistsException( .immutableTimestampLock() .map(token -> token.toSafeArg("immutableTimestampLock")) .orElseGet(() -> SafeArg.of("immutableTimestampLock", null)), - lockCheckResult.userProvidedLock().toSafeArg("commitLocksToken")); + SafeArg.of( + "commitLocksToken", + lockCheckResult.userProvidedLock().stream() + .map(LockToken::getRequestId) + .collect(Collectors.toSet()))); } } catch (TransactionFailedException e1) { throw e1; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java index 870a1ce961d..41f681f75c4 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java @@ -512,6 +512,11 @@ public long getImmutableTimestamp() { return immutableTs; } + @Override + public long getCommitImmutableTimestamp() { + return timelockService.getCommitImmutableTimestamp(); + } + private void recordImmutableTimestamp(long immutableTs) { recentImmutableTs.updateAndGet(current -> Math.max(current, immutableTs)); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidator.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidator.java index 5bca6ed9770..50bdc1774bf 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidator.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidator.java @@ -31,6 +31,7 @@ import com.palantir.logsafe.logger.SafeLoggerFactory; import java.util.Map; import java.util.Optional; +import java.util.Set; public final class DefaultPreCommitRequirementValidator implements PreCommitRequirementValidator { private static final SafeLogger log = SafeLoggerFactory.get(DefaultPreCommitRequirementValidator.class); @@ -70,15 +71,15 @@ public void throwIfPreCommitConditionInvalidAtCommitOnWriteTransaction( } @Override - public void throwIfPreCommitRequirementsNotMet(LockToken commitLocksToken, long timestamp) { + public void throwIfPreCommitRequirementsNotMet(Set commitLocksToken, long timestamp) { throwIfImmutableTsOrCommitLocksExpired(commitLocksToken); throwIfPreCommitConditionInvalid(timestamp); } @Override - public void throwIfImmutableTsOrCommitLocksExpired(LockToken commitLocksToken) { - Optional expiredLocks = immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( - Optional.ofNullable(commitLocksToken)); + public void throwIfImmutableTsOrCommitLocksExpired(Set commitLocksToken) { + Optional expiredLocks = + immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(commitLocksToken); if (expiredLocks.isPresent()) { throw createDefaultTransactionLockTimeoutException(expiredLocks.get()); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManagerTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManagerTest.java index d47843ef382..9840112969c 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManagerTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManagerTest.java @@ -27,7 +27,9 @@ import com.palantir.atlasdb.transaction.impl.precommit.LockValidityChecker; import com.palantir.lock.v2.LockToken; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; @@ -51,7 +53,8 @@ public void returnsNothingExpiredWhenAllLocksStillValid( when(validityChecker.getStillValidLockTokens(anySet())) .thenAnswer(invocation -> invocation.getArguments()[0]); - assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(userCommitLock)) + assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( + userCommitLock.stream().collect(Collectors.toSet()))) .isEmpty(); } @@ -67,12 +70,12 @@ public void returnsLocksThatWereCheckedWhenGettingFullSummaryAndStillValid( .thenAnswer(invocation -> invocation.getArguments()[0]); assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocksWithFullSummary( - DEFAULT_COMMIT_LOCK_TOKEN)) + Set.of(DEFAULT_COMMIT_LOCK_TOKEN))) .satisfies(summarizedLockCheckResult -> { assertThat(summarizedLockCheckResult.expiredLocks()).isEmpty(); assertThat(summarizedLockCheckResult.immutableTimestampLock()) .isEqualTo(immutableTimestampLock); - assertThat(summarizedLockCheckResult.userProvidedLock()).isEqualTo(DEFAULT_COMMIT_LOCK_TOKEN); + assertThat(summarizedLockCheckResult.userProvidedLock()).containsExactly(DEFAULT_COMMIT_LOCK_TOKEN); }); } @@ -90,10 +93,12 @@ public void returnsExpiredLocksWhenLocksNoLongerValid( when(validityChecker.getStillValidLockTokens(anySet())).thenReturn(ImmutableSet.of()); - assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(userCommitLock)) + assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( + userCommitLock.stream().collect(Collectors.toSet()))) .hasValueSatisfying(expiredLocks -> { assertThat(expiredLocks.errorDescription()).contains(immutableTimestampLock.toString()); - assertThat(expiredLocks.errorDescription()).contains(userCommitLock.toString()); + userCommitLock.ifPresent(lockToken -> + assertThat(expiredLocks.errorDescription()).contains(lockToken.toString())); }); } @@ -108,12 +113,12 @@ public void returnsLocksThatWereCheckedWhenGettingFullSummaryAndExpired( when(validityChecker.getStillValidLockTokens(anySet())).thenReturn(ImmutableSet.of()); assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocksWithFullSummary( - DEFAULT_COMMIT_LOCK_TOKEN)) + Set.of(DEFAULT_COMMIT_LOCK_TOKEN))) .satisfies(summarizedLockCheckResult -> { assertThat(summarizedLockCheckResult.expiredLocks()).isPresent(); assertThat(summarizedLockCheckResult.immutableTimestampLock()) .isEqualTo(immutableTimestampLock); - assertThat(summarizedLockCheckResult.userProvidedLock()).isEqualTo(DEFAULT_COMMIT_LOCK_TOKEN); + assertThat(summarizedLockCheckResult.userProvidedLock()).containsExactly(DEFAULT_COMMIT_LOCK_TOKEN); }); } @@ -127,7 +132,7 @@ public void throwsIfOnlyCommitLockExpiredWhenCheckingBoth() { .thenReturn(ImmutableSet.of(DEFAULT_IMMUTABLE_TIMESTAMP_LOCK_TOKEN)); assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( - Optional.of(DEFAULT_COMMIT_LOCK_TOKEN))) + Set.of(DEFAULT_COMMIT_LOCK_TOKEN))) .hasValueSatisfying(expiredLocks -> { // This is a bit fragile, but emphasising readability here assertThat(expiredLocks.errorDescription()) @@ -144,7 +149,7 @@ public void throwsIfOnlyImmutableTimestampLockExpiredWhenCheckingBoth() { when(validityChecker.getStillValidLockTokens(anySet())).thenReturn(ImmutableSet.of(DEFAULT_COMMIT_LOCK_TOKEN)); assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( - Optional.of(DEFAULT_COMMIT_LOCK_TOKEN))) + Set.of(DEFAULT_COMMIT_LOCK_TOKEN))) .hasValueSatisfying(expiredLocks -> { // This is a bit fragile, but emphasising readability here assertThat(expiredLocks.errorDescription()) @@ -159,7 +164,7 @@ public void doesNotCallLockRefresherIfNothingToCheck() { ImmutableTimestampLockManager immutableTimestampLockManager = new ImmutableTimestampLockManager(Optional.empty(), validityChecker); - assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(Optional.empty())) + assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(Set.of())) .isEmpty(); verify(validityChecker, never()).getStillValidLockTokens(anySet()); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidatorTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidatorTest.java index cedebaf7ac3..0f7fd697aa2 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidatorTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidatorTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -37,6 +38,7 @@ import com.palantir.lock.v2.LockToken; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,7 +59,7 @@ public final class DefaultPreCommitRequirementValidatorTest { PtBytes.toBytes("two"), Cell.create(PtBytes.toBytes("suggestion"), PtBytes.toBytes("moot")), PtBytes.toBytes("three"))); - private static final LockToken LOCK_TOKEN = LockToken.of(UUID.randomUUID()); + private static final Set LOCK_TOKEN = Set.of(LockToken.of(UUID.randomUUID())); @Mock private PreCommitCondition userPreCommitCondition; @@ -105,10 +107,10 @@ public void throwIfPreCommitConditionInvalidAtCommitOnWriteTransactionPropagates @Test public void throwIfImmutableTsOrCommitLocksExpiredDelegatesEmptyRequestToLockManager() { - when(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(any())) + when(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(nullable(Set.class))) .thenReturn(Optional.empty()); validator.throwIfImmutableTsOrCommitLocksExpired(null); - verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(Optional.empty()); + verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(null); } @Test @@ -116,7 +118,7 @@ public void throwIfImmutableTsOrCommitLocksExpiredDelegatesRequestWithLockTokenT when(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(any())) .thenReturn(Optional.empty()); validator.throwIfImmutableTsOrCommitLocksExpired(LOCK_TOKEN); - verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(Optional.of(LOCK_TOKEN)); + verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(LOCK_TOKEN); } @Test @@ -150,6 +152,6 @@ public void throwIfPreCommitRequirementsNotMetDoesNotThrowIfNoLocksExpiredAndPre assertThatCode(() -> validator.throwIfPreCommitRequirementsNotMet(LOCK_TOKEN, TIMESTAMP)) .doesNotThrowAnyException(); verify(userPreCommitCondition).throwIfConditionInvalid(TIMESTAMP); - verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(Optional.of(LOCK_TOKEN)); + verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(LOCK_TOKEN); } } diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/timelock/paxos/InMemoryNamespacedTimelockRpcClient.java b/atlasdb-tests-shared/src/main/java/com/palantir/timelock/paxos/InMemoryNamespacedTimelockRpcClient.java index fa78fd2f406..92d63b00900 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/timelock/paxos/InMemoryNamespacedTimelockRpcClient.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/timelock/paxos/InMemoryNamespacedTimelockRpcClient.java @@ -32,6 +32,11 @@ public long getImmutableTimestamp() { return delegate.getImmutableTimestamp(); } + @Override + public long getCommitImmutableTimestamp() { + return delegate.getCommitImmutableTimestamp(); + } + @Override public long currentTimeMillis() { return delegate.currentTimeMillis(); diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/AbstractSnapshotTransactionTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/AbstractSnapshotTransactionTest.java index ed53c138309..71bef66d9cc 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/AbstractSnapshotTransactionTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/AbstractSnapshotTransactionTest.java @@ -140,6 +140,7 @@ import com.palantir.lock.LockService; import com.palantir.lock.SimpleTimeDuration; import com.palantir.lock.TimeDuration; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockResponse; import com.palantir.lock.v2.LockToken; @@ -1612,7 +1613,9 @@ public void commitDoesNotThrowIfAlreadySuccessfullyCommitted() { getSnapshotTransactionWith(spiedTimeLockService, () -> transactionTs, res, PreCommitConditions.NO_OP); when(spiedTimeLockService.getFreshTimestamp()).thenReturn(transactionTs + 1); - doReturn(transactionTs + 1).when(spiedTimeLockService).getCommitTimestamp(anyLong(), any()); + doReturn(GetCommitTimestampResponse.of(res, transactionTs + 1)) + .when(spiedTimeLockService) + .getCommitTimestamp(anyLong(), any()); // forcing to try to commit a transaction that is already committed transactionService.putUnlessExists(transactionTs, spiedTimeLockService.getFreshTimestamp()); diff --git a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/store/UnreliableTimestampManager.java b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/store/UnreliableTimestampManager.java index a9fb97891d8..70733ed6e12 100644 --- a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/store/UnreliableTimestampManager.java +++ b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/store/UnreliableTimestampManager.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.palantir.atlasdb.buggify.impl.DefaultNativeSamplingSecureRandomFactory; import com.palantir.lock.client.RandomizedTimestampManager; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.v2.TimelockService; import com.palantir.logsafe.SafeArg; @@ -96,7 +97,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return runWithReadLock(() -> delegate.getCommitTimestamp(startTs, commitLocksToken)); } diff --git a/lock-api-objects/src/main/java/com/palantir/lock/v2/GetCommitTimestampResponse.java b/lock-api-objects/src/main/java/com/palantir/lock/v2/GetCommitTimestampResponse.java new file mode 100644 index 00000000000..1e3a2d1ae95 --- /dev/null +++ b/lock-api-objects/src/main/java/com/palantir/lock/v2/GetCommitTimestampResponse.java @@ -0,0 +1,32 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.lock.v2; + +import org.immutables.value.Value; + +@Value.Immutable +public interface GetCommitTimestampResponse { + @Value.Parameter + LockImmutableTimestampResponse immutableTimestamp(); + + @Value.Parameter + long timestamp(); + + static GetCommitTimestampResponse of(LockImmutableTimestampResponse immutableTimestamp, long timestamp) { + return ImmutableGetCommitTimestampResponse.of(immutableTimestamp, timestamp); + } +} diff --git a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java index 84d9529103f..c761d269738 100644 --- a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java +++ b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java @@ -42,7 +42,7 @@ default boolean isInitialized() { long getFreshTimestamp(); - long getCommitTimestamp(long startTs, LockToken commitLocksToken); + GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken); TimestampRange getFreshTimestamps(@Safe @QueryParam("number") int numTimestampsRequested); @@ -76,6 +76,8 @@ default List startIdentifiedAtlasDbTr long getImmutableTimestamp(); + long getCommitImmutableTimestamp(); + LockResponse lock(LockRequest request); /** diff --git a/lock-api/src/main/java/com/palantir/lock/client/BatchingCommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/BatchingCommitTimestampGetter.java index 356b02e92dd..dc85d736003 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/BatchingCommitTimestampGetter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/BatchingCommitTimestampGetter.java @@ -18,11 +18,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Streams; +import com.google.common.primitives.Ints; import com.palantir.atlasdb.autobatch.Autobatchers; import com.palantir.atlasdb.autobatch.BatchElement; import com.palantir.atlasdb.autobatch.DisruptorAutobatcher; import com.palantir.atlasdb.futures.AtlasFutures; import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; +import com.palantir.common.base.Throwables; +import com.palantir.lock.v2.GetCommitTimestampResponse; +import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockWatchCache; import com.palantir.lock.watch.LockWatchVersion; @@ -34,20 +38,22 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.LongStream; +import java.util.stream.Stream; import org.immutables.value.Value; /** * This class batches getCommitTimestamps requests to TimeLock server for a single client/namespace. - * */ + */ final class BatchingCommitTimestampGetter implements CommitTimestampGetter { - private final DisruptorAutobatcher autobatcher; + private final DisruptorAutobatcher autobatcher; - private BatchingCommitTimestampGetter(DisruptorAutobatcher autobatcher) { + private BatchingCommitTimestampGetter(DisruptorAutobatcher autobatcher) { this.autobatcher = autobatcher; } public static BatchingCommitTimestampGetter create(LockLeaseService leaseService, LockWatchCache cache) { - DisruptorAutobatcher autobatcher = Autobatchers.independent(consumer(leaseService, cache)) + DisruptorAutobatcher autobatcher = Autobatchers.independent( + consumer(leaseService, cache)) .safeLoggablePurpose("get-commit-timestamp") .batchFunctionTimeout(Duration.ofSeconds(30)) .build(); @@ -55,7 +61,7 @@ public static BatchingCommitTimestampGetter create(LockLeaseService leaseService } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return AtlasFutures.getUnchecked(autobatcher.apply(ImmutableRequest.builder() .startTs(startTs) .commitLocksToken(commitLocksToken) @@ -63,10 +69,24 @@ public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { } @VisibleForTesting - static Consumer>> consumer(LockLeaseService leaseService, LockWatchCache cache) { + static Consumer>> consumer( + LockLeaseService leaseService, LockWatchCache cache) { return batch -> { int count = batch.size(); - List commitTimestamps = new ArrayList<>(); + List commitTimestamps = getResponses(leaseService, cache, batch, count); + for (int i = 0; i < count; i++) { + batch.get(i).result().set(commitTimestamps.get(i)); + } + }; + } + + private static List getResponses( + LockLeaseService leaseService, + LockWatchCache cache, + List> batch, + int count) { + List commitTimestamps = new ArrayList<>(); + try { while (commitTimestamps.size() < count) { Optional requestedVersion = cache.getEventCache().lastKnownVersion(); @@ -74,15 +94,31 @@ static Consumer>> consumer(LockLeaseService lea leaseService.getCommitTimestamps(requestedVersion, count - commitTimestamps.size()); commitTimestamps.addAll(process(batch.subList(commitTimestamps.size(), count), response, cache)); } - - for (int i = 0; i < count; i++) { - batch.get(i).result().set(commitTimestamps.get(i)); - } - }; + return commitTimestamps; + } catch (Throwable t) { + // Something else should cleanup the caches, that's fine. The joys of underhanded design. + // But we need to unlock the commit immutable timestamp locks. + // This method call is weird, but this is what BatchingIdentifiedAtlasDbTransactionStarter does, + // so I'll allow it. + TransactionStarterHelper.unlock( + commitTimestamps.stream() + .map(response -> response.immutableTimestamp().getLock()) + .collect(Collectors.toSet()), + leaseService); + throw Throwables.throwUncheckedException(t); + } } - private static List process( - List> requests, GetCommitTimestampsResponse response, LockWatchCache cache) { + private static List process( + List> requests, + GetCommitTimestampsResponse response, + LockWatchCache cache) { + LockToken immutableTsLock = response.getCommitImmutableTimestamp().getLock(); + long commitImmutableTs = response.getCommitImmutableTimestamp().getImmutableTimestamp(); + Stream immutableTsAndLocks = LockTokenShare.share( + immutableTsLock, + Ints.checkedCast(response.getInclusiveUpper() - response.getInclusiveLower() + 1)) + .map(tokenShare -> LockImmutableTimestampResponse.of(commitImmutableTs, tokenShare)); List timestamps = LongStream.rangeClosed(response.getInclusiveLower(), response.getInclusiveUpper()) .boxed() .collect(Collectors.toList()); @@ -94,7 +130,8 @@ private static List process( .build()) .collect(Collectors.toList()); cache.processCommitTimestampsUpdate(transactionUpdates, response.getLockWatchUpdate()); - return timestamps; + return Streams.zip(immutableTsAndLocks, timestamps.stream(), GetCommitTimestampResponse::of) + .collect(Collectors.toList()); } @Override diff --git a/lock-api/src/main/java/com/palantir/lock/client/CommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/CommitTimestampGetter.java index 890e34f51c8..ddcaec21e16 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/CommitTimestampGetter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/CommitTimestampGetter.java @@ -16,10 +16,11 @@ package com.palantir.lock.client; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockToken; public interface CommitTimestampGetter extends AutoCloseable { - long getCommitTimestamp(long startTs, LockToken commitLocksToken); + GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken); @Override void close(); diff --git a/lock-api/src/main/java/com/palantir/lock/client/LockLeaseService.java b/lock-api/src/main/java/com/palantir/lock/client/LockLeaseService.java index c732c2e253b..772764ded3b 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/LockLeaseService.java +++ b/lock-api/src/main/java/com/palantir/lock/client/LockLeaseService.java @@ -126,7 +126,21 @@ GetCommitTimestampsResponse getCommitTimestamps(Optional maybe .numTimestamps(batchSize) .lastKnownVersion(ConjureLockRequests.toConjure(maybeVersion)) .build(); - return delegate.getCommitTimestamps(request); + return assignLeasedLockTokenToImmutableCommitTimestampLock(delegate.getCommitTimestamps(request)); + } + + public static GetCommitTimestampsResponse assignLeasedLockTokenToImmutableCommitTimestampLock( + GetCommitTimestampsResponse response) { + Lease lease = response.getLease(); + LeasedLockToken leasedLockToken = LeasedLockToken.of( + ConjureLockToken.of( + response.getCommitImmutableTimestamp().getLock().getRequestId()), + lease); + long immutableTs = response.getCommitImmutableTimestamp().getImmutableTimestamp(); + return GetCommitTimestampsResponse.builder() + .from(response) + .commitImmutableTimestamp(LockImmutableTimestampResponse.of(immutableTs, leasedLockToken)) + .build(); } LockResponse lock(LockRequest request) { diff --git a/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java index 1e4928458ad..92f480f1a5e 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Streams; +import com.google.common.primitives.Ints; import com.palantir.atlasdb.autobatch.Autobatchers; import com.palantir.atlasdb.autobatch.BatchElement; import com.palantir.atlasdb.autobatch.DisruptorAutobatcher; @@ -28,6 +29,8 @@ import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; import com.palantir.atlasdb.timelock.api.Namespace; import com.palantir.common.streams.KeyedStream; +import com.palantir.lock.v2.GetCommitTimestampResponse; +import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockWatchCache; import com.palantir.lock.watch.LockWatchStateUpdate; @@ -43,24 +46,27 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.LongStream; +import java.util.stream.Stream; import org.immutables.value.Value; public final class MultiClientCommitTimestampGetter implements AutoCloseable { - private final DisruptorAutobatcher autobatcher; + private final DisruptorAutobatcher autobatcher; - private MultiClientCommitTimestampGetter(DisruptorAutobatcher autobatcher) { + private MultiClientCommitTimestampGetter( + DisruptorAutobatcher autobatcher) { this.autobatcher = autobatcher; } public static MultiClientCommitTimestampGetter create(InternalMultiClientConjureTimelockService delegate) { - DisruptorAutobatcher autobatcher = Autobatchers.independent(consumer(delegate)) + DisruptorAutobatcher autobatcher = Autobatchers.independent( + consumer(delegate)) .safeLoggablePurpose("multi-client-commit-timestamp-getter") .batchFunctionTimeout(Duration.ofSeconds(30)) .build(); return new MultiClientCommitTimestampGetter(autobatcher); } - public long getCommitTimestamp( + public GetCommitTimestampResponse getCommitTimestamp( Namespace namespace, long startTs, LockToken commitLocksToken, LockWatchCache cache) { return AtlasFutures.getUnchecked(autobatcher.apply(ImmutableNamespacedRequest.builder() .namespace(namespace) @@ -71,12 +77,15 @@ public long getCommitTimestamp( } @VisibleForTesting - static Consumer>> consumer( + static Consumer>> consumer( InternalMultiClientConjureTimelockService delegate) { return batch -> { BatchStateManager batchStateManager = BatchStateManager.createFromRequestBatch(batch); while (batchStateManager.hasPendingRequests()) { - batchStateManager.processResponse(delegate.getCommitTimestamps(batchStateManager.getRequests())); + batchStateManager.processResponse( + KeyedStream.stream(delegate.getCommitTimestamps(batchStateManager.getRequests())) + .map(LockLeaseService::assignLeasedLockTokenToImmutableCommitTimestampLock) + .collectToMap()); } }; } @@ -88,10 +97,11 @@ private BatchStateManager(Map requestMap this.requestMap = requestMap; } - static BatchStateManager createFromRequestBatch(List> batch) { + static BatchStateManager createFromRequestBatch( + List> batch) { Map requestMap = new HashMap<>(); - for (BatchElement elem : batch) { + for (BatchElement elem : batch) { NamespacedRequest argument = elem.argument(); Namespace namespace = argument.namespace(); NamespacedBatchStateManager namespacedBatchStateManager = requestMap.computeIfAbsent( @@ -120,7 +130,7 @@ private void processResponse(Map respons } private static final class NamespacedBatchStateManager { - private final Queue> pendingRequestQueue; + private final Queue> pendingRequestQueue; private final LockWatchCache cache; private Optional lastKnownVersion; @@ -134,7 +144,7 @@ private boolean hasPendingRequests() { return !pendingRequestQueue.isEmpty(); } - private void addRequest(BatchElement elem) { + private void addRequest(BatchElement elem) { pendingRequestQueue.add(elem); } @@ -151,22 +161,35 @@ private Optional updateAndGetLastKnownVersion() { } private void serviceRequests(GetCommitTimestampsResponse commitTimestampsResponse) { - List commitTimestamps = getCommitTimestampValues(commitTimestampsResponse); + LockToken immutableTsLock = + commitTimestampsResponse.getCommitImmutableTimestamp().getLock(); + long immutableTs = + commitTimestampsResponse.getCommitImmutableTimestamp().getImmutableTimestamp(); + Stream immutableTsAndLocks = LockTokenShare.share( + immutableTsLock, + Ints.checkedCast(commitTimestampsResponse.getInclusiveUpper() + - commitTimestampsResponse.getInclusiveLower() + + 1)) + .map(tokenShare -> LockImmutableTimestampResponse.of(immutableTs, tokenShare)); + List timestamps = LongStream.rangeClosed( + commitTimestampsResponse.getInclusiveLower(), commitTimestampsResponse.getInclusiveUpper()) + .boxed() + .collect(Collectors.toList()); + List commitTimestamps = Streams.zip( + immutableTsAndLocks, timestamps.stream(), GetCommitTimestampResponse::of) + .collect(Collectors.toList()); - processLockWatchUpdate(commitTimestamps, commitTimestampsResponse.getLockWatchUpdate()); + processLockWatchUpdate( + commitTimestamps.stream() + .map(GetCommitTimestampResponse::timestamp) + .collect(Collectors.toList()), + commitTimestampsResponse.getLockWatchUpdate()); - for (Long commitTimestamp : commitTimestamps) { + for (GetCommitTimestampResponse commitTimestamp : commitTimestamps) { pendingRequestQueue.poll().result().set(commitTimestamp); } } - private List getCommitTimestampValues(GetCommitTimestampsResponse commitTimestampsResponse) { - return LongStream.rangeClosed( - commitTimestampsResponse.getInclusiveLower(), commitTimestampsResponse.getInclusiveUpper()) - .boxed() - .collect(Collectors.toList()); - } - private void processLockWatchUpdate(List timestamps, LockWatchStateUpdate lockWatchUpdate) { List transactionUpdates = Streams.zip( timestamps.stream(), diff --git a/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java index 08a7f4b3ec7..76d14dba5e9 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java @@ -17,6 +17,7 @@ package com.palantir.lock.client; import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockWatchCache; @@ -35,7 +36,7 @@ public NamespacedCommitTimestampGetter( } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return referenceTrackingBatcher.getDelegate().getCommitTimestamp(namespace, startTs, commitLocksToken, cache); } diff --git a/lock-api/src/main/java/com/palantir/lock/client/ProfilingTimelockService.java b/lock-api/src/main/java/com/palantir/lock/client/ProfilingTimelockService.java index bfa07884c79..ad1c62c0a8b 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/ProfilingTimelockService.java +++ b/lock-api/src/main/java/com/palantir/lock/client/ProfilingTimelockService.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.RateLimiter; import com.palantir.common.base.Throwables; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -50,11 +51,11 @@ * operation every {@link ProfilingTimelockService#LOGGING_TIME_WINDOW}. It will log what the longest operation that * completed in the past {@link ProfilingTimelockService#LOGGING_TIME_WINDOW} window was, and how long it took. If no * operation took longer than {@link ProfilingTimelockService#SLOW_THRESHOLD} it will not log. - * + *

* The {@link ProfilingTimelockService} does not cover specific operations which are at risk for taking long * times owing to contention - in particular, this includes {@link #lock(LockRequest)} and * {@link #waitForLocks(WaitForLocksRequest)}. - * + *

* Profiling is done explicitly at this level (and not at {@link com.palantir.lock.v2.TimelockRpcClient} level) * to reflect the impact of timelock operations and cluster state changes (e.g. leader elections, rolling bounces) on * clients. @@ -102,7 +103,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return runTaskTimed("getCommitTimestamp", () -> delegate.getCommitTimestamp(startTs, commitLocksToken)); } @@ -128,6 +129,11 @@ public long getImmutableTimestamp() { return runTaskTimed("getImmutableTimestamp", delegate::getImmutableTimestamp); } + @Override + public long getCommitImmutableTimestamp() { + return runTaskTimed("getCommitImmutableTimestamp", delegate::getCommitImmutableTimestamp); + } + @Override public LockResponse lock(LockRequest request) { // Don't profile this, as it may be skewed by user contention on locks. diff --git a/lock-api/src/main/java/com/palantir/lock/client/RandomizedTimestampManager.java b/lock-api/src/main/java/com/palantir/lock/client/RandomizedTimestampManager.java index 09638018012..8d30b6560d8 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/RandomizedTimestampManager.java +++ b/lock-api/src/main/java/com/palantir/lock/client/RandomizedTimestampManager.java @@ -16,6 +16,7 @@ package com.palantir.lock.client; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.timestamp.TimestampRange; @@ -28,7 +29,7 @@ public interface RandomizedTimestampManager { long getFreshTimestamp(); - long getCommitTimestamp(long startTs, LockToken commitLocksToken); + GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken); TimestampRange getFreshTimestamps(int numTimestampsRequested); } diff --git a/lock-api/src/main/java/com/palantir/lock/client/RemoteTimelockServiceAdapter.java b/lock-api/src/main/java/com/palantir/lock/client/RemoteTimelockServiceAdapter.java index 1db5f4894ae..91fd3dba25f 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/RemoteTimelockServiceAdapter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/RemoteTimelockServiceAdapter.java @@ -21,6 +21,7 @@ import com.palantir.atlasdb.timelock.api.ConjureTimestampRange; import com.palantir.atlasdb.timelock.api.Namespace; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -102,7 +103,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return commitTimestampGetter.getCommitTimestamp(startTs, commitLocksToken); } @@ -119,6 +120,11 @@ public long getImmutableTimestamp() { return rpcClient.getImmutableTimestamp(); } + @Override + public long getCommitImmutableTimestamp() { + return rpcClient.getCommitImmutableTimestamp(); + } + @Override public WaitForLocksResponse waitForLocks(WaitForLocksRequest request) { return lockLeaseService.waitForLocks(request); diff --git a/lock-api/src/main/java/com/palantir/lock/client/TimeLockClient.java b/lock-api/src/main/java/com/palantir/lock/client/TimeLockClient.java index fd5b31c1b0e..3c2ea9433b6 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/TimeLockClient.java +++ b/lock-api/src/main/java/com/palantir/lock/client/TimeLockClient.java @@ -22,6 +22,7 @@ import com.palantir.common.concurrent.PTExecutors; import com.palantir.leader.NotCurrentLeaderException; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockLeaseRefresher; import com.palantir.lock.v2.LockRequest; @@ -102,7 +103,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return delegate.getCommitTimestamp(startTs, commitLocksToken); } @@ -139,6 +140,11 @@ public long getImmutableTimestamp() { return executeOnTimeLock(delegate::getImmutableTimestamp); } + @Override + public long getCommitImmutableTimestamp() { + return executeOnTimeLock(delegate::getCommitImmutableTimestamp); + } + @Override public LockResponse lock(LockRequest request) { return lock(request, ClientLockingOptions.getDefault()); diff --git a/lock-api/src/main/java/com/palantir/lock/client/UnreliableTimeLockService.java b/lock-api/src/main/java/com/palantir/lock/client/UnreliableTimeLockService.java index e45252af686..4107202417a 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/UnreliableTimeLockService.java +++ b/lock-api/src/main/java/com/palantir/lock/client/UnreliableTimeLockService.java @@ -20,6 +20,7 @@ import com.palantir.atlasdb.buggify.api.BuggifyFactory; import com.palantir.atlasdb.buggify.impl.DefaultBuggifyFactory; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -76,7 +77,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { maybeRandomlyIncreaseTimestamp(); return timestampManager.getCommitTimestamp(startTs, commitLocksToken); } @@ -102,6 +103,12 @@ public long getImmutableTimestamp() { return delegate.getImmutableTimestamp(); } + @Override + public long getCommitImmutableTimestamp() { + // Feature not active + return Long.MAX_VALUE; + } + @Override public LockResponse lock(LockRequest request) { LockResponse response = delegate.lock(request); diff --git a/lock-api/src/main/java/com/palantir/lock/v2/DefaultNamespacedTimelockRpcClient.java b/lock-api/src/main/java/com/palantir/lock/v2/DefaultNamespacedTimelockRpcClient.java index 3335a2d3549..f7f0e415173 100644 --- a/lock-api/src/main/java/com/palantir/lock/v2/DefaultNamespacedTimelockRpcClient.java +++ b/lock-api/src/main/java/com/palantir/lock/v2/DefaultNamespacedTimelockRpcClient.java @@ -30,6 +30,11 @@ public long getImmutableTimestamp() { return timelockRpcClient.getImmutableTimestamp(namespace); } + @Override + public long getCommitImmutableTimestamp() { + return timelockRpcClient.getCommitImmutableTimestamp(namespace); + } + @Override public long currentTimeMillis() { return timelockRpcClient.currentTimeMillis(namespace); diff --git a/lock-api/src/main/java/com/palantir/lock/v2/NamespacedTimelockRpcClient.java b/lock-api/src/main/java/com/palantir/lock/v2/NamespacedTimelockRpcClient.java index ac8d4c5f15c..6f8622fd0e7 100644 --- a/lock-api/src/main/java/com/palantir/lock/v2/NamespacedTimelockRpcClient.java +++ b/lock-api/src/main/java/com/palantir/lock/v2/NamespacedTimelockRpcClient.java @@ -23,5 +23,7 @@ public interface NamespacedTimelockRpcClient { long getImmutableTimestamp(); + long getCommitImmutableTimestamp(); + long currentTimeMillis(); } diff --git a/lock-api/src/main/java/com/palantir/lock/v2/TimelockRpcClient.java b/lock-api/src/main/java/com/palantir/lock/v2/TimelockRpcClient.java index c242825cc74..d1f39e491a2 100644 --- a/lock-api/src/main/java/com/palantir/lock/v2/TimelockRpcClient.java +++ b/lock-api/src/main/java/com/palantir/lock/v2/TimelockRpcClient.java @@ -40,6 +40,10 @@ public interface TimelockRpcClient { @Path("immutable-timestamp") long getImmutableTimestamp(@PathParam("namespace") String namespace); + @POST + @Path("commit-immutable-timestamp") + long getCommitImmutableTimestamp(@PathParam("namespace") String namespace); + @POST @Path("current-time-millis") long currentTimeMillis(@PathParam("namespace") String namespace); diff --git a/lock-api/src/test/java/com/palantir/lock/client/BatchingCommitTimestampGetterTest.java b/lock-api/src/test/java/com/palantir/lock/client/BatchingCommitTimestampGetterTest.java index 7c787703f79..039bb9d46df 100644 --- a/lock-api/src/test/java/com/palantir/lock/client/BatchingCommitTimestampGetterTest.java +++ b/lock-api/src/test/java/com/palantir/lock/client/BatchingCommitTimestampGetterTest.java @@ -31,7 +31,13 @@ import com.palantir.atlasdb.autobatch.BatchElement; import com.palantir.atlasdb.autobatch.DisruptorAutobatcher; import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; +import com.palantir.common.time.NanoTime; import com.palantir.lock.StringLockDescriptor; +import com.palantir.lock.v2.GetCommitTimestampResponse; +import com.palantir.lock.v2.LeaderTime; +import com.palantir.lock.v2.LeadershipId; +import com.palantir.lock.v2.Lease; +import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockEvent; import com.palantir.lock.watch.LockWatchCache; @@ -42,11 +48,13 @@ import com.palantir.lock.watch.LockWatchVersion; import com.palantir.lock.watch.TransactionUpdate; import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.mockito.InOrder; import org.mockito.Mockito; @@ -68,13 +76,19 @@ public final class BatchingCommitTimestampGetterTest { private static final Optional IDENTIFIED_VERSION_1 = Optional.empty(); private static final Optional IDENTIFIED_VERSION_2 = Optional.of(LockWatchVersion.of(UUID.randomUUID(), -1)); + private static final LeadershipId LEADERSHIP_ID = LeadershipId.random(); + private static final LeaderTime LEADER_TIME = LeaderTime.of(LEADERSHIP_ID, NanoTime.createForTests(1L)); + private static final Lease LEASE = Lease.of(LEADER_TIME, Duration.ofSeconds(977)); + private static final LockToken COMMIT_TS_LOCK_TOKEN = mock(LockToken.class); + private static final LockImmutableTimestampResponse COMMIT_TS_RESPONSE = + LockImmutableTimestampResponse.of(1L, COMMIT_TS_LOCK_TOKEN); private final LockLeaseService lockLeaseService = mock(LockLeaseService.class); private final LockWatchEventCache eventCache = mock(LockWatchEventCache.class); private final LockWatchValueCache valueCache = mock(LockWatchValueCache.class); private final LockWatchCache cache = spy(new LockWatchCacheImpl(eventCache, valueCache)); - private final Consumer>> batchProcessor = - BatchingCommitTimestampGetter.consumer(lockLeaseService, cache); + private final Consumer>> + batchProcessor = BatchingCommitTimestampGetter.consumer(lockLeaseService, cache); @Test public void consumerFillsTheWholeBatch() { @@ -87,7 +101,10 @@ public void consumerFillsTheWholeBatch() { whenGetCommitTimestamps(IDENTIFIED_VERSION_1, 4, 5, 6, UPDATE_1); whenGetCommitTimestamps(IDENTIFIED_VERSION_2, 2, 7, 8, UPDATE_2); - assertThat(processBatch(request1, request2, request3, request4)).containsExactly(5L, 6L, 7L, 8L); + assertThat(processBatch(request1, request2, request3, request4).stream() + .map(GetCommitTimestampResponse::timestamp) + .collect(Collectors.toList())) + .containsExactly(5L, 6L, 7L, 8L); InOrder inOrder = Mockito.inOrder(lockLeaseService, cache); inOrder.verify(lockLeaseService).getCommitTimestamps(IDENTIFIED_VERSION_1, 4); @@ -109,16 +126,22 @@ private void whenGetCommitTimestamps( .inclusiveLower(start) .inclusiveUpper(end) .lockWatchUpdate(update) + .commitImmutableTimestamp(COMMIT_TS_RESPONSE) + .lease(LEASE) .build()); } - private List processBatch(BatchingCommitTimestampGetter.Request... requests) { - List> elements = Arrays.stream(requests) - .map(request -> ImmutableTestBatchElement.builder() - .argument(request) - .result(new DisruptorAutobatcher.DisruptorFuture<>( - AutobatcherTelemetryComponents.create("test", new DefaultTaggedMetricRegistry()))) - .build()) + private List processBatch(BatchingCommitTimestampGetter.Request... requests) { + List> elements = Arrays.stream( + requests) + .map(request -> + ImmutableTestBatchElement + .builder() + .argument(request) + .result(new DisruptorAutobatcher.DisruptorFuture<>( + AutobatcherTelemetryComponents.create( + "test", new DefaultTaggedMetricRegistry()))) + .build()) .collect(toList()); batchProcessor.accept(elements); return Futures.getUnchecked(Futures.allAsList(Lists.transform(elements, BatchElement::result))); diff --git a/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java b/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java index 4f425f8cfe4..0e5266651ae 100644 --- a/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java +++ b/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java @@ -40,13 +40,20 @@ import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; import com.palantir.atlasdb.timelock.api.Namespace; import com.palantir.common.streams.KeyedStream; +import com.palantir.common.time.NanoTime; import com.palantir.lock.client.MultiClientCommitTimestampGetter.NamespacedRequest; +import com.palantir.lock.v2.GetCommitTimestampResponse; +import com.palantir.lock.v2.LeaderTime; +import com.palantir.lock.v2.LeadershipId; +import com.palantir.lock.v2.Lease; +import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockWatchCache; import com.palantir.lock.watch.LockWatchCacheImpl; import com.palantir.lock.watch.LockWatchStateUpdate; import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -61,6 +68,12 @@ public class MultiClientCommitTimestampGetterTest { private static final int COMMIT_TS_LIMIT_PER_REQUEST = 5; private static final SafeIllegalStateException EXCEPTION = new SafeIllegalStateException("Something went wrong!"); + private static final LeadershipId LEADERSHIP_ID = LeadershipId.random(); + private static final LeaderTime LEADER_TIME = LeaderTime.of(LEADERSHIP_ID, NanoTime.createForTests(1L)); + private static final Lease LEASE = Lease.of(LEADER_TIME, Duration.ofSeconds(977)); + private static final LockToken COMMIT_TS_LOCK_TOKEN = LockToken.of(UUID.randomUUID()); + private static final LockImmutableTimestampResponse COMMIT_TS_RESPONSE = + LockImmutableTimestampResponse.of(1L, COMMIT_TS_LOCK_TOKEN); private final Map lowestStartTsMap = new HashMap<>(); private final Map lockWatchCacheMap = new HashMap<>(); @@ -69,7 +82,7 @@ public class MultiClientCommitTimestampGetterTest { private final InternalMultiClientConjureTimelockService timelockService = mock(InternalMultiClientConjureTimelockService.class); - private final Consumer>> consumer = + private final Consumer>> consumer = MultiClientCommitTimestampGetter.consumer(timelockService); private final LockWatchStateUpdate lockWatchStateUpdate = @@ -102,7 +115,8 @@ public void canServiceMultipleClientsWithMultipleServerCalls() { @Test public void updatesCacheWhileProcessingResponse() { Namespace client = Namespace.of("Kitty"); - List> batchElements = IntStream.range(0, COMMIT_TS_LIMIT_PER_REQUEST * 2) + List> batchElements = IntStream.range( + 0, COMMIT_TS_LIMIT_PER_REQUEST * 2) .mapToObj(ind -> batchElementForNamespace(client)) .collect(toList()); setupServiceAndAssertSanityOfResponse(batchElements); @@ -116,11 +130,13 @@ public void doesNotUpdateCacheIfClientNotServed() { Namespace alpha = Namespace.of("alpha" + UUID.randomUUID()); Namespace beta = Namespace.of("beta" + UUID.randomUUID()); - BatchElement requestForAlpha = batchElementForNamespace(alpha); - BatchElement requestForBeta = batchElementForNamespace(beta); + BatchElement requestForAlpha = batchElementForNamespace(alpha); + BatchElement requestForBeta = batchElementForNamespace(beta); - List> allRequests = ImmutableList.of(requestForAlpha, requestForBeta); - List> alphaRequestList = ImmutableList.of(requestForAlpha); + List> allRequests = + ImmutableList.of(requestForAlpha, requestForBeta); + List> alphaRequestList = + ImmutableList.of(requestForAlpha); Map responseMap = getCommitTimestamps(alphaRequestList); when(timelockService.getCommitTimestamps(any())).thenReturn(responseMap).thenThrow(EXCEPTION); @@ -141,7 +157,8 @@ public void doesNotUpdateCacheIfClientNotServed() { verify(betaCache, never()).processCommitTimestampsUpdate(any(), any()); } - private void setupServiceAndAssertSanityOfResponse(List> batch) { + private void setupServiceAndAssertSanityOfResponse( + List> batch) { Map> expectedResponseMap = new HashMap<>(); when(timelockService.getCommitTimestamps(any())).thenAnswer(invocation -> { @@ -160,13 +177,13 @@ private void setupServiceAndAssertSanityOfResponse(List> batch, + List> batch, Map> expectedResponseMap) { assertThat(batch.stream().filter(elem -> !elem.result().isDone()).collect(Collectors.toSet())) .as("All requests must be served") .isEmpty(); - Map> partitionedResponseMap = batch.stream() + Map> partitionedResponseMap = batch.stream() .collect(groupingBy( elem -> elem.argument().namespace(), Collectors.mapping(elem -> Futures.getUnchecked(elem.result()), toList()))); @@ -177,14 +194,18 @@ private void assertSanityOfResponse( private static void assertCorrectnessOfCompletedRequests( Map> expectedResponseMap, - Map> partitionedResponseMap) { + Map> partitionedResponseMap) { KeyedStream.stream(partitionedResponseMap) .forEach((namespace, commitTsList) -> assertCorrectnessOfServedTimestamps(expectedResponseMap.get(namespace), commitTsList)); } private static void assertCorrectnessOfServedTimestamps( - List expectedCommitTimestampsResponses, List commitTsList) { + List expectedCommitTimestampsResponses, + List responsesList) { + List commitTsList = responsesList.stream() + .map(GetCommitTimestampResponse::timestamp) + .collect(Collectors.toList()); long requestedCommitTsCount = expectedCommitTimestampsResponses.stream() .mapToLong(resp -> resp.getInclusiveUpper() - resp.getInclusiveLower() + 1) .sum(); @@ -202,8 +223,8 @@ private static void assertCorrectnessOfServedTimestamps( } private Map getCommitTimestamps( - List> batch) { - Map>> partitionedRequests = + List> batch) { + Map>> partitionedRequests = batch.stream().collect(groupingBy(elem -> elem.argument().namespace(), toList())); return getCommitTimestampResponse(KeyedStream.stream(partitionedRequests) .map(requestList -> GetCommitTimestampsRequest.builder() @@ -226,6 +247,8 @@ private Map getCommitTimestampResponse( .inclusiveLower(inclusiveLower) .inclusiveUpper(exclusiveUpper - 1) .lockWatchUpdate(lockWatchStateUpdate) + .commitImmutableTimestamp(COMMIT_TS_RESPONSE) + .lease(LEASE) .build()); }) .collectToMap(); @@ -239,15 +262,15 @@ private void updateLowerBound(Namespace namespace, long numTimestamps) { lowestStartTsMap.put(namespace, lowestStartTsMap.getOrDefault(namespace, 1L) + numTimestamps); } - private List> getCommitTimestampRequestsForClients( + private List> getCommitTimestampRequestsForClients( int clientCount, int requestCount) { - List> test = IntStream.range(0, requestCount) + List> test = IntStream.range(0, requestCount) .mapToObj(ind -> batchElementForNamespace(Namespace.of("Test_" + (ind % clientCount)))) .collect(Collectors.toList()); return test; } - private BatchElement batchElementForNamespace(Namespace namespace) { + private BatchElement batchElementForNamespace(Namespace namespace) { return BatchElement.of( ImmutableNamespacedRequest.builder() .namespace(namespace) @@ -255,7 +278,7 @@ private BatchElement batchElementForNamespace(Namespace .cache(lockWatchCacheMap.computeIfAbsent(namespace, _unused -> spy(LockWatchCacheImpl.noOp()))) .commitLocksToken(lockToken) .build(), - new DisruptorFuture( + new DisruptorFuture<>( AutobatcherTelemetryComponents.create("test", new DefaultTaggedMetricRegistry()))); } } diff --git a/lock-api/src/test/java/com/palantir/lock/client/TimestampCorroboratingTimelockServiceTest.java b/lock-api/src/test/java/com/palantir/lock/client/TimestampCorroboratingTimelockServiceTest.java index 036b4a76918..3e6575218c5 100644 --- a/lock-api/src/test/java/com/palantir/lock/client/TimestampCorroboratingTimelockServiceTest.java +++ b/lock-api/src/test/java/com/palantir/lock/client/TimestampCorroboratingTimelockServiceTest.java @@ -168,7 +168,13 @@ public void startTransactionsThrowsIfSpanningBound() { @Test public void getCommitTimestampsShouldFail() { when(rawTimelockService.getCommitTimestamps(any())) - .thenReturn(GetCommitTimestampsResponse.of(1L, 3L, LOCK_WATCH_UPDATE)); + .thenReturn(GetCommitTimestampsResponse.builder() + .inclusiveLower(1L) + .inclusiveUpper(3L) + .lockWatchUpdate(LOCK_WATCH_UPDATE) + .commitImmutableTimestamp(LOCK_IMMUTABLE_TIMESTAMP_RESPONSE) + .lease(Lease.of(LeaderTime.of(LeadershipId.random(), NanoTime.now()), Duration.ZERO)) + .build()); assertThrowsOnSecondCall(() -> timelockService.getCommitTimestamps( GetCommitTimestampsRequest.of(3, ConjureIdentifiedVersion.of(UUID.randomUUID(), 3L)))); assertThat(timelockService diff --git a/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java b/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java index 0ff9978a39a..7ae30923948 100644 --- a/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java +++ b/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java @@ -25,6 +25,7 @@ import com.palantir.lock.LockService; import com.palantir.lock.SimpleTimeDuration; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -49,12 +50,17 @@ public class LegacyTimelockService implements TimelockService { private final TimestampService timestampService; private final LockService lockService; private final LockClient immutableTsLockClient; + private final LockClient commitImmutableTsLockClient; public LegacyTimelockService( - TimestampService timestampService, LockService lockService, LockClient immutableTsLockClient) { + TimestampService timestampService, + LockService lockService, + LockClient immutableTsLockClient, + LockClient commitImmutableTsLockClient) { this.timestampService = timestampService; this.lockService = lockService; this.immutableTsLockClient = immutableTsLockClient; + this.commitImmutableTsLockClient = commitImmutableTsLockClient; } @Override @@ -68,8 +74,10 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { - return getFreshTimestamp(); + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { + LockImmutableTimestampResponse lockImmutableTimestampResponse = + getLockImmutableTimestampResponse(commitImmutableTsLockClient); + return GetCommitTimestampResponse.of(lockImmutableTimestampResponse, getFreshTimestamp()); } @Override @@ -79,39 +87,19 @@ public TimestampRange getFreshTimestamps(int numTimestampsRequested) { @Override public LockImmutableTimestampResponse lockImmutableTimestamp() { - long immutableLockTs = timestampService.getFreshTimestamp(); - LockDescriptor lockDesc = AtlasTimestampLockDescriptor.of(immutableLockTs); - com.palantir.lock.LockRequest lockRequest = com.palantir.lock.LockRequest.builder( - ImmutableSortedMap.of(lockDesc, LockMode.READ)) - .withLockedInVersionId(immutableLockTs) - .build(); - LockRefreshToken lock; - - try { - lock = lockService.lock(immutableTsLockClient.getClientId(), lockRequest); - } catch (InterruptedException e) { - throw Throwables.throwUncheckedException(e); - } - - try { - return LockImmutableTimestampResponse.of( - getImmutableTimestampInternal(immutableLockTs), LockTokenConverter.toTokenV2(lock)); - } catch (Throwable e) { - if (lock != null) { - try { - lockService.unlock(lock); - } catch (Throwable unlockThrowable) { - e.addSuppressed(unlockThrowable); - } - } - throw Throwables.rewrapAndThrowUncheckedException(e); - } + return getLockImmutableTimestampResponse(immutableTsLockClient); } @Override public long getImmutableTimestamp() { long ts = timestampService.getFreshTimestamp(); - return getImmutableTimestampInternal(ts); + return getImmutableTimestampInternal(ts, immutableTsLockClient); + } + + @Override + public long getCommitImmutableTimestamp() { + long ts = timestampService.getFreshTimestamp(); + return getImmutableTimestampInternal(ts, commitImmutableTsLockClient); } @Override @@ -181,11 +169,6 @@ public long currentTimeMillis() { return lockService.currentTimeMillis(); } - private long getImmutableTimestampInternal(long ts) { - Long minLocked = lockService.getMinLockedInVersionId(immutableTsLockClient.getClientId()); - return minLocked == null ? ts : minLocked; - } - private LockRefreshToken lockAnonymous(com.palantir.lock.LockRequest request) { try { return lockService.lock(LockClient.ANONYMOUS.getClientId(), request); @@ -202,4 +185,39 @@ private SortedMap buildLockMap(Set loc } return locks; } + + private LockImmutableTimestampResponse getLockImmutableTimestampResponse(LockClient lockClient) { + long immutableLockTs = timestampService.getFreshTimestamp(); + LockDescriptor lockDesc = AtlasTimestampLockDescriptor.of(immutableLockTs); + com.palantir.lock.LockRequest lockRequest = com.palantir.lock.LockRequest.builder( + ImmutableSortedMap.of(lockDesc, LockMode.READ)) + .withLockedInVersionId(immutableLockTs) + .build(); + LockRefreshToken lock; + + try { + lock = lockService.lock(lockClient.getClientId(), lockRequest); + } catch (InterruptedException e) { + throw Throwables.throwUncheckedException(e); + } + + try { + return LockImmutableTimestampResponse.of( + getImmutableTimestampInternal(immutableLockTs, lockClient), LockTokenConverter.toTokenV2(lock)); + } catch (Throwable e) { + if (lock != null) { + try { + lockService.unlock(lock); + } catch (Throwable unlockThrowable) { + e.addSuppressed(unlockThrowable); + } + } + throw Throwables.rewrapAndThrowUncheckedException(e); + } + } + + private long getImmutableTimestampInternal(long ts, LockClient lockClient) { + Long minLocked = lockService.getMinLockedInVersionId(lockClient.getClientId()); + return minLocked == null ? ts : minLocked; + } } diff --git a/lock-impl/src/test/java/com/palantir/lock/impl/LegacyTimelockServiceTest.java b/lock-impl/src/test/java/com/palantir/lock/impl/LegacyTimelockServiceTest.java index a291826694d..d1943f47568 100644 --- a/lock-impl/src/test/java/com/palantir/lock/impl/LegacyTimelockServiceTest.java +++ b/lock-impl/src/test/java/com/palantir/lock/impl/LegacyTimelockServiceTest.java @@ -54,6 +54,7 @@ public class LegacyTimelockServiceTest { private static final LockClient LOCK_CLIENT = LockClient.of("foo"); + private static final LockClient COMMIT_LOCK_CLIENT = LockClient.of("foo1"); private static final long FRESH_TIMESTAMP = 5L; @@ -69,7 +70,7 @@ public class LegacyTimelockServiceTest { private static final long TIMEOUT = 10_000; private final LegacyTimelockService timelock = - new LegacyTimelockService(timestampService, lockService, LOCK_CLIENT); + new LegacyTimelockService(timestampService, lockService, LOCK_CLIENT, COMMIT_LOCK_CLIENT); @BeforeEach public void before() { diff --git a/timelock-api/src/main/conjure/timelock-api.yml b/timelock-api/src/main/conjure/timelock-api.yml index bb3e8d6fbb7..77de1f936d6 100644 --- a/timelock-api/src/main/conjure/timelock-api.yml +++ b/timelock-api/src/main/conjure/timelock-api.yml @@ -204,6 +204,8 @@ types: inclusiveLower: Long inclusiveUpper: Long lockWatchUpdate: LockWatchStateUpdate + commitImmutableTimestamp: LockImmutableTimestampResponse + lease: Lease GetCommitTimestampRequest: fields: lastKnownVersion: optional diff --git a/timelock-impl/build.gradle b/timelock-impl/build.gradle index 361bcd9ef18..4875785228f 100644 --- a/timelock-impl/build.gradle +++ b/timelock-impl/build.gradle @@ -16,6 +16,7 @@ dependencies { implementation (project(":atlasdb-config")) { exclude group:'io.dropwizard' } + implementation 'com.lmax:disruptor' implementation 'com.palantir.safe-logging:safe-logging' implementation 'org.mindrot:jbcrypt' diff --git a/timelock-impl/src/main/java/com/lmax/disruptor/RingBufferLockEventStore.java b/timelock-impl/src/main/java/com/lmax/disruptor/RingBufferLockEventStore.java new file mode 100644 index 00000000000..fbbdd53004b --- /dev/null +++ b/timelock-impl/src/main/java/com/lmax/disruptor/RingBufferLockEventStore.java @@ -0,0 +1,202 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.lmax.disruptor; + +import com.codahale.metrics.Counter; +import com.google.common.primitives.Ints; +import com.palantir.atlasdb.timelock.lock.watch.LockEventStore; +import com.palantir.atlasdb.timelock.lock.watch.NextEvents; +import com.palantir.atlasdb.timelock.lockwatches.BufferMetrics; +import com.palantir.lock.watch.LockEvent; +import com.palantir.lock.watch.LockRequestMetadata; +import com.palantir.lock.watch.LockWatchCreatedEvent; +import com.palantir.lock.watch.LockWatchEvent; +import com.palantir.lock.watch.UnlockEvent; +import com.palantir.logsafe.Preconditions; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public final class RingBufferLockEventStore implements LockEventStore { + + private static final SafeLogger log = SafeLoggerFactory.get(RingBufferLockEventStore.class); + + public static final int BUFFER_SIZE = 1024; + private final MultiProducerSequencer sequencer; + private final RingBuffer ringBuffer; + private final Counter changeMetadataCounter; + private final Counter eventsWithMetadataCounter; + + public RingBufferLockEventStore(int bufferSize, BufferMetrics bufferMetrics) { + sequencer = new MultiProducerSequencer(bufferSize, new BlockingWaitStrategy()); + ringBuffer = new RingBuffer<>(RingBufferElement::new, sequencer); + this.changeMetadataCounter = bufferMetrics.changeMetadata(); + this.eventsWithMetadataCounter = bufferMetrics.eventsWithMetadata(); + } + + @Override + public long lastVersion() { + // Need to go backwards from current cursor + long cursor = ringBuffer.getCursor(); + if (cursor == AbstractSequencer.INITIAL_CURSOR_VALUE) { + return -1; + } + // Let's not go down to zero, shouldn't be possible + long minCursor = Math.max(0, getSmallestSequence(cursor)); + for (long sequence = cursor; sequence >= minCursor; sequence--) { + if (sequencer.isAvailable(sequence)) { + return sequence; + } + } + + throw new SafeIllegalStateException("Somehow didn't read fast enough"); + } + + @Override + public LockWatchEvent[] getBufferSnapshot() { + // long cursor = ringBuffer.getCursor(); + // Sequence sequence = new Sequence(); + // long smallestSequence = getSmallestSequence(cursor); + // // The below sets the sequence value to the current cursor atomically, + // // So producers will not advance past this point. + // ringBuffer.addGatingSequences(sequence); + // List events = new ArrayList<>(); + // try { + // while (smallestSequence < cursor) { + // if (sequencer.isAvailable(smallestSequence)) { + // events.add(ringBuffer.elementAt(smallestSequence).event); + // } + // smallestSequence++; + // } + // return events.toArray(new LockWatchEvent[0]); + // } finally { + // ringBuffer.removeGatingSequence(sequence); + // } + return new LockWatchEvent[0]; + } + + @Override + public void add(LockWatchEvent.Builder doNotCaptureEventBuilderThisIsLongOnPurpose) { + ringBuffer.publishEvent( + (elem, sequence, builder) -> { + Optional.ofNullable(elem.event) + .flatMap(replacedEvent -> replacedEvent.accept(LockWatchEventMetadataVisitor.INSTANCE)) + .ifPresent(this::decrementMetadataCounters); + elem.event = builder.build(sequence); + elem.event + .accept(LockWatchEventMetadataVisitor.INSTANCE) + .ifPresent(this::incrementMetadataCounters); + }, + doNotCaptureEventBuilderThisIsLongOnPurpose); + } + + @Override + public Optional getNextEvents(long version) { + Preconditions.checkArgument(version >= -1, "Wrong version"); + long cursor = ringBuffer.getCursor(); + if (cursor - version > BUFFER_SIZE) { + // Fail early and force a snapshot. There's very likely no way this consumer is up to date. + return Optional.empty(); + } + + // This code is a "let's read what we have and figure out if what we read actually is self-consistent". + // IT HOPEFULLY WORKS! + + // This likely over-allocates but to allocate better we'd need to figure out the latest available slot. + // This is likely an overkill. + List lockWatchEvents = new ArrayList<>(Math.max(0, Ints.checkedCast(cursor - version))); + for (long curVersion = version + 1; curVersion <= cursor; curVersion++) { + if (sequencer.isAvailable(curVersion)) { + LockWatchEvent lockWatchEvent = ringBuffer.elementAt(curVersion).event; + if (lockWatchEvent.sequence() != curVersion) { + log.info("We could not compute the next events, producers are too fast."); + // Something went wrong, we should snapshot. + return Optional.empty(); + } + lockWatchEvents.add(lockWatchEvent); + } else { + break; + } + } + + if (version != -1 && !isEventInBuffer(version)) { + return Optional.empty(); + } + + // Whatever we captured in lockWatchEvents, should actually be self-consistent. + // If we captured no events, we're up-to-date, so return the last version. + // This should also correctly calculate the version in case of current version -1 and there are no events. + long newVersion = lockWatchEvents.stream() + .mapToLong(LockWatchEvent::sequence) + .max() + .orElse(version); + return Optional.of(new NextEvents(lockWatchEvents, newVersion)); + } + + @Override + public int capacity() { + return BUFFER_SIZE; + } + + private long getSmallestSequence(long cursor) { + return Math.max(0, cursor - BUFFER_SIZE); + } + + private boolean isEventInBuffer(long version) { + if (sequencer.isAvailable(version)) { + LockWatchEvent lockWatchEvent = ringBuffer.elementAt(version).event; + return lockWatchEvent.sequence() == version; + } + return false; + } + + private static final class RingBufferElement { + private LockWatchEvent event; + } + + private void incrementMetadataCounters(LockRequestMetadata metadata) { + changeMetadataCounter.inc(metadata.lockDescriptorToChangeMetadata().size()); + eventsWithMetadataCounter.inc(); + } + + private void decrementMetadataCounters(LockRequestMetadata metadata) { + changeMetadataCounter.dec(metadata.lockDescriptorToChangeMetadata().size()); + eventsWithMetadataCounter.dec(); + } + + private enum LockWatchEventMetadataVisitor implements LockWatchEvent.Visitor> { + INSTANCE; + + @Override + public Optional visit(LockEvent lockEvent) { + return lockEvent.metadata(); + } + + @Override + public Optional visit(UnlockEvent unlockEvent) { + return Optional.empty(); + } + + @Override + public Optional visit(LockWatchCreatedEvent lockWatchCreatedEvent) { + return Optional.empty(); + } + } +} diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java index ccd79d47354..d760425c9f9 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java @@ -57,6 +57,8 @@ public interface AsyncTimelockService long getImmutableTimestamp(); + long getCommitImmutableTimestamp(); + StartAtlasDbTransactionResponse deprecatedStartTransaction(IdentifiedTimeLockRequest request); StartAtlasDbTransactionResponseV3 startTransaction(StartIdentifiedAtlasDbTransactionRequest request); diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java index ada32c9f8a1..f387265980f 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java @@ -55,6 +55,7 @@ import com.palantir.lock.watch.LockWatchVersion; import com.palantir.timestamp.ManagedTimestampService; import com.palantir.timestamp.TimestampRange; +import com.palantir.tritium.ids.UniqueIds; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -106,6 +107,12 @@ public long getImmutableTimestamp() { return lockService.getImmutableTimestamp().orElse(timestamp); } + @Override + public long getCommitImmutableTimestamp() { + long timestamp = timestampService.getFreshTimestamp(); + return lockService.getCommitImmutableTimestamp().orElse(timestamp); + } + @Override public ListenableFuture lock(IdentifiedLockRequest request) { AsyncResult> result = lockService.lock( @@ -206,6 +213,18 @@ private Leased lockImmutableTimestampWithLease(U return Leased.of(lockImmutableTimestampResponse, leasedLock.lease()); } + private Leased lockCommitImmutableTimestampWithLease(long timestamp) { + Leased leasedLock = lockService + .lockCommitImmutableTimestamp(UniqueIds.pseudoRandomUuidV4(), timestamp) + .get(); + long immutableTs = lockService.getCommitImmutableTimestamp().orElse(timestamp); + + LockImmutableTimestampResponse lockImmutableTimestampResponse = + LockImmutableTimestampResponse.of(immutableTs, leasedLock.value()); + + return Leased.of(lockImmutableTimestampResponse, leasedLock.lease()); + } + @Override public ListenableFuture startTransactionsWithWatches( ConjureStartTransactionsRequest request) { @@ -234,11 +253,19 @@ private ConjureStartTransactionsResponse startTransactionsWithWatchesSync(Conjur @Override public ListenableFuture getCommitTimestamps( int numTimestamps, Optional lastKnownVersion) { + // TODO(jakubk): This is a pretty obvious race between grabbing the timestamps + // and locking the immutable timestamp, but this exists in the case of the normal immutable timestamp, + // so let's not worry about it for now. TimestampRange freshTimestamps = getFreshTimestamps(numTimestamps); - return Futures.immediateFuture(GetCommitTimestampsResponse.of( - freshTimestamps.getLowerBound(), - freshTimestamps.getUpperBound(), - getWatchStateUpdate(lastKnownVersion))); + Leased leasedLockImmutableCommitTimestampResponse = + lockCommitImmutableTimestampWithLease(freshTimestamps.getLowerBound()); + return Futures.immediateFuture(GetCommitTimestampsResponse.builder() + .inclusiveLower(freshTimestamps.getLowerBound()) + .inclusiveUpper(freshTimestamps.getUpperBound()) + .lockWatchUpdate(getWatchStateUpdate(lastKnownVersion)) + .commitImmutableTimestamp(leasedLockImmutableCommitTimestampResponse.value()) + .lease(leasedLockImmutableCommitTimestampResponse.lease()) + .build()); } @Override diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/UndertowAsyncTimelockResource.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/UndertowAsyncTimelockResource.java index a207eb721f1..0429cb34fda 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/UndertowAsyncTimelockResource.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/UndertowAsyncTimelockResource.java @@ -138,6 +138,16 @@ public long getImmutableTimestamp( return getAsyncTimelockService(namespace, userAgent).getImmutableTimestamp(); } + @Handle(method = HttpMethod.POST, path = "/{namespace}/timelock/commit-immutable-timestamp") + public long getCommitImmutableTimestamp( + @Safe @Handle.PathParam String namespace, + @Safe + @HeaderParam(TimelockNamespaces.USER_AGENT_HEADER) + @Handle.Header(TimelockNamespaces.USER_AGENT_HEADER) + Optional userAgent) { + return getAsyncTimelockService(namespace, userAgent).getCommitImmutableTimestamp(); + } + @Handle(method = HttpMethod.POST, path = "/{namespace}/timelock/lock") public ListenableFuture deprecatedLock( @Safe @Handle.PathParam String namespace, diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java index d8fdf17d283..74ff8336e81 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java @@ -44,6 +44,7 @@ public class AsyncLockService implements Closeable { private final HeldLocksCollection heldLocks; private final AwaitedLocksCollection awaitedLocks; private final ImmutableTimestampTracker immutableTsTracker; + private final ImmutableTimestampTracker immutableCommitTsTracker; private final LeaderClock leaderClock; private final LockLog lockLog; private final LockWatchingService lockWatchingService; @@ -73,6 +74,7 @@ public static AsyncLockService createDefault( return new AsyncLockService( new LockCollection(), new ImmutableTimestampTracker(), + new ImmutableTimestampTracker(), lockAcquirer, heldLocks, new AwaitedLocksCollection(), @@ -86,6 +88,7 @@ public static AsyncLockService createDefault( AsyncLockService( LockCollection locks, ImmutableTimestampTracker immutableTimestampTracker, + ImmutableTimestampTracker immutableCommitTimestampTracker, LockAcquirer acquirer, HeldLocksCollection heldLocks, AwaitedLocksCollection awaitedLocks, @@ -96,6 +99,7 @@ public static AsyncLockService createDefault( LockLog lockLog) { this.locks = locks; this.immutableTsTracker = immutableTimestampTracker; + this.immutableCommitTsTracker = immutableCommitTimestampTracker; this.heldLocks = heldLocks; this.awaitedLocks = awaitedLocks; this.reaperExecutor = reaperExecutor; @@ -142,6 +146,14 @@ public AsyncResult> lockImmutableTimestamp(UUID requestId, lon return immutableTimestampLockResult; } + public AsyncResult> lockCommitImmutableTimestamp(UUID requestId, long timestamp) { + AsyncResult> immutableTimestampLockResult = heldLocks.getExistingOrAcquire( + requestId, () -> acquireCommitImmutableTimestampLock(requestId, timestamp)); + // TODO(fdesouza): Remove this once PDS-95791 is resolved. + // lockLog.registerLockImmutableTimestampRequest(requestId, timestamp, immutableTimestampLockResult); + return immutableTimestampLockResult; + } + public AsyncResult waitForLocks(UUID requestId, Set lockDescriptors, TimeLimit timeout) { return awaitedLocks.getExistingOrAwait(requestId, () -> awaitLocks(requestId, lockDescriptors, timeout)); } @@ -150,6 +162,10 @@ public Optional getImmutableTimestamp() { return immutableTsTracker.getImmutableTimestamp(); } + public Optional getCommitImmutableTimestamp() { + return immutableCommitTsTracker.getImmutableTimestamp(); + } + private AsyncResult acquireLocks( UUID requestId, Set lockDescriptors, @@ -169,6 +185,11 @@ private AsyncResult acquireImmutableTimestampLock(UUID requestId, lon return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromSingleLock(immutableTsLock), TimeLimit.zero()); } + private AsyncResult acquireCommitImmutableTimestampLock(UUID requestId, long timestamp) { + AsyncLock immutableTsLock = immutableCommitTsTracker.getLockFor(timestamp); + return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromSingleLock(immutableTsLock), TimeLimit.zero()); + } + public boolean unlock(LockToken token) { return unlock(ImmutableSet.of(token)).contains(token); } diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/ArrayLockEventSlidingWindow.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/ArrayLockEventSlidingWindow.java index a113a1441a1..7457283b808 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/ArrayLockEventSlidingWindow.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/ArrayLockEventSlidingWindow.java @@ -32,7 +32,7 @@ import javax.annotation.concurrent.NotThreadSafe; @NotThreadSafe -public class ArrayLockEventSlidingWindow { +public class ArrayLockEventSlidingWindow implements LockEventStore { private final LockWatchEvent[] buffer; private final int maxSize; private long nextSequence = 0; @@ -46,17 +46,20 @@ public class ArrayLockEventSlidingWindow { this.eventsWithMetadataCounter = bufferMetrics.eventsWithMetadata(); } - long lastVersion() { + @Override + public synchronized long lastVersion() { return nextSequence - 1; } // This method is only for one-off diagnostics purposes. - LockWatchEvent[] getBufferSnapshot() { + @Override + public synchronized LockWatchEvent[] getBufferSnapshot() { // The contents of the buffer are immutable, thus not requiring a deep copy. return Arrays.copyOf(buffer, buffer.length); } - void add(LockWatchEvent.Builder eventBuilder) { + @Override + public synchronized void add(LockWatchEvent.Builder eventBuilder) { LockWatchEvent event = eventBuilder.build(nextSequence); int index = LongMath.mod(nextSequence, maxSize); @@ -79,7 +82,8 @@ private void decrementMetadataCounters(LockRequestMetadata metadata) { eventsWithMetadataCounter.dec(); } - public Optional> getNextEvents(long version) { + @Override + public synchronized Optional getNextEvents(long version) { if (versionInTheFuture(version) || versionTooOld(version)) { return Optional.empty(); } @@ -91,7 +95,12 @@ public Optional> getNextEvents(long version) { events.add(buffer[i]); } - return Optional.of(events); + return Optional.of(new NextEvents(events, lastVersion())); + } + + @Override + public int capacity() { + return buffer.length; } private int incrementAndMod(int num) { diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockEventLogImpl.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockEventLogImpl.java index e480deedb25..755134352f5 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockEventLogImpl.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockEventLogImpl.java @@ -17,6 +17,7 @@ package com.palantir.atlasdb.timelock.lock.watch; import com.google.common.collect.RangeSet; +import com.lmax.disruptor.RingBufferLockEventStore; import com.palantir.atlasdb.timelock.lock.AsyncLock; import com.palantir.atlasdb.timelock.lock.HeldLocksCollection; import com.palantir.atlasdb.timelock.lockwatches.BufferMetrics; @@ -51,7 +52,7 @@ public class LockEventLogImpl implements LockEventLog { private static final RateLimitedLogger diagnosticLog = new RateLimitedLogger(log, 1 / 120.0); private final UUID logId; - private final ArrayLockEventSlidingWindow slidingWindow; + private final LockEventStore slidingWindow; private final Supplier watchesSupplier; private final HeldLocksCollection heldLocksCollection; @@ -61,38 +62,59 @@ public class LockEventLogImpl implements LockEventLog { HeldLocksCollection heldLocksCollection, BufferMetrics bufferMetrics) { this.logId = logId; - this.slidingWindow = new ArrayLockEventSlidingWindow(1000, bufferMetrics); + // this.slidingWindow = new ArrayLockEventSlidingWindow(1000, bufferMetrics); + this.slidingWindow = new RingBufferLockEventStore(RingBufferLockEventStore.BUFFER_SIZE, bufferMetrics); this.watchesSupplier = watchesSupplier; this.heldLocksCollection = heldLocksCollection; } + // Why do we not need locking here? + // 1. Lock events happen as part of the commit protocol BEFORE the commit timestamp is taken. + // THEREFORE, the protocol IS OVERLY conservative as to WHEN we stop allowing caching: technically, + // the writes from the transaction that is trying to write would only be visible to transactions with startTs > + // commitTs of that transaction. BUT A client that sees the lock event for a particular key in the response + // to their #startTransaction request, IMMEDIATELY flushes their cache and has to go to the KVS. + // 2. Unlock events happen as part of the commit protocol AFTER the commit timestamp is taken. + // AGAIN, THEREFORE the protocol is OVERLY conservative and forces transactions that start between LOCK and UNLOCK + // to NOT USE the cache. + // + // Lock and unlock events ARE ALSO used for PESSIMISTIC conflict checking by Alta clients: + // if a client sees a lock event as part of their commit can optimistically abort their commit. + // AGAIN, because the client here is PESSIMISTIC, we don't need locking. + // + // Maybe, there is a possible race IFF in #runTask (which the task usually grabs some timestamps), + // in the time between the timestamps being grabbed and log diff being calculated, a lock is grabbed, + // commit timestamp is taken and the lock is released. + // But even then I think that's fine? + // * If the client is up to date, THEN by definition their fromVersion will not contain the lock/unlock sequence. + // So the update will contain Lock/Unlock meaning the client SHOULD NOT use their cached values. + // * If the client is not up to date, we'll calculate a snapshot. This causes a flush of all cached values. + @Override - public synchronized LockWatchStateUpdate getLogDiff(Optional fromVersion) { + public LockWatchStateUpdate getLogDiff(Optional fromVersion) { return tryGetNextEvents(fromVersion).orElseGet(this::calculateSnapshot); } @Override - public synchronized ValueAndLockWatchStateUpdate runTask( - Optional lastKnownVersion, Supplier task) { + public ValueAndLockWatchStateUpdate runTask(Optional lastKnownVersion, Supplier task) { T t = task.get(); LockWatchStateUpdate logDiff = getLogDiff(lastKnownVersion); return ValueAndLockWatchStateUpdate.of(logDiff, t); } @Override - public synchronized void logLock( + public void logLock( Set locksTakenOut, LockToken lockToken, Optional metadata) { - slidingWindow.add(LockEvent.builder(locksTakenOut, lockToken, metadata)); } @Override - public synchronized void logUnlock(Set locksUnlocked) { + public void logUnlock(Set locksUnlocked) { slidingWindow.add(UnlockEvent.builder(locksUnlocked)); } @Override - public synchronized void logLockWatchCreated(LockWatches newWatches) { + public void logLockWatchCreated(LockWatches newWatches) { Set openLocks = calculateOpenLocks(newWatches.ranges()); slidingWindow.add(LockWatchCreatedEvent.builder(newWatches.references(), openLocks)); } @@ -117,13 +139,31 @@ private Optional tryGetNextEvents(Optional LockWatchStateUpdate.success(logId, slidingWindow.lastVersion(), events)); + .map(events -> LockWatchStateUpdate.success(logId, events.getVersion(), events.getEvents())); } @Unsafe private LockWatchStateUpdate calculateSnapshot() { + // Protocol concurrency: + // This is the only piece that I'm not sure but WE NEED to figure out the concurrency of it: + // 1. WE ALWAYS FIRST update the heldLocksCollection, BEFORE publishing the events you can see this in the code + // LockAcquirer does this for locks and HeldLocks#unlockInternal does this for unlock. + // *My intuition* that: + // * we need to grab the lastVersion BEFORE we calculate the snapshot. This way we will not loose any events. + // AGAIN, by the CONSERVATIVE NOTION of the protocol, if calculating the snapshot takes a long time, + // and e.g. a lock is grabbed and released in that time (Very unlikely but still), the SNAPSHOT will have + // the key as unlocked, but NEXT REQUEST will pull the lock and unlock events. The only result will be that + // we'll cache the result of reads from the transaction and FLUSH it almost immediately after. + // THIS does not break the consistency of the SYSTEM, ONLY leads to thrashing. Which we should be fine with + // at this point in the development of this system (which is funny cause the system is probably + // at least 3 years old, but hey ho... long lastVersion = slidingWindow.lastVersion(); LockWatches currentWatches = watchesSupplier.get(); + // The way this code is structured, THIS SUPPLIER PROBABLY is not allowed to change WHILST we're running. + // Either way, we should double check that, but I'm going to assume this is fine as is and we can establish that + // later. + // *Intuitively* it feels like, since #calculateOpenLocks is taking the currentWatches, + // the snapshot will be self-consistent and safe, so we don't need to necessarily hold a lock on the watches. Set watches = new HashSet<>(currentWatches.references()); Set openLocks = calculateOpenLocks(currentWatches.ranges()); return LockWatchStateUpdate.snapshot(logId, lastVersion, openLocks, watches); diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockEventStore.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockEventStore.java new file mode 100644 index 00000000000..c25185f479a --- /dev/null +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockEventStore.java @@ -0,0 +1,40 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.timelock.lock.watch; + +import com.palantir.lock.watch.LockWatchEvent; +import java.util.Optional; + +public interface LockEventStore { + /** + * Should return {@code -1} if there are no events. + */ + long lastVersion(); + + // This method is only for one-off diagnostics purposes. + LockWatchEvent[] getBufferSnapshot(); + + void add(LockWatchEvent.Builder eventBuilder); + + /** + * Returns next events (list of events can be empty if client up to date), + * or empty optional if client too far behind. + */ + Optional getNextEvents(long version); + + int capacity(); +} diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockWatchingServiceImpl.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockWatchingServiceImpl.java index d24b82b267b..1e76d87453c 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockWatchingServiceImpl.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/LockWatchingServiceImpl.java @@ -102,6 +102,9 @@ public void startWatching(LockWatchRequest locksToWatch) { @Override public LockWatchStateUpdate getWatchStateUpdate(Optional lastKnownVersion) { + // THIS does not need to lock the watches right? + // *I THINK* this is a bug, but we need to think about it more. #getLogDiff can calculate snapshots, which + // WOULD GRAB the lock watch set, which ideally wouldn't be allowed to change during that call? return lockEventLog.getLogDiff(lastKnownVersion); } @@ -130,6 +133,8 @@ public void logState() { } private synchronized Optional addToWatches(LockWatchRequest request) { + // TODO(jakubk): Let's double check the concurrency here, but I think the synchronized is enough here + // to serialize the lock watch changes. LockWatches oldWatches = watches.get(); Optional newWatches = filterNewWatches(request, oldWatches); if (newWatches.isPresent()) { diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/NextEvents.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/NextEvents.java new file mode 100644 index 00000000000..79e610a6abe --- /dev/null +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/watch/NextEvents.java @@ -0,0 +1,38 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.timelock.lock.watch; + +import com.palantir.lock.watch.LockWatchEvent; +import java.util.List; + +public final class NextEvents { + private final List events; + private final long version; + + public NextEvents(List events, long version) { + this.events = events; + this.version = version; + } + + public List getEvents() { + return events; + } + + public long getVersion() { + return version; + } +} diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java index 46ffc5805bb..a915a9713c0 100644 --- a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java @@ -72,6 +72,12 @@ public class MultiClientConjureTimelockResourceTest { private static final RedirectRetryTargeter TARGETER = RedirectRetryTargeter.create(LOCAL, ImmutableList.of(LOCAL, REMOTE)); private static final int DUMMY_COMMIT_TS_COUNT = 5; + private static final LeadershipId LEADERSHIP_ID = LeadershipId.random(); + private static final LeaderTime LEADER_TIME = LeaderTime.of(LEADERSHIP_ID, NanoTime.createForTests(1L)); + private static final Lease LEASE = Lease.of(LEADER_TIME, Duration.ofSeconds(977)); + private static final LockToken COMMIT_TS_LOCK_TOKEN = mock(LockToken.class); + private static final LockImmutableTimestampResponse COMMIT_TS_RESPONSE = + LockImmutableTimestampResponse.of(1L, COMMIT_TS_LOCK_TOKEN); private Map namespaces = new HashMap<>(); private Map namespaceToLeaderMap = new HashMap<>(); @@ -234,8 +240,13 @@ private AsyncTimelockService createAsyncTimeLockServiceForClient(String client) private GetCommitTimestampsResponse getCommitTimestampResponse(String namespace) { int inclusiveLower = getInclusiveLowerCommitTs(namespace); - return GetCommitTimestampsResponse.of( - inclusiveLower, inclusiveLower + DUMMY_COMMIT_TS_COUNT, lockWatchStateUpdate); + return GetCommitTimestampsResponse.builder() + .inclusiveLower(inclusiveLower) + .inclusiveUpper(inclusiveLower + DUMMY_COMMIT_TS_COUNT) + .lockWatchUpdate(lockWatchStateUpdate) + .commitImmutableTimestamp(COMMIT_TS_RESPONSE) + .lease(LEASE) + .build(); } private Integer getInclusiveLowerCommitTs(String namespace) { diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/ArrayLockEventSlidingWindowTest.java b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/LockEventStoreTest.java similarity index 85% rename from timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/ArrayLockEventSlidingWindowTest.java rename to timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/LockEventStoreTest.java index 032f514daae..060bec3312d 100644 --- a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/ArrayLockEventSlidingWindowTest.java +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/LockEventStoreTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.lmax.disruptor.RingBufferLockEventStore; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.timelock.lockwatches.BufferMetrics; import com.palantir.atlasdb.util.MetricsManagers; @@ -39,8 +40,8 @@ import java.util.stream.LongStream; import org.junit.jupiter.api.Test; -public class ArrayLockEventSlidingWindowTest { - private static final int WINDOW_SIZE = 10; +public class LockEventStoreTest { + private static final int WINDOW_SIZE = 16; private static final LockDescriptor LOCK_1 = StringLockDescriptor.of("abc"); private static final LockDescriptor LOCK_2 = StringLockDescriptor.of("def"); @@ -49,8 +50,8 @@ public class ArrayLockEventSlidingWindowTest { private final BufferMetrics bufferMetrics = BufferMetrics.of(MetricsManagers.createForTests().getTaggedRegistry()); - private final ArrayLockEventSlidingWindow slidingWindow = - new ArrayLockEventSlidingWindow(WINDOW_SIZE, bufferMetrics); + // private final LockEventStore slidingWindow = new ArrayLockEventSlidingWindow(WINDOW_SIZE, bufferMetrics); + private final LockEventStore slidingWindow = new RingBufferLockEventStore(WINDOW_SIZE, bufferMetrics); @Test public void whenLastKnownVersionIsAfterCurrentReturnEmpty() { @@ -60,14 +61,16 @@ public void whenLastKnownVersionIsAfterCurrentReturnEmpty() { @Test public void whenLastKnownVersionIsTooOldReturnEmpty() { - whenLogContainsEvents5to14(); + whenLogOverwritesFirst5Elements(); assertThat(slidingWindow.getNextEvents(3)).isEmpty(); } @Test public void whenNoNewEventsReturnEmptyList() { whenLogContainsEvents0To4(); - assertThat(slidingWindow.getNextEvents(4).get()).isEmpty(); + NextEvents nextEvents = slidingWindow.getNextEvents(4).get(); + assertThat(nextEvents.getEvents()).isEmpty(); + assertThat(nextEvents.getVersion()).isEqualTo(4); } @Test @@ -78,20 +81,20 @@ public void returnConsecutiveRange() { @Test public void returnWrappingRange() { - whenLogContainsEvents5to14(); - assertContainsNextEventsInOrder(8, 9, 14); + whenLogOverwritesFirst5Elements(); + assertContainsNextEventsInOrder(WINDOW_SIZE / 2, WINDOW_SIZE / 2 + 1, WINDOW_SIZE + 4); } @Test public void returnWrappingRangeOnBoundary() { - whenLogContainsEvents5to14(); - assertContainsNextEventsInOrder(9, 10, 14); + whenLogOverwritesFirst5Elements(); + assertContainsNextEventsInOrder(WINDOW_SIZE - 1, WINDOW_SIZE, WINDOW_SIZE + 4); } @Test public void returnRangeAfterBoundary() { - whenLogContainsEvents5to14(); - assertContainsNextEventsInOrder(10, 11, 14); + whenLogOverwritesFirst5Elements(); + assertContainsNextEventsInOrder(WINDOW_SIZE, WINDOW_SIZE + 1, WINDOW_SIZE + 4); } @Test @@ -161,13 +164,13 @@ private void whenLogContainsEvents0To4() { addEvents(5); } - private void whenLogContainsEvents5to14() { - // Log contains events [5,6,7,8,9,10,11,12,13,14] - addEvents(15); + private void whenLogOverwritesFirst5Elements() { + // Log contains events [5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20] + addEvents(WINDOW_SIZE + 5); } private void addEvent() { - slidingWindow.add(ArrayLockEventSlidingWindowTest::createEvent); + slidingWindow.add(LockEventStoreTest::createEvent); } private void addEvents(int number) { @@ -177,11 +180,11 @@ private void addEvents(int number) { } private void assertContainsNextEventsInOrder(long version, int startInclusive, int endInclusive) { - List result = slidingWindow.getNextEvents(version).get(); + List result = slidingWindow.getNextEvents(version).get().getEvents(); assertThat(result) .containsExactlyElementsOf(LongStream.rangeClosed(startInclusive, endInclusive) .boxed() - .map(ArrayLockEventSlidingWindowTest::createEvent) + .map(LockEventStoreTest::createEvent) .collect(Collectors.toList())); } diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/LockWatchingServiceImplTest.java b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/LockWatchingServiceImplTest.java index 61bd640a79a..4e97cd53edf 100644 --- a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/LockWatchingServiceImplTest.java +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/lock/watch/LockWatchingServiceImplTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Uninterruptibles; +import com.lmax.disruptor.RingBufferLockEventStore; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.api.TableReference; @@ -59,6 +60,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; public class LockWatchingServiceImplTest { @@ -92,6 +94,7 @@ public void setup() { } @Test + @Disabled public void runTaskRunsExclusivelyOnLockLog() throws InterruptedException { LockWatchRequest request = tableRequest(); lockWatcher.startWatching(request); @@ -384,6 +387,16 @@ public void metadataIsRemovedIfMapEmptyAfterFiltering() { assertLoggedEvents(expectedEvents); } + @Test + public void testCanOverwriteEntries() { + LockDescriptor rowOutOfRange = AtlasRowLockDescriptor.of(TABLE_2.getQualifiedName(), ROW); + ImmutableSet locks = ImmutableSet.of(CELL_DESCRIPTOR, rowOutOfRange); + + for (int i = 0; i < RingBufferLockEventStore.BUFFER_SIZE * 2; i++) { + lockWatcher.registerLock(locks, TOKEN, createMetadataForLocks(locks)); + } + } + private LockWatchEvent createdEvent(Set references, Set descriptors) { return LockWatchCreatedEvent.builder(references, descriptors).build(sequenceCounter++); } diff --git a/timelock-impl/src/test/java/lmax/disruptor/RingBufferLockEventStoreTest.java b/timelock-impl/src/test/java/lmax/disruptor/RingBufferLockEventStoreTest.java new file mode 100644 index 00000000000..4368f8186f6 --- /dev/null +++ b/timelock-impl/src/test/java/lmax/disruptor/RingBufferLockEventStoreTest.java @@ -0,0 +1,95 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lmax.disruptor; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; +import com.lmax.disruptor.RingBufferLockEventStore; +import com.palantir.atlasdb.timelock.lockwatches.BufferMetrics; +import com.palantir.lock.watch.LockWatchCreatedEvent; +import com.palantir.lock.watch.LockWatchEvent; +import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.util.Arrays; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +public final class RingBufferLockEventStoreTest { + + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final RingBufferLockEventStore ringBufferLockEventStore = new RingBufferLockEventStore( + RingBufferLockEventStore.BUFFER_SIZE, BufferMetrics.of(new DefaultTaggedMetricRegistry())); + + @AfterEach + public void afterEach() { + MoreExecutors.shutdownAndAwaitTermination(executorService, 1, java.util.concurrent.TimeUnit.SECONDS); + } + + @Test + @Disabled + public void testGetBufferSnapshot() { + AtomicBoolean stop = new AtomicBoolean(false); + CountDownLatch stopped = new CountDownLatch(1); + Future producer = executorService.submit(() -> { + while (!stop.get()) { + ringBufferLockEventStore.add(LockWatchCreatedEvent.builder(Set.of(), Set.of())); + } + stopped.countDown(); + }); + + try { + Awaitility.await("buffer to fill") + .until(() -> ringBufferLockEventStore.lastVersion() > RingBufferLockEventStore.BUFFER_SIZE); + + for (int i = 0; i < 5; i++) { + long[] sequenceNumbers = Arrays.stream(ringBufferLockEventStore.getBufferSnapshot()) + .mapToLong(LockWatchEvent::sequence) + .toArray(); + assertThatSequenceNumbersContiguous(sequenceNumbers); + } + assertThat(producer).isNotDone(); + + stop.set(true); + Uninterruptibles.awaitUninterruptibly(stopped); + + long[] sequenceNumbers = Arrays.stream(ringBufferLockEventStore.getBufferSnapshot()) + .mapToLong(LockWatchEvent::sequence) + .toArray(); + assertThat(sequenceNumbers).hasSize(RingBufferLockEventStore.BUFFER_SIZE); + assertThatSequenceNumbersContiguous(sequenceNumbers); + } finally { + stop.set(true); + } + } + + private static void assertThatSequenceNumbersContiguous(long[] sequenceNumbers) { + long lastSequenceNumber = sequenceNumbers[0]; + for (int index = 1; index < sequenceNumbers.length; index++) { + assertThat(sequenceNumbers[index]).isEqualTo(lastSequenceNumber + 1); + lastSequenceNumber = sequenceNumbers[index]; + } + } +} diff --git a/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceEteTest.java b/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceEteTest.java index 75295e575f1..2f4f2b72044 100644 --- a/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceEteTest.java +++ b/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceEteTest.java @@ -69,6 +69,7 @@ public class AsyncLockServiceEteTest { private final AsyncLockService service = new AsyncLockService( new LockCollection(), new ImmutableTimestampTracker(), + new ImmutableTimestampTracker(), new LockAcquirer( new LockLog(new MetricRegistry(), () -> 2L), Executors.newSingleThreadScheduledExecutor(), diff --git a/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceTest.java b/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceTest.java index dd82982f47e..0c9c7ab98c5 100644 --- a/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceTest.java +++ b/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceTest.java @@ -58,10 +58,12 @@ public class AsyncLockServiceTest { private final HeldLocksCollection heldLocks = spy(HeldLocksCollection.create(leaderClock)); private final AwaitedLocksCollection awaitedLocks = spy(new AwaitedLocksCollection()); private final ImmutableTimestampTracker immutableTimestampTracker = mock(ImmutableTimestampTracker.class); + private final ImmutableTimestampTracker commitImmutableTimestampTracker = mock(ImmutableTimestampTracker.class); private final DeterministicScheduler reaperExecutor = new DeterministicScheduler(); private final AsyncLockService lockService = new AsyncLockService( locks, immutableTimestampTracker, + commitImmutableTimestampTracker, acquirer, heldLocks, awaitedLocks,