Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Immutable commit timestamp and delayed writes.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkozlowski committed Sep 5, 2024
1 parent bd78b57 commit 89b5644
Show file tree
Hide file tree
Showing 63 changed files with 1,270 additions and 252 deletions.
28 changes: 28 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,34 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method void com.palantir.atlasdb.keyvalue.api.watch.NoOpLockWatchManager::removeTransactionStateFromCache(long)"
justification: "Internal API"
"0.1142.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.annotation.removed"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set<com.palantir.lock.v2.LockToken>===)"
justification: "Prototyping"
- code: "java.annotation.removed"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\
\ long)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set<com.palantir.lock.v2.LockToken>===,\
\ long)"
justification: "Prototyping"
- code: "java.method.addedToInterface"
new: "method long com.palantir.atlasdb.transaction.api.TransactionManager::getCommitImmutableTimestamp()"
justification: "Prototyping"
- code: "java.method.addedToInterface"
new: "method void com.palantir.atlasdb.transaction.api.Transaction::putDelayed(java.util.List<com.palantir.atlasdb.transaction.api.DelayedWrite>)"
justification: "Prototyping"
- code: "java.method.parameterTypeChanged"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set<com.palantir.lock.v2.LockToken>===)"
justification: "Prototyping"
- code: "java.method.parameterTypeChanged"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\
\ long)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set<com.palantir.lock.v2.LockToken>===,\
\ long)"
justification: "Prototyping"
"0.770.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.class.removed"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.transaction.api;

import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import java.util.Map;
import java.util.function.LongFunction;

/**
* Curious creature, but it allows the user to delay deciding the particular cell to write to until the actual commit,
* at which point the user will be provided with a unique long value that can be used to generate the cell.
*
* This is a very specific feature that not many people probably require, or should try to use.
*
* Right now the interface allows you to do TOO MUCH, you don't even have to specify the tables you write to.
* This can be improved and locked down, but this allows for maximum experimentation.
*/
@FunctionalInterface
public interface DelayedWrite extends LongFunction<Map<TableReference, Map<Cell, byte[]>>> {}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.palantir.lock.watch.ChangeMetadata;
import com.palantir.util.result.Result;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
Expand Down Expand Up @@ -323,6 +324,9 @@ Stream<BatchingVisitable<RowResult<byte[]>>> getRangesLazy(
@Idempotent
void putWithMetadata(TableReference tableRef, Map<Cell, ValueAndChangeMetadata> valuesAndMetadata);

@Idempotent
void putDelayed(List<DelayedWrite> values);

/**
* Deletes values from the key-value store.
* @param tableRef the table from which to delete the values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionRea
*/
long getImmutableTimestamp();

long getCommitImmutableTimestamp();

/**
* Returns the lock service used by this transaction manager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.palantir.atlasdb.transaction.api.TransactionFailedException;
import com.palantir.lock.v2.LockToken;
import java.util.Map;
import javax.annotation.Nullable;
import java.util.Set;

public interface PreCommitRequirementValidator {
/**
Expand All @@ -41,10 +41,10 @@ void throwIfPreCommitConditionInvalidAtCommitOnWriteTransaction(
* user pre-commit condition is no longer valid, or possibly because of other internal state such as commit
* locks having expired.
*/
void throwIfPreCommitRequirementsNotMet(@Nullable LockToken commitLocksToken, long timestamp);
void throwIfPreCommitRequirementsNotMet(Set<LockToken> commitLocksToken, long timestamp);

/**
* Throws a {@link TransactionFailedException} if the immutable timestamp lock or commit locks have expired.
*/
void throwIfImmutableTsOrCommitLocksExpired(@Nullable LockToken commitLocksToken);
void throwIfImmutableTsOrCommitLocksExpired(Set<LockToken> commitLocksToken);
}
1 change: 1 addition & 0 deletions atlasdb-autobatch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation 'com.palantir.conjure.java.api:errors'
implementation 'com.palantir.safe-logging:preconditions'
implementation 'com.palantir.tracing:tracing'
implementation 'com.palantir.tracing:tracing-api'
implementation 'io.dropwizard.metrics:metrics-core'
implementation project(':commons-executors')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.tracing.Observability;
import com.palantir.tracing.Tracer;
import com.palantir.tracing.Tracers;
import com.palantir.tracing.api.SpanType;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -97,7 +100,25 @@ public static <O> AutobatcherBuilder<SupplierKey, O> coalescing(Supplier<O> supp
*/
public static <I, O> AutobatcherBuilder<I, O> independent(Consumer<List<BatchElement<I, O>>> batchFunction) {
return new AutobatcherBuilder<>(parameters -> new IndependentBatchingEventHandler<>(
maybeWrapWithTimeout(batchFunction, parameters), parameters.batchSize()));
maybeWrapWithTimeout(
input -> {
long start = System.nanoTime();
try {
Tracer.initTraceWithSpan(
Observability.SAMPLE,
Tracers.randomId(),
parameters.safeLoggablePurpose(),
SpanType.LOCAL);
batchFunction.accept(input);
} finally {
log.info(
"Finished batch",
SafeArg.of("duration", Duration.ofNanos(System.nanoTime() - start)));
Tracer.fastCompleteSpan();
}
},
parameters),
parameters.batchSize()));
}

