Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[Named min timestamp leases] Add timestamp lease acquirer (#7396)
Browse files Browse the repository at this point in the history
  • Loading branch information
ergo14 authored Oct 25, 2024
1 parent d2686a7 commit fcfe6de
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ public interface TimestampLeaseResult {
static TimestampLeaseResult of(long minLeasedTimestamp, LongSupplier freshTimestampsSupplier) {
return ImmutableTimestampLeaseResult.of(minLeasedTimestamp, freshTimestampsSupplier);
}

static ImmutableTimestampLeaseResult.Builder builder() {
return ImmutableTimestampLeaseResult.builder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public interface TimestampLeaseResults {
static TimestampLeaseResults of(LockToken lock, Map<TimestampLeaseName, TimestampLeaseResult> results) {
return ImmutableTimestampLeaseResults.of(lock, results);
}

static ImmutableTimestampLeaseResults.Builder builder() {
return ImmutableTimestampLeaseResults.builder();
}
}
1 change: 1 addition & 0 deletions lock-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
implementation 'javax.ws.rs:javax.ws.rs-api'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.github.rholder:guava-retrying'
implementation 'com.palantir.safe-logging:safe-logging'
implementation 'com.palantir.safe-logging:preconditions'
implementation 'com.palantir.refreshable:refreshable'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class LeasedLockToken implements LockToken {
@GuardedBy("this")
private boolean invalidated = false;

static LeasedLockToken of(ConjureLockToken serverToken, Lease lease) {
public static LeasedLockToken of(ConjureLockToken serverToken, Lease lease) {
return new LeasedLockToken(serverToken, UniqueIds.pseudoRandomUuidV4(), lease);
}

Expand All @@ -53,7 +53,7 @@ public ConjureLockToken serverToken() {
return serverToken;
}

synchronized Lease getLease() {
public synchronized Lease getLease() {
return lease;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,175 @@

package com.palantir.lock.client.timestampleases;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.palantir.atlasdb.timelock.api.ConjureLockToken;
import com.palantir.atlasdb.timelock.api.LeaseGuarantee;
import com.palantir.atlasdb.timelock.api.LeaseIdentifier;
import com.palantir.atlasdb.timelock.api.NamespaceTimestampLeaseRequest;
import com.palantir.atlasdb.timelock.api.RequestId;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.lock.client.TimeLockUnlocker;
import com.palantir.atlasdb.timelock.api.TimestampLeaseRequests;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses;
import com.palantir.common.exception.AtlasDbDependencyException;
import com.palantir.common.streams.KeyedStream;
import com.palantir.lock.ConjureTimestampRangeTimestampSupplier;
import com.palantir.lock.LimitingLongSupplier;
import com.palantir.lock.client.LeasedLockToken;
import com.palantir.lock.v2.TimestampLeaseResult;
import com.palantir.lock.v2.TimestampLeaseResults;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.tritium.ids.UniqueIds;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class TimestampLeaseAcquirerImpl implements TimestampLeaseAcquirer {
private static final SafeLogger log = SafeLoggerFactory.get(TimestampLeaseAcquirerImpl.class);

private final NamespacedTimestampLeaseService delegate;
private final TimeLockUnlocker unlocker;
private final Unlocker unlocker;
private final Supplier<UUID> uuidSupplier;

private final Retryer<Optional<TimestampLeaseResponses>> retryer =
RetryerBuilder.<Optional<TimestampLeaseResponses>>newBuilder()
.retryIfResult(Optional::isEmpty)
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();

public TimestampLeaseAcquirerImpl(NamespacedTimestampLeaseService delegate, TimeLockUnlocker unlocker) {
@VisibleForTesting
TimestampLeaseAcquirerImpl(
NamespacedTimestampLeaseService delegate, Unlocker unlocker, Supplier<UUID> uuidSupplier) {
this.delegate = delegate;
this.unlocker = unlocker;
this.uuidSupplier = uuidSupplier;
}

public TimestampLeaseAcquirerImpl(NamespacedTimestampLeaseService delegate, Unlocker unlocker) {
this(delegate, unlocker, UniqueIds::pseudoRandomUuidV4);
}

@Override
public TimestampLeaseResults acquireNamedTimestampLeases(Map<TimestampLeaseName, Integer> requests) {
// TODO(aalouane): implement
throw new UnsupportedOperationException("Not implemented yet");
TimestampLeaseResponses response = acquireLeasesWithRetry(requests);
try {
return TimestampLeaseResults.builder()
.lock(createLeasedLockToken(response))
.results(createTimestampLeaseResult(requests, response.getTimestampLeaseResponses()))
.build();
} catch (RuntimeException | Error e) {
log.error("Unexpected exception while creating client results", e);
unlock(response);
throw e;
}
}

@Override
public void close() {
// TODO(aalouane): decide whether or not to close the unlocker depending on ownership
}

private TimestampLeaseResponses acquireLeasesWithRetry(Map<TimestampLeaseName, Integer> requests) {
try {
return retryer.call(() -> acquireLeases(requests)).orElseThrow();
} catch (ExecutionException e) {
// should not happen with how the retryer is set up
log.warn("Unexpected exception. Expected a retry exception", e);
throw new SafeRuntimeException(e);
} catch (RetryException e) {
Attempt<?> lastFailedAttempt = e.getLastFailedAttempt();
log.warn(
"Exhausted retries to get enough timestamps while acquiring timestamp lease",
SafeArg.of("requests", requests),
SafeArg.of("numRetries", lastFailedAttempt.getAttemptNumber()),
SafeArg.of("attemptHadException", lastFailedAttempt.hasException()),
e);
throw new AtlasDbDependencyException(
"Exhausted retries to get enough timestamps while acquiring timestamp lease",
lastFailedAttempt.hasException() ? lastFailedAttempt.getExceptionCause() : null,
SafeArg.of("requests", requests),
SafeArg.of("numRetries", lastFailedAttempt.getAttemptNumber()),
SafeArg.of(
"maybeResult",
lastFailedAttempt.hasResult() ? lastFailedAttempt.getResult() : Optional.empty()));
}
}

private Optional<TimestampLeaseResponses> acquireLeases(Map<TimestampLeaseName, Integer> requestedFreshTimestamps) {
// we prefer to use a new request id for non-dialogue-native dialogue attempts
TimestampLeaseRequests request =
TimestampLeaseRequests.of(RequestId.of(uuidSupplier.get()), requestedFreshTimestamps);
NamespaceTimestampLeaseRequest requests = NamespaceTimestampLeaseRequest.of(List.of(request));

TimestampLeaseResponses response = Iterables.getOnlyElement(
delegate.acquireTimestampLeases(requests).get());
Map<TimestampLeaseName, TimestampLeaseResponse> responseMap = response.getTimestampLeaseResponses();

Preconditions.checkArgument(
requestedFreshTimestamps.keySet().equals(responseMap.keySet()),
"Response lease timestamps need to match request timestamp names exactly");

boolean wasFullyFulfilled = requestedFreshTimestamps.keySet().stream().allMatch(timestampName -> {
int requestedTimestamps = requestedFreshTimestamps.get(timestampName);
long returnedTimestamps =
responseMap.get(timestampName).getFreshTimestamps().getCount();
return returnedTimestamps >= requestedTimestamps;
});

if (!wasFullyFulfilled) {
unlock(response);
log.info(
"Timestamp lease request was not fully fulfilled. This should happen infrequently.",
SafeArg.of("requests", requests),
SafeArg.of("responses", response));
return Optional.empty();
}

return Optional.of(response);
}

private void unlock(TimestampLeaseResponses responses) {
unlocker.unlock(responses.getLeaseGuarantee().getIdentifier());
}

private static Map<TimestampLeaseName, TimestampLeaseResult> createTimestampLeaseResult(
Map<TimestampLeaseName, Integer> requestedTimestamps,
Map<TimestampLeaseName, TimestampLeaseResponse> responses) {
return KeyedStream.stream(responses)
.<TimestampLeaseResult>map((timestampName, response) -> {
int requestedForName = requestedTimestamps.get(timestampName);
LongSupplier freshTimestamps = new LimitingLongSupplier(
new ConjureTimestampRangeTimestampSupplier(response.getFreshTimestamps()),
requestedForName);
return TimestampLeaseResult.builder()
.minLeasedTimestamp(response.getMinLeased())
.freshTimestampsSupplier(freshTimestamps)
.build();
})
.collectToMap();
}

private static LeasedLockToken createLeasedLockToken(TimestampLeaseResponses responses) {
LeaseGuarantee leaseGuarantee = responses.getLeaseGuarantee();
return LeasedLockToken.of(
ConjureLockToken.of(leaseGuarantee.getIdentifier().get()), leaseGuarantee.getLease());
}

interface Unlocker {
void unlock(LeaseIdentifier leaseGuarantee);
}
}
Loading

0 comments on commit fcfe6de

Please sign in to comment.