Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Timestamp partial fill metric
Browse files Browse the repository at this point in the history
  • Loading branch information
ergo14 committed Oct 25, 2024
1 parent 218cad2 commit 2257446
Show file tree
Hide file tree
Showing 21 changed files with 161 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import com.palantir.lock.client.RequestBatchersFactory;
import com.palantir.lock.client.TimeLockClient;
import com.palantir.lock.client.TimestampCorroboratingTimelockService;
import com.palantir.lock.client.TimestampLeaseMetrics;
import com.palantir.lock.client.metrics.TimeLockFeedbackBackgroundTask;
import com.palantir.lock.client.timestampleases.MinLeasedTimestampGetter;
import com.palantir.lock.client.timestampleases.MinLeasedTimestampGetterImpl;
Expand Down Expand Up @@ -369,7 +370,8 @@ private static LockAndTimestampServices getLockAndTimestampServices(
namespacedConjureTimelockService,
multiClientTimelockServiceSupplier,
namespacedTimelockRpcClient,
timeLockHelperServices.requestBatchersFactory());
timeLockHelperServices.requestBatchersFactory(),
metricsManager);

TimestampManagementService timestampManagementService = new RemoteTimestampManagementAdapter(
serviceProvider.getTimestampManagementRpcClient(), timelockNamespace);
Expand All @@ -391,7 +393,8 @@ private static RemoteTimelockServiceAdapter createRemoteTimelockServiceAdapter(
NamespacedConjureTimelockService namespacedConjureTimelockService,
Supplier<InternalMultiClientConjureTimelockService> multiClientTimelockServiceSupplier,
NamespacedTimelockRpcClient namespacedTimelockRpcClient,
RequestBatchersFactory batchersFactory) {
RequestBatchersFactory batchersFactory,
MetricsManager metricsManager) {
LockTokenUnlocker unlocker = getTimeLockUnlocker(
timelockNamespace,
timelockRequestBatcherProviders,
Expand All @@ -401,8 +404,8 @@ private static RemoteTimelockServiceAdapter createRemoteTimelockServiceAdapter(
NamespacedTimestampLeaseService timestampLeaseService = new NamespacedTimestampLeaseServiceImpl(
Namespace.of(timelockNamespace), multiClientTimelockServiceSupplier.get());

TimestampLeaseAcquirer timestampLeaseAcquirer =
TimestampLeaseAcquirerImpl.create(timestampLeaseService, unlocker);
TimestampLeaseAcquirer timestampLeaseAcquirer = TimestampLeaseAcquirerImpl.create(
timestampLeaseService, unlocker, TimestampLeaseMetrics.of(metricsManager.getTaggedRegistry()));

MinLeasedTimestampGetter minLeasedTimestampGetter = new MinLeasedTimestampGetterImpl(timestampLeaseService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.palantir.lock.client.NamespacedConjureTimelockServiceImpl;
import com.palantir.lock.client.RemoteTimelockServiceAdapter;
import com.palantir.lock.client.RequestBatchersFactory;
import com.palantir.lock.client.TimestampLeaseMetrics;
import com.palantir.lock.client.TransactionStarter;
import com.palantir.lock.client.timestampleases.MinLeasedTimestampGetterImpl;
import com.palantir.lock.client.timestampleases.NamespacedTimestampLeaseService;
Expand Down Expand Up @@ -90,6 +91,7 @@ public abstract class AbstractInMemoryTimelockExtension implements TimeLockServi
private NamespacedConjureTimelockServiceImpl namespacedConjureTimelockService;
private NamespacedTimestampLeaseService timestampLeaseService;
private LockTokenUnlocker unlocker;
private final MetricsManager metricsManager = MetricsManagers.createForTests();

public AbstractInMemoryTimelockExtension() {
this("client");
Expand Down Expand Up @@ -129,8 +131,6 @@ public void setup() {
.clusterSnapshot(clusterConfig)
.build();

MetricsManager metricsManager = MetricsManagers.createForTests();

timeLockAgent = TimeLockAgent.create(
metricsManager,
install,
Expand All @@ -146,7 +146,7 @@ public void setup() {
() -> System.exit(0));

delegate = timeLockAgent.createInvalidatingTimeLockServices(client);
createHelperServices(metricsManager);
createHelperServices();

// Wait for leadership
Awaitility.await()
Expand All @@ -156,7 +156,7 @@ public void setup() {
.until(() -> delegate.getTimestampService().getFreshTimestamp() > 0);
}

private void createHelperServices(MetricsManager metricsManager) {
private void createHelperServices() {
helperServices = TimeLockHelperServices.create(
client,
metricsManager,
Expand Down Expand Up @@ -248,7 +248,8 @@ public TimelockService getLegacyTimelockService() {
lockLeaseService,
transactionStarter,
commitTimestampGetter,
TimestampLeaseAcquirerImpl.create(timestampLeaseService, unlocker),
TimestampLeaseAcquirerImpl.create(
timestampLeaseService, unlocker, TimestampLeaseMetrics.of(metricsManager.getTaggedRegistry())),
new MinLeasedTimestampGetterImpl(timestampLeaseService));
}

Expand Down
1 change: 1 addition & 0 deletions lock-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ apply plugin: 'com.palantir.metric-schema'
license {
exclude '**/TimestampCorrectnessMetrics.java'
exclude '**/TopologyMetrics.java'
exclude '**/TimestampLeaseMetrics.java'
}

libsDirName = file('build/artifacts')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.palantir.lock.client.timestampleases;

import com.codahale.metrics.Counter;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
Expand All @@ -39,6 +40,7 @@
import com.palantir.lock.LimitingLongSupplier;
import com.palantir.lock.client.LeasedLockToken;
import com.palantir.lock.client.LockTokenUnlocker;
import com.palantir.lock.client.TimestampLeaseMetrics;
import com.palantir.lock.v2.TimestampLeaseResult;
import com.palantir.lock.v2.TimestampLeaseResults;
import com.palantir.logsafe.Preconditions;
Expand All @@ -62,28 +64,38 @@ public final class TimestampLeaseAcquirerImpl implements TimestampLeaseAcquirer
private final NamespacedTimestampLeaseService delegate;
private final Unlocker unlocker;
private final Supplier<UUID> uuidSupplier;
private final Counter notEnoughFreshTimestampsCounter;

private final Retryer<Optional<TimestampLeaseResponses>> retryer =
RetryerBuilder.<Optional<TimestampLeaseResponses>>newBuilder()
.retryIfResult(Optional::isEmpty)
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();

@VisibleForTesting
TimestampLeaseAcquirerImpl(
NamespacedTimestampLeaseService delegate, Unlocker unlocker, Supplier<UUID> uuidSupplier) {
private TimestampLeaseAcquirerImpl(
NamespacedTimestampLeaseService delegate,
Unlocker unlocker,
Supplier<UUID> uuidSupplier,
Counter notEnoughFreshTimestampsCounter) {
this.delegate = delegate;
this.unlocker = unlocker;
this.uuidSupplier = uuidSupplier;
this.notEnoughFreshTimestampsCounter = notEnoughFreshTimestampsCounter;
}

private TimestampLeaseAcquirerImpl(NamespacedTimestampLeaseService delegate, Unlocker unlocker) {
this(delegate, unlocker, UniqueIds::pseudoRandomUuidV4);
@VisibleForTesting
TimestampLeaseAcquirerImpl(
NamespacedTimestampLeaseService delegate, Unlocker unlocker, Supplier<UUID> uuidSupplier) {
this(delegate, unlocker, uuidSupplier, new Counter());
}

public static TimestampLeaseAcquirer create(
NamespacedTimestampLeaseService timestampLeaseService, LockTokenUnlocker unlocker) {
return new TimestampLeaseAcquirerImpl(timestampLeaseService, identifier -> unlock(unlocker, identifier));
NamespacedTimestampLeaseService delegate, LockTokenUnlocker unlocker, TimestampLeaseMetrics metrics) {
return new TimestampLeaseAcquirerImpl(
delegate,
identifier -> unlock(unlocker, identifier),
UniqueIds::pseudoRandomUuidV4,
metrics.notEnoughFreshTimestamps());
}

@Override
Expand Down Expand Up @@ -146,23 +158,27 @@ private Optional<TimestampLeaseResponses> acquireLeases(Map<TimestampLeaseName,
requestedFreshTimestamps.keySet().equals(responseMap.keySet()),
"Response lease timestamps need to match request timestamp names exactly");

boolean wasFullyFulfilled = requestedFreshTimestamps.keySet().stream().allMatch(timestampName -> {
int requestedTimestamps = requestedFreshTimestamps.get(timestampName);
long returnedTimestamps =
responseMap.get(timestampName).getFreshTimestamps().getCount();
return returnedTimestamps >= requestedTimestamps;
});
long wereNotFullyFulfilled = requestedFreshTimestamps.keySet().stream()
.filter(timestampName -> {
int requestedTimestamps = requestedFreshTimestamps.get(timestampName);
long returnedTimestamps =
responseMap.get(timestampName).getFreshTimestamps().getCount();
return returnedTimestamps < requestedTimestamps;
})
.count();

if (wereNotFullyFulfilled > 0) {
notEnoughFreshTimestampsCounter.inc(wereNotFullyFulfilled);

if (!wasFullyFulfilled) {
unlock(response);
log.info(
"Timestamp lease request was not fully fulfilled. This should happen infrequently.",
SafeArg.of("requests", requests),
SafeArg.of("requests", request),
SafeArg.of("responses", response));
return Optional.empty();
} else {
return Optional.of(response);
}

return Optional.of(response);
}

private void unlock(TimestampLeaseResponses responses) {
Expand Down
6 changes: 6 additions & 0 deletions lock-api/src/main/metrics/metric-schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ namespaces:
type: timer
docs: observed call duration during leader election

timestampLease:
docs: Timestamp lease client side telemetry
metrics:
notEnoughFreshTimestamps:
type: counter
docs: Counts instances where TimeLock returns less than the requested number of timestamps in acquire timestamp lease calls

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.palantir.atlasdb.timelock.lockwatches.RequestMetrics;
import com.palantir.atlasdb.timelock.paxos.LeadershipComponents;
import com.palantir.atlasdb.timelock.paxos.LeadershipComponents.LeadershipProxies;
import com.palantir.atlasdb.timelock.timestampleases.TimestampLeaseMetrics;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.concurrent.PTExecutors;
Expand Down Expand Up @@ -96,7 +97,8 @@ private AsyncTimelockService createRawAsyncTimelockService(
maybeEnhancedLockLog,
reaperExecutor,
timeoutExecutor,
BufferMetrics.of(metricsManager.getTaggedRegistry())),
BufferMetrics.of(metricsManager.getTaggedRegistry()),
TimestampLeaseMetrics.of(metricsManager.getTaggedRegistry())),
timestampServiceSupplier.get(),
maybeEnhancedLockLog,
RequestMetrics.of(metricsManager.getTaggedRegistry()));
Expand Down
1 change: 1 addition & 0 deletions timelock-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,5 @@ dependencies {
license {
exclude '**/RequestMetrics.java'
exclude '**/BufferMetrics.java'
exclude '**/TimestampLeaseMetrics.java'
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.palantir.atlasdb.timelock.lock.watch.LockWatchingService;
import com.palantir.atlasdb.timelock.lock.watch.LockWatchingServiceImpl;
import com.palantir.atlasdb.timelock.lockwatches.BufferMetrics;
import com.palantir.atlasdb.timelock.timestampleases.TimestampLeaseMetrics;
import com.palantir.lock.LockDescriptor;
import com.palantir.lock.v2.LeaderTime;
import com.palantir.lock.v2.LockToken;
Expand Down Expand Up @@ -64,7 +65,8 @@ public static AsyncLockService createDefault(
LockLog lockLog,
ScheduledExecutorService reaperExecutor,
ScheduledExecutorService timeoutExecutor,
BufferMetrics bufferMetrics) {
BufferMetrics bufferMetrics,
TimestampLeaseMetrics timestampLeaseMetrics) {

LeaderClock clock = LeaderClock.create();

Expand All @@ -73,7 +75,7 @@ public static AsyncLockService createDefault(
LockAcquirer lockAcquirer = new LockAcquirer(lockLog, timeoutExecutor, clock, lockWatchingService);

return new AsyncLockService(
new LockManager(),
LockManager.create(timestampLeaseMetrics),
lockAcquirer,
heldLocks,
new AwaitedLocksCollection(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,24 @@
package com.palantir.atlasdb.timelock.lock;

import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.atlasdb.timelock.timestampleases.TimestampLeaseMetrics;
import com.palantir.lock.LockDescriptor;
import java.util.Optional;
import java.util.Set;

final class LockManager {
private final ExclusiveLockCollection exclusiveLocks = new ExclusiveLockCollection();
private final NamedMinTimestampLockCollection namedMinTimestampLockCollection =
new NamedMinTimestampLockCollection();
private final ExclusiveLockCollection exclusiveLocks;
private final NamedMinTimestampLockCollection namedMinTimestampLockCollection;

private LockManager(
ExclusiveLockCollection exclusiveLocks, NamedMinTimestampLockCollection namedMinTimestampLockCollection) {
this.exclusiveLocks = exclusiveLocks;
this.namedMinTimestampLockCollection = namedMinTimestampLockCollection;
}

static LockManager create(TimestampLeaseMetrics metrics) {
return new LockManager(new ExclusiveLockCollection(), NamedMinTimestampLockCollection.create(metrics));
}

OrderedLocks getAllExclusiveLocks(Set<LockDescriptor> descriptors) {
return exclusiveLocks.getAll(descriptors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.atlasdb.timelock.timestampleases.TimestampLeaseMetrics;
import java.util.Optional;

final class NamedMinTimestampLockCollection {
private final LoadingCache<String, NamedMinTimestampTracker> namedMinTimestampTrackers =
Caffeine.newBuilder().build(NamedMinTimestampTrackerImpl::new);
private final LoadingCache<String, NamedMinTimestampTracker> namedMinTimestampTrackers;

private NamedMinTimestampLockCollection(LoadingCache<String, NamedMinTimestampTracker> namedMinTimestampTrackers) {
this.namedMinTimestampTrackers = namedMinTimestampTrackers;
}

static NamedMinTimestampLockCollection create(TimestampLeaseMetrics metrics) {
LoadingCache<String, NamedMinTimestampTracker> namedMinTimestampTrackers =
Caffeine.newBuilder().build(name -> NamedMinTimestampTrackerImpl.create(name, metrics));
return new NamedMinTimestampLockCollection(namedMinTimestampTrackers);
}

AsyncLock getImmutableTimestampLock(long timestamp) {
return getNamedTimestampLockInternal(TimestampLeaseName.RESERVED_NAME_FOR_IMMUTABLE_TIMESTAMP, timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.palantir.atlasdb.timelock.lock;

import com.palantir.atlasdb.timelock.timestampleases.TimestampLeaseMetrics;
import com.palantir.lock.LockDescriptor;
import com.palantir.lock.StringLockDescriptor;
import com.palantir.logsafe.SafeArg;
Expand All @@ -24,16 +25,33 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;

final class NamedMinTimestampTrackerImpl implements NamedMinTimestampTracker {
private final String timestampName;
private final AtomicInteger numLocksHeld;
private final AtomicLong approximateMinLocked;

@GuardedBy("this")
private final SortedMap<Long, UUID> holdersByTimestamp = new TreeMap<>();

NamedMinTimestampTrackerImpl(String timestampName) {
private NamedMinTimestampTrackerImpl(
String timestampName, AtomicInteger numLocksHeld, AtomicLong approximateMinLocked) {
this.timestampName = timestampName;
this.numLocksHeld = numLocksHeld;
this.approximateMinLocked = approximateMinLocked;
}

static NamedMinTimestampTracker create(String timestampName, TimestampLeaseMetrics metrics) {
AtomicInteger numLocksHeld = new AtomicInteger();
metrics.locksHeld().name(timestampName).build(numLocksHeld::get);

AtomicLong approximateMinLeased = new AtomicLong();
metrics.approximateSmallestLeased().name(timestampName).build(approximateMinLeased::get);

return new NamedMinTimestampTrackerImpl(timestampName, numLocksHeld, approximateMinLeased);
}

@Override
Expand All @@ -46,6 +64,7 @@ public synchronized void lock(long timestamp, UUID requestId) {
SafeArg.of("requestId", requestId),
SafeArg.of("currentHolder", holdersByTimestamp.get(timestamp)));
}
numLocksHeld.incrementAndGet();
}

@Override
Expand All @@ -58,14 +77,17 @@ public synchronized void unlock(long timestamp, UUID requestId) {
SafeArg.of("requestId", requestId),
SafeArg.of("currentHolder", holdersByTimestamp.get(timestamp)));
}
numLocksHeld.decrementAndGet();
}

@Override
public synchronized Optional<Long> getMinimumTimestamp() {
if (holdersByTimestamp.isEmpty()) {
return Optional.empty();
}
return Optional.of(holdersByTimestamp.firstKey());
Long minimum = holdersByTimestamp.firstKey();
approximateMinLocked.set(minimum);
return Optional.of(minimum);
}

@Override
Expand Down
Loading

0 comments on commit 2257446

Please sign in to comment.