From d07967775705ab0217c21e99f6a7aa7addfcab53 Mon Sep 17 00:00:00 2001 From: Jakub Kozlowski Date: Fri, 25 Oct 2024 12:57:50 +0100 Subject: [PATCH] [Named min timestamp leases] LegacyTimelockService. (#7397) Timestamp leases in LegacyTimelockService. --- changelog/@unreleased/pr-7397.v2.yml | 5 + .../lock/v2/TimestampLeaseResult.java | 4 + .../lock/v2/TimestampLeaseResults.java | 4 + lock-impl/build.gradle | 2 + .../lock/impl/LegacyTimelockService.java | 109 ++++++++---- .../LegacyTimelockServiceIntegrationTest.java | 156 ++++++++++++++++++ 6 files changed, 246 insertions(+), 34 deletions(-) create mode 100644 changelog/@unreleased/pr-7397.v2.yml create mode 100644 lock-impl/src/test/java/com/palantir/lock/impl/LegacyTimelockServiceIntegrationTest.java diff --git a/changelog/@unreleased/pr-7397.v2.yml b/changelog/@unreleased/pr-7397.v2.yml new file mode 100644 index 00000000000..936bd3db884 --- /dev/null +++ b/changelog/@unreleased/pr-7397.v2.yml @@ -0,0 +1,5 @@ +type: improvement +improvement: + description: Timestamp leases in LegacyTimelockService. + links: + - https://github.com/palantir/atlasdb/pull/7397 diff --git a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimestampLeaseResult.java b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimestampLeaseResult.java index 218df2779a3..072be7813c3 100644 --- a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimestampLeaseResult.java +++ b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimestampLeaseResult.java @@ -26,4 +26,8 @@ public interface TimestampLeaseResult { @Value.Parameter LongSupplier freshTimestampsSupplier(); + + static TimestampLeaseResult of(long minLeasedTimestamp, LongSupplier freshTimestampsSupplier) { + return ImmutableTimestampLeaseResult.of(minLeasedTimestamp, freshTimestampsSupplier); + } } diff --git a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimestampLeaseResults.java b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimestampLeaseResults.java index f14c78fc917..e9f31d870f9 100644 --- a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimestampLeaseResults.java +++ b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimestampLeaseResults.java @@ -27,4 +27,8 @@ public interface TimestampLeaseResults { @Value.Parameter Map results(); + + static TimestampLeaseResults of(LockToken lock, Map results) { + return ImmutableTimestampLeaseResults.of(lock, results); + } } diff --git a/lock-impl/build.gradle b/lock-impl/build.gradle index b677d0d072a..89152bd35b2 100644 --- a/lock-impl/build.gradle +++ b/lock-impl/build.gradle @@ -13,6 +13,7 @@ dependencies { implementation 'com.palantir.refreshable:refreshable' implementation 'com.palantir.safe-logging:preconditions' implementation 'com.palantir.safe-logging:safe-logging' + implementation 'one.util:streamex' implementation 'org.eclipse.collections:eclipse-collections' implementation 'org.eclipse.collections:eclipse-collections-api' implementation 'org.slf4j:slf4j-api' @@ -21,6 +22,7 @@ dependencies { implementation project(':lock-api-objects') implementation project(':timelock-api') implementation project(':timestamp-api') + implementation project(':timelock-api:timelock-api-objects') testImplementation 'com.fasterxml.jackson.core:jackson-databind' testImplementation 'com.google.guava:guava' diff --git a/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java b/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java index f65873a0114..6937769fe27 100644 --- a/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java +++ b/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java @@ -16,24 +16,30 @@ package com.palantir.lock.impl; import com.google.common.collect.ImmutableSortedMap; +import com.palantir.atlasdb.timelock.api.ConjureTimestampRange; import com.palantir.atlasdb.timelock.api.TimestampLeaseName; import com.palantir.common.base.Throwables; import com.palantir.lock.AtlasTimestampLockDescriptor; +import com.palantir.lock.ConjureTimestampRangeTimestampSupplier; +import com.palantir.lock.LimitingLongSupplier; import com.palantir.lock.LockClient; import com.palantir.lock.LockDescriptor; import com.palantir.lock.LockMode; import com.palantir.lock.LockRefreshToken; import com.palantir.lock.LockService; import com.palantir.lock.SimpleTimeDuration; +import com.palantir.lock.logger.ClientId; import com.palantir.lock.v2.ClientLockingOptions; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.v2.TimelockService; +import com.palantir.lock.v2.TimestampLeaseResult; import com.palantir.lock.v2.TimestampLeaseResults; import com.palantir.lock.v2.WaitForLocksRequest; import com.palantir.lock.v2.WaitForLocksResponse; +import com.palantir.logsafe.Preconditions; import com.palantir.timestamp.TimestampRange; import com.palantir.timestamp.TimestampService; import java.util.HashSet; @@ -42,13 +48,20 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.LongSupplier; import java.util.stream.Collectors; +import javax.annotation.Nullable; +import one.util.streamex.EntryStream; +import one.util.streamex.StreamEx; /** * A {@link TimelockService} implementation that delegates to a {@link LockService} and {@link TimestampService}. */ public class LegacyTimelockService implements TimelockService { + private static final ClientId TIMESTAMP_LEASES_CLIENT_ID = ClientId.of("timestamp-leases"); + private final TimestampService timestampService; private final LockService lockService; private final LockClient immutableTsLockClient; @@ -82,39 +95,13 @@ public TimestampRange getFreshTimestamps(int numTimestampsRequested) { @Override public LockImmutableTimestampResponse lockImmutableTimestamp() { - long immutableLockTs = timestampService.getFreshTimestamp(); - LockDescriptor lockDesc = AtlasTimestampLockDescriptor.of(immutableLockTs); - com.palantir.lock.LockRequest lockRequest = com.palantir.lock.LockRequest.builder( - ImmutableSortedMap.of(lockDesc, LockMode.READ)) - .withLockedInVersionId(immutableLockTs) - .build(); - LockRefreshToken lock; - - try { - lock = lockService.lock(immutableTsLockClient.getClientId(), lockRequest); - } catch (InterruptedException e) { - throw Throwables.throwUncheckedException(e); - } - - try { - return LockImmutableTimestampResponse.of( - getImmutableTimestampInternal(immutableLockTs), LockTokenConverter.toTokenV2(lock)); - } catch (Throwable e) { - if (lock != null) { - try { - lockService.unlock(lock); - } catch (Throwable unlockThrowable) { - e.addSuppressed(unlockThrowable); - } - } - throw Throwables.rewrapAndThrowUncheckedException(e); - } + return withLeaseFreshTimestamp(immutableTsLockClient.getClientId(), response -> response); } @Override public long getImmutableTimestamp() { long ts = timestampService.getFreshTimestamp(); - return getImmutableTimestampInternal(ts); + return getTimestampLeaseInternal(immutableTsLockClient.getClientId(), ts); } @Override @@ -186,18 +173,72 @@ public long currentTimeMillis() { @Override public TimestampLeaseResults acquireTimestampLeases(Map requests) { - // TODO(aalouane): implement! - throw new UnsupportedOperationException("Not implemented yet!"); + // This implementation here treats ALL the timestamp leases the same, meaning they will all progress + // at the same rate. This is only a perf concern (all timestamps progress together), not correctness. + // TODO(jakubk): This should reject timestamp lease names that conflict with the immutable timestamp lease + // or we should fix the implementation in timelock to allow these to conflict. + return withLeaseFreshTimestamp(TIMESTAMP_LEASES_CLIENT_ID.get(), response -> { + Map results = EntryStream.of(requests) + .mapValues(numFreshTimestamps -> { + TimestampRange timestampRange = getFreshTimestamps(numFreshTimestamps); + Preconditions.checkState( + timestampRange.size() >= numFreshTimestamps, "Didn't get enough timestamps"); + LongSupplier freshTimestamps = new LimitingLongSupplier( + new ConjureTimestampRangeTimestampSupplier(ConjureTimestampRange.of( + timestampRange.getLowerBound(), timestampRange.size())), + numFreshTimestamps); + return TimestampLeaseResult.of(response.getImmutableTimestamp(), freshTimestamps); + }) + .toMap(); + return TimestampLeaseResults.of(response.getLock(), results); + }); } @Override public Map getMinLeasedTimestamps(Set timestampNames) { - // TODO(aalouane): implement! - throw new UnsupportedOperationException("Not implemented yet!"); + long minLeasedTimestamp = + getTimestampLeaseInternal(TIMESTAMP_LEASES_CLIENT_ID.get(), timestampService.getFreshTimestamp()); + return StreamEx.of(timestampNames) + .mapToEntry(_ignored -> minLeasedTimestamp) + .toMap(); + } + + private R withLeaseFreshTimestamp(String timestampName, Function mapper) { + long immutableLockTs = timestampService.getFreshTimestamp(); + LockDescriptor lockDesc = AtlasTimestampLockDescriptor.of(immutableLockTs); + com.palantir.lock.LockRequest lockRequest = com.palantir.lock.LockRequest.builder( + ImmutableSortedMap.of(lockDesc, LockMode.READ)) + .withLockedInVersionId(immutableLockTs) + .build(); + LockRefreshToken lock; + + try { + lock = lockService.lock(timestampName, lockRequest); + } catch (InterruptedException e) { + throw Throwables.throwUncheckedException(e); + } + + try { + return mapper.apply(LockImmutableTimestampResponse.of( + getTimestampLeaseInternal(timestampName, immutableLockTs), LockTokenConverter.toTokenV2(lock))); + } catch (Throwable e) { + throw unlockAndRethrow(e, lock); + } + } + + private RuntimeException unlockAndRethrow(Throwable e, @Nullable LockRefreshToken lock) { + if (lock != null) { + try { + lockService.unlock(lock); + } catch (Throwable unlockThrowable) { + e.addSuppressed(unlockThrowable); + } + } + throw Throwables.rewrapAndThrowUncheckedException(e); } - private long getImmutableTimestampInternal(long ts) { - Long minLocked = lockService.getMinLockedInVersionId(immutableTsLockClient.getClientId()); + private long getTimestampLeaseInternal(String timestampName, long ts) { + Long minLocked = lockService.getMinLockedInVersionId(timestampName); return minLocked == null ? ts : minLocked; } diff --git a/lock-impl/src/test/java/com/palantir/lock/impl/LegacyTimelockServiceIntegrationTest.java b/lock-impl/src/test/java/com/palantir/lock/impl/LegacyTimelockServiceIntegrationTest.java new file mode 100644 index 00000000000..df793036d54 --- /dev/null +++ b/lock-impl/src/test/java/com/palantir/lock/impl/LegacyTimelockServiceIntegrationTest.java @@ -0,0 +1,156 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.lock.impl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.google.common.collect.Iterables; +import com.palantir.atlasdb.timelock.api.TimestampLeaseName; +import com.palantir.lock.LockClient; +import com.palantir.lock.annotations.ReviewedRestrictedApiUsage; +import com.palantir.lock.v2.LockToken; +import com.palantir.lock.v2.TimestampLeaseResult; +import com.palantir.lock.v2.TimestampLeaseResults; +import com.palantir.logsafe.Preconditions; +import com.palantir.timestamp.InMemoryTimestampService; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import one.util.streamex.StreamEx; +import org.assertj.core.api.MapAssert; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public final class LegacyTimelockServiceIntegrationTest { + + private static final LockClient LOCK_CLIENT = LockClient.of("foo"); + private static final TimestampLeaseName TIMESTAMP_LEASE_NAME1 = TimestampLeaseName.of("lease1"); + private static final TimestampLeaseName TIMESTAMP_LEASE_NAME2 = TimestampLeaseName.of("lease2"); + + private LockServiceImpl lockService; + private LegacyTimelockService timelock; + + @BeforeEach + public void beforeEach() { + lockService = LockServiceImpl.create(); + timelock = new LegacyTimelockService(new InMemoryTimestampService(), lockService, LOCK_CLIENT); + } + + @AfterEach + public void afterEach() { + lockService.close(); + } + + @Test + public void canAcquireAndReleaseMultipleLeases() { + TestLease lease1 = acquire(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2); + TestLease lease2 = acquire(TIMESTAMP_LEASE_NAME1); + TestLease lease3 = acquire(TIMESTAMP_LEASE_NAME2); + TestLease lease4 = acquire(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2); + assertThat(lease1.minLeasedTimestamp()) + .isEqualTo(lease2.minLeasedTimestamp()) + .isEqualTo(lease3.minLeasedTimestamp()) + .isEqualTo(lease4.minLeasedTimestamp()); + assertThatMinLeaseTimestamps(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2) + .containsEntry(TIMESTAMP_LEASE_NAME1, lease1.minLeasedTimestamp()) + .containsEntry(TIMESTAMP_LEASE_NAME2, lease1.minLeasedTimestamp()); + + // Release middle leases + lease2.unlock(); + assertThatMinLeaseTimestamps(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2) + .containsEntry(TIMESTAMP_LEASE_NAME1, lease1.minLeasedTimestamp()) + .containsEntry(TIMESTAMP_LEASE_NAME2, lease1.minLeasedTimestamp()); + lease3.unlock(); + assertThatMinLeaseTimestamps(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2) + .containsEntry(TIMESTAMP_LEASE_NAME1, lease1.minLeasedTimestamp()) + .containsEntry(TIMESTAMP_LEASE_NAME2, lease1.minLeasedTimestamp()); + + // Release oldest lease + lease1.unlock(); + + // TODO(jakubk): This is awkward because we don't have the leased timestamps in the response. + MapAssert timestampsAssert = + assertThatMinLeaseTimestamps(TIMESTAMP_LEASE_NAME1, TIMESTAMP_LEASE_NAME2); + timestampsAssert.extractingByKey(TIMESTAMP_LEASE_NAME1).satisfies(minLeaseTimestamp -> assertThat( + minLeaseTimestamp) + .isGreaterThan(lease4.minLeasedTimestamp())); + timestampsAssert.extractingByKey(TIMESTAMP_LEASE_NAME2).satisfies(minLeaseTimestamp -> assertThat( + minLeaseTimestamp) + .isGreaterThan(lease4.minLeasedTimestamp())); + } + + @ReviewedRestrictedApiUsage + private TestLease acquire(TimestampLeaseName... leases) { + Map request = StreamEx.of(leases) + .mapToEntry(_ignored -> ThreadLocalRandom.current().nextInt(1, 10)) + .toMap(); + TimestampLeaseResults timestampLeaseResults = timelock.acquireTimestampLeases(request); + return createTestLease(request, timestampLeaseResults); + } + + private MapAssert assertThatMinLeaseTimestamps(TimestampLeaseName... expectedKeys) { + return assertThat(getMinLeasedTimestamps(Set.of(expectedKeys))).containsOnlyKeys(Set.of(expectedKeys)); + } + + @ReviewedRestrictedApiUsage + private Map getMinLeasedTimestamps(Set timestampNames) { + return timelock.getMinLeasedTimestamps(timestampNames); + } + + private TestLease createTestLease( + Map request, TimestampLeaseResults timestampLeaseResults) { + assertThat(timestampLeaseResults.results()).containsOnlyKeys(request.keySet()); + request.forEach((leaseName, numTimestamps) -> assertThat(timestampLeaseResults.results()) + .extractingByKey(leaseName) + .satisfies(result -> assertThatSupplierHasExactly(result, numTimestamps))); + Set minLeasedTimestamps = timestampLeaseResults.results().values().stream() + .map(TimestampLeaseResult::minLeasedTimestamp) + .collect(Collectors.toSet()); + assertThat(minLeasedTimestamps).hasSize(1); + return new TestLease(Iterables.getOnlyElement(minLeasedTimestamps), timestampLeaseResults.lock()); + } + + private static void assertThatSupplierHasExactly(TimestampLeaseResult result, int minNumTimestamps) { + LongStream.range(0, minNumTimestamps) + .forEach(_ignore -> Preconditions.checkState( + result.freshTimestampsSupplier().getAsLong() > 0)); + assertThatThrownBy(result.freshTimestampsSupplier()::getAsLong).isInstanceOf(RuntimeException.class); + } + + private final class TestLease { + + private final long minLeasedTimestamp; + private final LockToken lock; + + private TestLease(long minLeasedTimestamp, LockToken lock) { + this.minLeasedTimestamp = minLeasedTimestamp; + this.lock = lock; + } + + public void unlock() { + timelock.unlock(Set.of(lock)); + } + + public long minLeasedTimestamp() { + return minLeasedTimestamp; + } + } +}