Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[Named min timestamp leases] LegacyTimelockService. (#7397)
Browse files Browse the repository at this point in the history
Timestamp leases in LegacyTimelockService.
  • Loading branch information
jkozlowski authored Oct 25, 2024
1 parent aff4bf3 commit d079677
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 34 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-7397.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Timestamp leases in LegacyTimelockService.
links:
- https://github.com/palantir/atlasdb/pull/7397
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ public interface TimestampLeaseResult {

@Value.Parameter
LongSupplier freshTimestampsSupplier();

static TimestampLeaseResult of(long minLeasedTimestamp, LongSupplier freshTimestampsSupplier) {
return ImmutableTimestampLeaseResult.of(minLeasedTimestamp, freshTimestampsSupplier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ public interface TimestampLeaseResults {

@Value.Parameter
Map<TimestampLeaseName, TimestampLeaseResult> results();

static TimestampLeaseResults of(LockToken lock, Map<TimestampLeaseName, TimestampLeaseResult> results) {
return ImmutableTimestampLeaseResults.of(lock, results);
}
}
2 changes: 2 additions & 0 deletions lock-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation 'com.palantir.refreshable:refreshable'
implementation 'com.palantir.safe-logging:preconditions'
implementation 'com.palantir.safe-logging:safe-logging'
implementation 'one.util:streamex'
implementation 'org.eclipse.collections:eclipse-collections'
implementation 'org.eclipse.collections:eclipse-collections-api'
implementation 'org.slf4j:slf4j-api'
Expand All @@ -21,6 +22,7 @@ dependencies {
implementation project(':lock-api-objects')
implementation project(':timelock-api')
implementation project(':timestamp-api')
implementation project(':timelock-api:timelock-api-objects')

testImplementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation 'com.google.guava:guava'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,30 @@
package com.palantir.lock.impl;

import com.google.common.collect.ImmutableSortedMap;
import com.palantir.atlasdb.timelock.api.ConjureTimestampRange;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.common.base.Throwables;
import com.palantir.lock.AtlasTimestampLockDescriptor;
import com.palantir.lock.ConjureTimestampRangeTimestampSupplier;
import com.palantir.lock.LimitingLongSupplier;
import com.palantir.lock.LockClient;
import com.palantir.lock.LockDescriptor;
import com.palantir.lock.LockMode;
import com.palantir.lock.LockRefreshToken;
import com.palantir.lock.LockService;
import com.palantir.lock.SimpleTimeDuration;
import com.palantir.lock.logger.ClientId;
import com.palantir.lock.v2.ClientLockingOptions;
import com.palantir.lock.v2.LockImmutableTimestampResponse;
import com.palantir.lock.v2.LockRequest;
import com.palantir.lock.v2.LockResponse;
import com.palantir.lock.v2.LockToken;
import com.palantir.lock.v2.TimelockService;
import com.palantir.lock.v2.TimestampLeaseResult;
import com.palantir.lock.v2.TimestampLeaseResults;
import com.palantir.lock.v2.WaitForLocksRequest;
import com.palantir.lock.v2.WaitForLocksResponse;
import com.palantir.logsafe.Preconditions;
import com.palantir.timestamp.TimestampRange;
import com.palantir.timestamp.TimestampService;
import java.util.HashSet;
Expand All @@ -42,13 +48,20 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import one.util.streamex.EntryStream;
import one.util.streamex.StreamEx;

/**
* A {@link TimelockService} implementation that delegates to a {@link LockService} and {@link TimestampService}.
*/
public class LegacyTimelockService implements TimelockService {

private static final ClientId TIMESTAMP_LEASES_CLIENT_ID = ClientId.of("timestamp-leases");

private final TimestampService timestampService;
private final LockService lockService;
private final LockClient immutableTsLockClient;
Expand Down Expand Up @@ -82,39 +95,13 @@ public TimestampRange getFreshTimestamps(int numTimestampsRequested) {

@Override
public LockImmutableTimestampResponse lockImmutableTimestamp() {
long immutableLockTs = timestampService.getFreshTimestamp();
LockDescriptor lockDesc = AtlasTimestampLockDescriptor.of(immutableLockTs);
com.palantir.lock.LockRequest lockRequest = com.palantir.lock.LockRequest.builder(
ImmutableSortedMap.of(lockDesc, LockMode.READ))
.withLockedInVersionId(immutableLockTs)
.build();
LockRefreshToken lock;

try {
lock = lockService.lock(immutableTsLockClient.getClientId(), lockRequest);
} catch (InterruptedException e) {
throw Throwables.throwUncheckedException(e);
}

try {
return LockImmutableTimestampResponse.of(
getImmutableTimestampInternal(immutableLockTs), LockTokenConverter.toTokenV2(lock));
} catch (Throwable e) {
if (lock != null) {
try {
lockService.unlock(lock);
} catch (Throwable unlockThrowable) {
e.addSuppressed(unlockThrowable);
}
}
throw Throwables.rewrapAndThrowUncheckedException(e);
}
return withLeaseFreshTimestamp(immutableTsLockClient.getClientId(), response -> response);
}

@Override
public long getImmutableTimestamp() {
long ts = timestampService.getFreshTimestamp();
return getImmutableTimestampInternal(ts);
return getTimestampLeaseInternal(immutableTsLockClient.getClientId(), ts);
}

@Override
Expand Down Expand Up @@ -186,18 +173,72 @@ public long currentTimeMillis() {

@Override
public TimestampLeaseResults acquireTimestampLeases(Map<TimestampLeaseName, Integer> requests) {
// TODO(aalouane): implement!
throw new UnsupportedOperationException("Not implemented yet!");
// This implementation here treats ALL the timestamp leases the same, meaning they will all progress
// at the same rate. This is only a perf concern (all timestamps progress together), not correctness.
// TODO(jakubk): This should reject timestamp lease names that conflict with the immutable timestamp lease
// or we should fix the implementation in timelock to allow these to conflict.
return withLeaseFreshTimestamp(TIMESTAMP_LEASES_CLIENT_ID.get(), response -> {
Map<TimestampLeaseName, TimestampLeaseResult> results = EntryStream.of(requests)
.mapValues(numFreshTimestamps -> {
TimestampRange timestampRange = getFreshTimestamps(numFreshTimestamps);
Preconditions.checkState(
timestampRange.size() >= numFreshTimestamps, "Didn't get enough timestamps");
LongSupplier freshTimestamps = new LimitingLongSupplier(
new ConjureTimestampRangeTimestampSupplier(ConjureTimestampRange.of(
timestampRange.getLowerBound(), timestampRange.size())),
numFreshTimestamps);
return TimestampLeaseResult.of(response.getImmutableTimestamp(), freshTimestamps);
})
.toMap();
return TimestampLeaseResults.of(response.getLock(), results);
});
}

@Override
public Map<TimestampLeaseName, Long> getMinLeasedTimestamps(Set<TimestampLeaseName> timestampNames) {
// TODO(aalouane): implement!
throw new UnsupportedOperationException("Not implemented yet!");
long minLeasedTimestamp =
getTimestampLeaseInternal(TIMESTAMP_LEASES_CLIENT_ID.get(), timestampService.getFreshTimestamp());
return StreamEx.of(timestampNames)
.mapToEntry(_ignored -> minLeasedTimestamp)
.toMap();
}

private <R> R withLeaseFreshTimestamp(String timestampName, Function<LockImmutableTimestampResponse, R> mapper) {
long immutableLockTs = timestampService.getFreshTimestamp();
LockDescriptor lockDesc = AtlasTimestampLockDescriptor.of(immutableLockTs);
com.palantir.lock.LockRequest lockRequest = com.palantir.lock.LockRequest.builder(
ImmutableSortedMap.of(lockDesc, LockMode.READ))
.withLockedInVersionId(immutableLockTs)
.build();
LockRefreshToken lock;

try {
lock = lockService.lock(timestampName, lockRequest);
} catch (InterruptedException e) {
throw Throwables.throwUncheckedException(e);
}

try {
return mapper.apply(LockImmutableTimestampResponse.of(
getTimestampLeaseInternal(timestampName, immutableLockTs), LockTokenConverter.toTokenV2(lock)));
} catch (Throwable e) {
throw unlockAndRethrow(e, lock);
}
}

private RuntimeException unlockAndRethrow(Throwable e, @Nullable LockRefreshToken lock) {
if (lock != null) {
try {
lockService.unlock(lock);
} catch (Throwable unlockThrowable) {
e.addSuppressed(unlockThrowable);
}
}
throw Throwables.rewrapAndThrowUncheckedException(e);
}

private long getImmutableTimestampInternal(long ts) {
Long minLocked = lockService.getMinLockedInVersionId(immutableTsLockClient.getClientId());
private long getTimestampLeaseInternal(String timestampName, long ts) {
Long minLocked = lockService.getMinLockedInVersionId(timestampName);
return minLocked == null ? ts : minLocked;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.lock.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.common.collect.Iterables;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.lock.LockClient;
import com.palantir.lock.annotations.ReviewedRestrictedApiUsage;
import com.palantir.lock.v2.LockToken;
import com.palantir.lock.v2.TimestampLeaseResult;
import com.palantir.lock.v2.TimestampLeaseResults;
import com.palantir.logsafe.Preconditions;
import com.palantir.timestamp.InMemoryTimestampService;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import one.util.streamex.StreamEx;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public final class LegacyTimelockServiceIntegrationTest {

private static final LockClient LOCK_CLIENT = LockClient.of("foo");
private static final TimestampLeaseName TIMESTAMP_LEASE_NAME1 = TimestampLeaseName.of("lease1");
private static final TimestampLeaseName TIMESTAMP_LEASE_NAME2 = TimestampLeaseName.of("lease2");

private LockServiceImpl lockService;
private LegacyTimelockService timelock;

@BeforeEach
public void beforeEach() {
lockService = LockServiceImpl.create();
timelock = new LegacyTimelockService(new InMemoryTimestampService(), lockService, LOCK_CLIENT);
}

@AfterEach
public void afterEach() {
lockService.close();
}

@Test
public void canAcquireAndReleaseMultipleLeases() {
TestLease lease1 = acquire(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2);
TestLease lease2 = acquire(TIMESTAMP_LEASE_NAME1);
TestLease lease3 = acquire(TIMESTAMP_LEASE_NAME2);
TestLease lease4 = acquire(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2);
assertThat(lease1.minLeasedTimestamp())
.isEqualTo(lease2.minLeasedTimestamp())
.isEqualTo(lease3.minLeasedTimestamp())
.isEqualTo(lease4.minLeasedTimestamp());
assertThatMinLeaseTimestamps(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2)
.containsEntry(TIMESTAMP_LEASE_NAME1, lease1.minLeasedTimestamp())
.containsEntry(TIMESTAMP_LEASE_NAME2, lease1.minLeasedTimestamp());

// Release middle leases
lease2.unlock();
assertThatMinLeaseTimestamps(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2)
.containsEntry(TIMESTAMP_LEASE_NAME1, lease1.minLeasedTimestamp())
.containsEntry(TIMESTAMP_LEASE_NAME2, lease1.minLeasedTimestamp());
lease3.unlock();
assertThatMinLeaseTimestamps(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2)
.containsEntry(TIMESTAMP_LEASE_NAME1, lease1.minLeasedTimestamp())
.containsEntry(TIMESTAMP_LEASE_NAME2, lease1.minLeasedTimestamp());

// Release oldest lease
lease1.unlock();

// TODO(jakubk): This is awkward because we don't have the leased timestamps in the response.
MapAssert<TimestampLeaseName, Long> timestampsAssert =
assertThatMinLeaseTimestamps(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2);
timestampsAssert.extractingByKey(TIMESTAMP_LEASE_NAME1).satisfies(minLeaseTimestamp -> assertThat(
minLeaseTimestamp)
.isGreaterThan(lease4.minLeasedTimestamp()));
timestampsAssert.extractingByKey(TIMESTAMP_LEASE_NAME2).satisfies(minLeaseTimestamp -> assertThat(
minLeaseTimestamp)
.isGreaterThan(lease4.minLeasedTimestamp()));
}

@ReviewedRestrictedApiUsage
private TestLease acquire(TimestampLeaseName... leases) {
Map<TimestampLeaseName, Integer> request = StreamEx.of(leases)
.mapToEntry(_ignored -> ThreadLocalRandom.current().nextInt(1, 10))
.toMap();
TimestampLeaseResults timestampLeaseResults = timelock.acquireTimestampLeases(request);
return createTestLease(request, timestampLeaseResults);
}

private MapAssert<TimestampLeaseName, Long> assertThatMinLeaseTimestamps(TimestampLeaseName... expectedKeys) {
return assertThat(getMinLeasedTimestamps(Set.of(expectedKeys))).containsOnlyKeys(Set.of(expectedKeys));
}

@ReviewedRestrictedApiUsage
private Map<TimestampLeaseName, Long> getMinLeasedTimestamps(Set<TimestampLeaseName> timestampNames) {
return timelock.getMinLeasedTimestamps(timestampNames);
}

private TestLease createTestLease(
Map<TimestampLeaseName, Integer> request, TimestampLeaseResults timestampLeaseResults) {
assertThat(timestampLeaseResults.results()).containsOnlyKeys(request.keySet());
request.forEach((leaseName, numTimestamps) -> assertThat(timestampLeaseResults.results())
.extractingByKey(leaseName)
.satisfies(result -> assertThatSupplierHasExactly(result, numTimestamps)));
Set<Long> minLeasedTimestamps = timestampLeaseResults.results().values().stream()
.map(TimestampLeaseResult::minLeasedTimestamp)
.collect(Collectors.toSet());
assertThat(minLeasedTimestamps).hasSize(1);
return new TestLease(Iterables.getOnlyElement(minLeasedTimestamps), timestampLeaseResults.lock());
}

private static void assertThatSupplierHasExactly(TimestampLeaseResult result, int minNumTimestamps) {
LongStream.range(0, minNumTimestamps)
.forEach(_ignore -> Preconditions.checkState(
result.freshTimestampsSupplier().getAsLong() > 0));
assertThatThrownBy(result.freshTimestampsSupplier()::getAsLong).isInstanceOf(RuntimeException.class);
}

private final class TestLease {

private final long minLeasedTimestamp;
private final LockToken lock;

private TestLease(long minLeasedTimestamp, LockToken lock) {
this.minLeasedTimestamp = minLeasedTimestamp;
this.lock = lock;
}

public void unlock() {
timelock.unlock(Set.of(lock));
}

public long minLeasedTimestamp() {
return minLeasedTimestamp;
}
}
}

0 comments on commit d079677

Please sign in to comment.