private static <I, O> Consumer<List<BatchElement<I, O>>> maybeWrapWithTimeout(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.palantir.atlasdb.keyvalue.api.RowResult;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.transaction.api.ConstraintCheckable;
import com.palantir.atlasdb.transaction.api.DelayedWrite;
import com.palantir.atlasdb.transaction.api.GetRangesQuery;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.TransactionFailedException;
Expand All @@ -37,6 +38,7 @@
import com.palantir.lock.watch.ChangeMetadata;
import com.palantir.util.result.Result;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
Expand Down Expand Up @@ -144,6 +146,11 @@ public void putWithMetadata(TableReference tableRef, Map<Cell, ValueAndChangeMet
delegate().putWithMetadata(tableRef, valuesAndMetadata);
}

@Override
public void putDelayed(List<DelayedWrite> values) {
delegate().putDelayed(values);
}

@Override
public void delete(TableReference tableRef, Set<Cell> keys) {
delegate().delete(tableRef, keys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,11 @@ private static LockAndTimestampServices createRawRemoteServices(
.lock(lockService)
.timestamp(timeService)
.timestampManagement(timestampManagementService)
.timelock(new LegacyTimelockService(timeService, lockService, TransactionManagers.LOCK_CLIENT))
.timelock(new LegacyTimelockService(
timeService,
lockService,
TransactionManagers.LOCK_CLIENT,
TransactionManagers.COMMIT_TIMESTAMP_LOCK_CLIENT))
.build();
}

Expand All @@ -463,7 +467,11 @@ private static LockAndTimestampServices createRawEmbeddedServices(
.lock(lockService)
.timestamp(timeService)
.timestampManagement(timestampManagementService)
.timelock(new LegacyTimelockService(timeService, lockService, TransactionManagers.LOCK_CLIENT))
.timelock(new LegacyTimelockService(
timeService,
lockService,
TransactionManagers.LOCK_CLIENT,
TransactionManagers.COMMIT_TIMESTAMP_LOCK_CLIENT))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
public abstract class TransactionManagers {
private static final SafeLogger log = SafeLoggerFactory.get(TransactionManagers.class);
public static final LockClient LOCK_CLIENT = LockClient.of("atlas instance");
public static final LockClient COMMIT_TIMESTAMP_LOCK_CLIENT = LockClient.of("commitTimestamp");

abstract AtlasDbConfig config();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.tracing.CloseableTracer;
import com.palantir.util.RateLimitedLogger;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -112,21 +113,28 @@ public static LockWatchValueScopingCache create(

@Override
public synchronized void processStartTransactions(Set<Long> startTimestamps) {
TransactionsLockWatchUpdate updateForTransactions =
eventCache.getUpdateForTransactions(startTimestamps, currentVersion);
try (CloseableTracer _trace = CloseableTracer.startSpan("processStartTransactions")) {
TransactionsLockWatchUpdate updateForTransactions =
eventCache.getUpdateForTransactions(startTimestamps, currentVersion);

Optional<LockWatchVersion> latestVersionFromUpdate = computeMaxUpdateVersion(updateForTransactions);
Optional<LockWatchVersion> latestVersionFromUpdate = computeMaxUpdateVersion(updateForTransactions);

if (updateForTransactions.clearCache()) {
clearCache(updateForTransactions, latestVersionFromUpdate);
}
if (updateForTransactions.clearCache()) {
clearCache(updateForTransactions, latestVersionFromUpdate);
}

updateStores(updateForTransactions);
updateCurrentVersion(latestVersionFromUpdate);
try (CloseableTracer tracer = CloseableTracer.startSpan("LockWatchValueScopingCacheImpl#updateStores")) {
updateStores(updateForTransactions);
}
try (CloseableTracer tracer =
CloseableTracer.startSpan("LockWatchValueScopingCacheImpl#updateCurrentVersion")) {
updateCurrentVersion(latestVersionFromUpdate);
}
}
}

@Override
public synchronized void updateCacheWithCommitTimestampsInformation(Set<Long> startTimestamps) {
public void updateCacheWithCommitTimestampsInformation(Set<Long> startTimestamps) {
startTimestamps.forEach(this::processCommitUpdate);
}

Expand All @@ -139,8 +147,27 @@ public void requestStateRemoved(long startTs) {

@Override
public synchronized void ensureStateRemoved(long startTimestamp) {
ensureStateRemovedImpl(startTimestamp);
}

private synchronized void ensureStateRemoved(Set<Long> elements) {
try (CloseableTracer tracer = CloseableTracer.startSpan("ensureStateRemoved(Set<Long>)")) {
for (long startTimestamp : elements) {
ensureStateRemovedImpl(startTimestamp);
}
}
}

private synchronized void ensureStateRemovedImpl(long startTimestamp) {
// Not very concurrency safe, but should be fast, just removal from
// 2 maps. Once a second it runs some cleanup on internal datastructures,
// which will block, but should also be fairly quick.
// I think the requirement here is that sometimes we need to
// do computation on the whole datastrure, but we should figure out
// a way to not have to do that.
StartTimestamp startTs = StartTimestamp.of(startTimestamp);
snapshotStore.removeTimestamp(startTs);
// Concurrency safe
cacheStore.removeCache(startTs);
}

Expand Down Expand Up @@ -214,11 +241,16 @@ public TransactionScopedCache getReadOnlyTransactionScopedCacheForCommit(long st
return cacheStore.getReadOnlyCache(StartTimestamp.of(startTs));
}

private synchronized void processCommitUpdate(long startTimestamp) {
private void processCommitUpdate(long startTimestamp) {
StartTimestamp startTs = StartTimestamp.of(startTimestamp);
// Just a lookup to a map: threadsafe and doesn't really need to be synchronized.
TransactionScopedCache cache = cacheStore.getCache(startTs);
cache.finalise();

// This is already threadsafe and called directly from other places?
CommitUpdate commitUpdate = eventCache.getCommitUpdate(startTimestamp);

// Just #computeIfPresent on a threadsafe map
cacheStore.createReadOnlyCache(startTs, commitUpdate);
}

Expand Down Expand Up @@ -350,9 +382,7 @@ public void accept(Set<Long> elements) {
// TODO (jkong): Implement a nicer version of multi-removal.
// The current version for instance may, in between timestamps, spam the rate limiter or clean up snapshots
// multiple times when it could just do the check once at the end.
for (long element : elements) {
ensureStateRemoved(element);
}
ensureStateRemoved(elements);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.tracing.CloseableTracer;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -129,17 +130,19 @@ public CommitUpdate getEventUpdate(long startTs) {
@Override
public synchronized TransactionsLockWatchUpdate getUpdateForTransactions(
Set<Long> startTimestamps, Optional<LockWatchVersion> lastKnownVersion) {
Preconditions.checkArgument(!startTimestamps.isEmpty(), "Cannot get update for empty set of transactions");
TimestampMapping timestampMapping = getTimestampMappings(startTimestamps);

VersionBounds versionBounds = VersionBounds.builder()
.startVersion(lastKnownVersion)
.endVersion(timestampMapping.lastVersion())
.earliestSnapshotVersion(timestampMapping.versionRange().lowerEndpoint())
.build();

return eventLog.getEventsBetweenVersions(versionBounds)
.toTransactionsLockWatchUpdate(timestampMapping, lastKnownVersion);
try (CloseableTracer tracer = CloseableTracer.startSpan("LockWatchEventCacheImpl#getUpdateForTransactions")) {
Preconditions.checkArgument(!startTimestamps.isEmpty(), "Cannot get update for empty set of transactions");
TimestampMapping timestampMapping = getTimestampMappings(startTimestamps);

VersionBounds versionBounds = VersionBounds.builder()
.startVersion(lastKnownVersion)
.endVersion(timestampMapping.lastVersion())
.earliestSnapshotVersion(timestampMapping.versionRange().lowerEndpoint())
.build();

return eventLog.getEventsBetweenVersions(versionBounds)
.toTransactionsLockWatchUpdate(timestampMapping, lastKnownVersion);
}
}

@Override
Expand All @@ -165,13 +168,15 @@ synchronized LockWatchEventCacheState getStateForDiagnostics() {
}

private synchronized TimestampMapping getTimestampMappings(Set<Long> startTimestamps) {
ImmutableTimestampMapping.Builder mappingBuilder = TimestampMapping.builder();
startTimestamps.forEach(timestamp -> {
Optional<LockWatchVersion> entry = timestampStateStore.getStartVersion(timestamp);
assertTrue(entry.isPresent(), "start timestamp missing from map");
mappingBuilder.putTimestampMapping(timestamp, entry.get());
});
return mappingBuilder.build();
try (CloseableTracer _trace = CloseableTracer.startSpan("LockWatchEventCacheImpl#getTimestampMappings")) {
ImmutableTimestampMapping.Builder mappingBuilder = TimestampMapping.builder();
startTimestamps.forEach(timestamp -> {
Optional<LockWatchVersion> entry = timestampStateStore.getStartVersion(timestamp);
assertTrue(entry.isPresent(), "start timestamp missing from map");
mappingBuilder.putTimestampMapping(timestamp, entry.get());
});
return mappingBuilder.build();
}
}

private synchronized Optional<LockWatchVersion> processEventLogUpdate(LockWatchStateUpdate update) {
Expand Down
Loading

0 comments on commit 89b5644

Please sign in to comment.