Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Add Lease Timestamps (#7305)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamuel-bs authored Oct 28, 2024
1 parent 119900c commit 9afedb1
Show file tree
Hide file tree
Showing 39 changed files with 376 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.transaction.api;

import com.google.common.annotations.Beta;
import com.google.errorprone.annotations.RestrictedApi;
import com.palantir.atlasdb.common.api.annotations.ReviewedRestrictedApiUsage;
import com.palantir.atlasdb.common.api.timelock.TimestampLeaseName;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

@Beta
public interface TimestampLeaseAwareTransaction {
/**
* Register pre-commit lambdas that are run on commit() just before the transaction is closed.
* <p>
* In the pre-commit lambdas, consumers can perform any actions they would inside a transaction.
* It is valid, for example, to read to or write from a table.
* <p>
* If {@code numLeasedTimestamps} is greater than 0, fresh timestamps will be fetched from {@link TransactionManager#getTimelockService()}
* and will be provided to the pre-commit lambda via the supplier on {@code Consumer<LongSupplier>}.
* <p>
* Clients can use {@link TimestampLeaseAwareTransactionManager#getLeasedTimestamp(TimestampLeaseName)}
* to fetch a timestamp before the earliest leased timestamp for a given {@code timestampLeaseName} on open
* transactions.
*
* @param leaseName the name of the lease the timestamps are bound to
* @param numLeasedTimestamps the number of timestamps that should be fetched and used in the pre-commit lambda
* @param preCommitAction the lambda executed just before commit. Note the consumer throws {@code RuntimeException}
* if a client requests more than specified in {@code numLeasedTimestamps}.
*/
@RestrictedApi(
explanation = "This API is only meant to be used by AtlasDb proxies that want to make use of the "
+ "performance improvements by tracking leased timestamps timestamps of open transactions."
+ " Misuse of this feature can cause correctness issues.",
link = "https://github.com/palantir/atlasdb/pull/7305",
allowedOnPath = ".*/src/test/.*", // Unsafe behavior in tests is ok.
allowlistAnnotations = {ReviewedRestrictedApiUsage.class})
void preCommit(TimestampLeaseName leaseName, int numLeasedTimestamps, Consumer<LongSupplier> preCommitAction);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.transaction.api;

import com.google.errorprone.annotations.RestrictedApi;
import com.palantir.atlasdb.common.api.annotations.ReviewedRestrictedApiUsage;
import com.palantir.atlasdb.common.api.timelock.TimestampLeaseName;
import com.palantir.lock.v2.TimelockService;
import java.util.function.Consumer;

public interface TimestampLeaseAwareTransactionManager {
/**
* Returns the timestamp that is before leased timestamps returned by the consumer on {@link TimestampLeaseAwareTransaction#preCommit(TimestampLeaseName, int, Consumer)}
* for a {@code timestampLeaseName} in open transactions.
* <p>
* This is similar to {@link TransactionManager#getImmutableTimestamp()} as it returns a timestamp before timestamps
* in open transactions, but for the immutable timestamp the timestamps considered are start timestamps for open
* transactions, while for leased timestamps the timestamps considered are leased timestamps from the corresponding
* {@code timestampLeaseName} in open transactions.
* <p>
* If no transactions with a {@code timestampLeaseName} lock are open, this method returns a new fresh timestamp
* (i.e. equivalent to {@link TimelockService#getFreshTimestamp()}).
* <p>
* Consumers should to fetch the leased timestamp outside of transactions that potentially use it - if fetching the
* leased timestamp inside a transaction, it's possible for the transaction's start timestamp < leased timestamp,
* meaning the transaction cannot read all data up to leased timestamp.
*
* @param leaseName the name of the lease the timestamps are bound to
* @return the timestamp that is before any timestamp returned by the consumer of {@link TimestampLeaseAwareTransaction#preCommit(TimestampLeaseName, int, Consumer)}
* for open transactions.
*/
@RestrictedApi(
explanation = "This API is only meant to be used by AtlasDb proxies that want to make use of the"
+ " performance improvements by tracking leased timestamps timestamps of open transactions."
+ " Misuse of this feature can cause correctness issues.",
link = "https://github.com/palantir/atlasdb/pull/7305",
allowedOnPath = ".*/src/test/.*", // Unsafe behavior in tests is ok.
allowlistAnnotations = {ReviewedRestrictedApiUsage.class})
long getLeasedTimestamp(TimestampLeaseName leaseName);
}
14 changes: 14 additions & 0 deletions atlasdb-commons-api/build.gradle
Original file line number Diff line number Diff line change
@@ -1 +1,15 @@
apply from: "../gradle/shared.gradle"

dependencies {
compileOnly 'org.immutables:value::annotations'
annotationProcessor 'org.immutables:value'

implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.google.errorprone:error_prone_annotations'
implementation 'com.palantir.safe-logging:preconditions'

testImplementation 'org.assertj:assertj-core'
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation 'com.palantir.safe-logging:preconditions-assertj'
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@
* limitations under the License.
*/

package com.palantir.atlasdb.timelock.api;
package com.palantir.atlasdb.common.api.timelock;

import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.Safe;
import com.palantir.logsafe.SafeArg;
import java.util.Comparator;
import org.immutables.value.Value;

@JsonDeserialize(as = ImmutableTimestampLeaseName.class)
@JsonSerialize(as = ImmutableTimestampLeaseName.class)
@Value.Immutable
@Safe
public interface TimestampLeaseName {
Comparator<TimestampLeaseName> COMPARATOR = Comparator.comparing(TimestampLeaseName::name);

Expand All @@ -43,7 +45,7 @@ default void check() {
SafeArg.of("name", name()));
}

static TimestampLeaseName of(String name) {
static TimestampLeaseName of(@Safe String name) {
return ImmutableTimestampLeaseName.builder().name(name).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,19 @@
* limitations under the License.
*/

package com.palantir.atlasdb.timelock.api;

import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy;
import static org.assertj.core.api.Assertions.assertThatCode;
package com.palantir.atlasdb.common.api.timelock;

import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.testing.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public final class TimestampLeaseNameTest {
@ValueSource(strings = {"ImmutableTimestamp", "ImmutableTimestampTest", "ImmutableTimestampX"})
@ParameterizedTest
public void throwsWhenProvidedNameStartsWithImmutableTimestamp(String name) {
assertThatLoggableExceptionThrownBy(() -> TimestampLeaseName.of(name))
Assertions.assertThatLoggableExceptionThrownBy(() -> TimestampLeaseName.of(name))
.isInstanceOf(SafeIllegalArgumentException.class)
.hasLogMessage("Name must not be a reserved name")
.hasExactlyArgs(SafeArg.of("name", name));
Expand All @@ -37,6 +35,6 @@ public void throwsWhenProvidedNameStartsWithImmutableTimestamp(String name) {
@ValueSource(strings = {"NotImmutableTimestamp", "CommitImmutableTimestamp", "Unrelated"})
@ParameterizedTest
public void doesNotThrowWhenProvidedNameDoesNotStartWithImmutableTimestamp(String name) {
assertThatCode(() -> TimestampLeaseName.of(name)).doesNotThrowAnyException();
Assertions.assertThatCode(() -> TimestampLeaseName.of(name)).doesNotThrowAnyException();
}
}
1 change: 0 additions & 1 deletion atlasdb-impl-shared/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ dependencies {
implementation project(":atlasdb-client")
implementation project(":atlasdb-commons")
implementation project(":atlasdb-coordination-impl")
implementation project(':timelock-api')
implementation project(":timestamp-api")

implementation 'com.github.ben-manes.caffeine:caffeine'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.codahale.metrics.Meter;
import com.palantir.atlasdb.AtlasDbMetricNames;
import com.palantir.atlasdb.common.api.annotations.ReviewedRestrictedApiUsage;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.atlasdb.common.api.timelock.TimestampLeaseName;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.lock.v2.ClientLockingOptions;
import com.palantir.lock.v2.LockImmutableTimestampResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.palantir.atlasdb.cell.api.DataKeyValueService;
import com.palantir.atlasdb.cleaner.api.Cleaner;
import com.palantir.atlasdb.common.api.annotations.ReviewedRestrictedApiUsage;
import com.palantir.atlasdb.common.api.timelock.TimestampLeaseName;
import com.palantir.atlasdb.debug.ConflictTracer;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.futures.AtlasFutures;
Expand Down Expand Up @@ -86,6 +87,7 @@
import com.palantir.atlasdb.transaction.api.GetRangesQuery;
import com.palantir.atlasdb.transaction.api.ImmutableGetRangesQuery;
import com.palantir.atlasdb.transaction.api.PreCommitCondition;
import com.palantir.atlasdb.transaction.api.TimestampLeaseAwareTransaction;
import com.palantir.atlasdb.transaction.api.TransactionCommitFailedException;
import com.palantir.atlasdb.transaction.api.TransactionConflictException;
import com.palantir.atlasdb.transaction.api.TransactionConflictException.CellConflict;
Expand Down Expand Up @@ -113,6 +115,7 @@
import com.palantir.atlasdb.transaction.api.snapshot.KeyValueSnapshotReaderManager;
import com.palantir.atlasdb.transaction.expectations.ExpectationsMetrics;
import com.palantir.atlasdb.transaction.impl.TransactionLocksManager.SummarizedLockCheckResult;
import com.palantir.atlasdb.transaction.impl.TransactionPreCommitActions.PerLeaseActions;
import com.palantir.atlasdb.transaction.impl.expectations.CellCountValidator;
import com.palantir.atlasdb.transaction.impl.expectations.TrackingDataKeyValueService;
import com.palantir.atlasdb.transaction.impl.expectations.TrackingDataKeyValueServiceImpl;
Expand Down Expand Up @@ -144,12 +147,15 @@
import com.palantir.common.streams.MoreStreams;
import com.palantir.lock.AtlasCellLockDescriptor;
import com.palantir.lock.AtlasRowLockDescriptor;
import com.palantir.lock.LimitingLongSupplier;
import com.palantir.lock.LockDescriptor;
import com.palantir.lock.v2.ClientLockingOptions;
import com.palantir.lock.v2.LockRequest;
import com.palantir.lock.v2.LockResponse;
import com.palantir.lock.v2.LockToken;
import com.palantir.lock.v2.TimelockService;
import com.palantir.lock.v2.TimestampLeaseResult;
import com.palantir.lock.v2.TimestampLeaseResults;
import com.palantir.lock.watch.ChangeMetadata;
import com.palantir.lock.watch.LockRequestMetadata;
import com.palantir.logsafe.Preconditions;
Expand Down Expand Up @@ -195,6 +201,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -223,7 +230,7 @@
* different rows and using range scans.
*/
public class SnapshotTransaction extends AbstractTransaction
implements ConstraintCheckingTransaction, CallbackAwareTransaction {
implements ConstraintCheckingTransaction, CallbackAwareTransaction, TimestampLeaseAwareTransaction {
private static final SafeLogger log = SafeLoggerFactory.get(SnapshotTransaction.class);
private static final SafeLogger perfLogger = SafeLoggerFactory.get("dualschema.perf");
private static final SafeLogger transactionLengthLogger = SafeLoggerFactory.get("txn.length");
Expand Down Expand Up @@ -306,6 +313,7 @@ private enum State {

protected final TransactionKnowledgeComponents knowledge;
private final TransactionLocksManager transactionLocksManager;
private final TransactionPreCommitActions preCommitActions;
private final PreCommitRequirementValidator preCommitRequirementValidator;
private final ReadSnapshotValidator readSnapshotValidator;
private final ThreadSafeCloser closer = new ThreadSafeCloser();
Expand Down Expand Up @@ -391,6 +399,7 @@ private enum State {
this.transactionOutcomeMetrics =
TransactionOutcomeMetrics.create(transactionMetrics, metricsManager.getTaggedRegistry());
this.expectationsDataCollectionMetrics = ExpectationsMetrics.of(metricsManager.getTaggedRegistry());
this.preCommitActions = new TransactionPreCommitActions();
this.transactionLocksManager = new TransactionLocksManager(
immutableTimestampLock, new DefaultLockValidityChecker(timelockService), timelockService::tryUnlock);
closer.register(transactionLocksManager);
Expand Down Expand Up @@ -1862,12 +1871,51 @@ public void commit() {
@Override
public void commit(TransactionService transactionService) {
try {
runPreCommitCallbacks();
commitWithoutCallbacks(transactionService);
} finally {
close();
}
}

@Override
public void preCommit(
TimestampLeaseName leaseName, int numLeasedTimestamps, Consumer<LongSupplier> preCommitAction) {
Preconditions.checkArgument(numLeasedTimestamps > 0, "Need to request a non-empty number of timestamp leases");
preCommitActions.addPreCommitAction(leaseName, numLeasedTimestamps, preCommitAction);
}

@ReviewedRestrictedApiUsage
private void runPreCommitCallbacks() {
Map<TimestampLeaseName, PerLeaseActions> actions = preCommitActions.getActions();
if (actions.isEmpty()) {
return;
}

TimestampLeaseResults timestampLeaseResults =
timelockService.acquireTimestampLeases(Maps.transformValues(actions, action -> action.timestampCount));
transactionLocksManager.registerLock(timestampLeaseResults.lock());

actions.forEach((timestampLeaseName, perLeaseActions) -> {
perLeaseActions.preCommitActions.forEach(preCommitAction -> {
LongSupplier leasedTimestamps;
if (preCommitAction.timestampCount == 0) {
leasedTimestamps = () -> {
throw new SafeRuntimeException(
"Cannot fetch leased timestamps since pre-commit action requested 0 leased timestamps",
SafeArg.of("leaseName", timestampLeaseName));
};
} else {
TimestampLeaseResult leaseResult =
timestampLeaseResults.results().get(timestampLeaseName);
leasedTimestamps = new LimitingLongSupplier(
leaseResult.freshTimestampsSupplier(), preCommitAction.timestampCount);
}
preCommitAction.action.accept(leasedTimestamps);
});
});
}

private void commitWithoutCallbacks(TransactionService transactionService) {
if (state.get() == State.COMMITTED) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.palantir.atlasdb.cell.api.DdlManager;
import com.palantir.atlasdb.cleaner.NoOpCleaner;
import com.palantir.atlasdb.cleaner.api.Cleaner;
import com.palantir.atlasdb.common.api.annotations.ReviewedRestrictedApiUsage;
import com.palantir.atlasdb.common.api.timelock.TimestampLeaseName;
import com.palantir.atlasdb.debug.ConflictTracer;
import com.palantir.atlasdb.keyvalue.api.ClusterAvailabilityStatus;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
Expand All @@ -42,6 +44,7 @@
import com.palantir.atlasdb.transaction.api.KeyValueServiceStatus;
import com.palantir.atlasdb.transaction.api.OpenTransaction;
import com.palantir.atlasdb.transaction.api.PreCommitCondition;
import com.palantir.atlasdb.transaction.api.TimestampLeaseAwareTransactionManager;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.Transaction.TransactionType;
import com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException;
Expand Down Expand Up @@ -72,6 +75,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
Expand All @@ -82,7 +86,8 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

/* package */ class SnapshotTransactionManager extends AbstractLockAwareTransactionManager {
/* package */ class SnapshotTransactionManager extends AbstractLockAwareTransactionManager
implements TimestampLeaseAwareTransactionManager {
private static final SafeLogger log = SafeLoggerFactory.get(SnapshotTransactionManager.class);

private static final int NUM_RETRIES = 10;
Expand Down Expand Up @@ -520,6 +525,12 @@ public long getUnreadableTimestamp() {
return cleaner.getUnreadableTimestamp();
}

@Override
@ReviewedRestrictedApiUsage
public long getLeasedTimestamp(TimestampLeaseName leaseName) {
return timelockService.getMinLeasedTimestamps(Set.of(leaseName)).get(leaseName);
}

@Override
public Cleaner getCleaner() {
return cleaner;
Expand Down
Loading

0 comments on commit 9afedb1

Please sign in to comment.