Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Immutable commit timestamp and delayed writes.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkozlowski committed Sep 6, 2024
1 parent bd78b57 commit 885eb3c
Show file tree
Hide file tree
Showing 67 changed files with 1,347 additions and 300 deletions.
28 changes: 28 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,34 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method void com.palantir.atlasdb.keyvalue.api.watch.NoOpLockWatchManager::removeTransactionStateFromCache(long)"
justification: "Internal API"
"0.1142.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.annotation.removed"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set<com.palantir.lock.v2.LockToken>===)"
justification: "Prototyping"
- code: "java.annotation.removed"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\
\ long)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set<com.palantir.lock.v2.LockToken>===,\
\ long)"
justification: "Prototyping"
- code: "java.method.addedToInterface"
new: "method long com.palantir.atlasdb.transaction.api.TransactionManager::getCommitImmutableTimestamp()"
justification: "Prototyping"
- code: "java.method.addedToInterface"
new: "method void com.palantir.atlasdb.transaction.api.Transaction::putDelayed(java.util.List<com.palantir.atlasdb.transaction.api.DelayedWrite>)"
justification: "Prototyping"
- code: "java.method.parameterTypeChanged"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set<com.palantir.lock.v2.LockToken>===)"
justification: "Prototyping"
- code: "java.method.parameterTypeChanged"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\
\ long)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set<com.palantir.lock.v2.LockToken>===,\
\ long)"
justification: "Prototyping"
"0.770.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.class.removed"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.transaction.api;

import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import java.util.Map;
import java.util.function.LongFunction;

/**
* Curious creature, but it allows the user to delay deciding the particular cell to write to until the actual commit,
* at which point the user will be provided with a unique long value that can be used to generate the cell.
*
* This is a very specific feature that not many people probably require, or should try to use.
*
* Right now the interface allows you to do TOO MUCH, you don't even have to specify the tables you write to.
* This can be improved and locked down, but this allows for maximum experimentation.
*/
@FunctionalInterface
public interface DelayedWrite extends LongFunction<Map<TableReference, Map<Cell, byte[]>>> {}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.palantir.lock.watch.ChangeMetadata;
import com.palantir.util.result.Result;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
Expand Down Expand Up @@ -323,6 +324,9 @@ Stream<BatchingVisitable<RowResult<byte[]>>> getRangesLazy(
@Idempotent
void putWithMetadata(TableReference tableRef, Map<Cell, ValueAndChangeMetadata> valuesAndMetadata);

@Idempotent
void putDelayed(List<DelayedWrite> values);

/**
* Deletes values from the key-value store.
* @param tableRef the table from which to delete the values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionRea
*/
long getImmutableTimestamp();

long getCommitImmutableTimestamp();

/**
* Returns the lock service used by this transaction manager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.palantir.atlasdb.transaction.api.TransactionFailedException;
import com.palantir.lock.v2.LockToken;
import java.util.Map;
import javax.annotation.Nullable;
import java.util.Set;

public interface PreCommitRequirementValidator {
/**
Expand All @@ -41,10 +41,10 @@ void throwIfPreCommitConditionInvalidAtCommitOnWriteTransaction(
* user pre-commit condition is no longer valid, or possibly because of other internal state such as commit
* locks having expired.
*/
void throwIfPreCommitRequirementsNotMet(@Nullable LockToken commitLocksToken, long timestamp);
void throwIfPreCommitRequirementsNotMet(Set<LockToken> commitLocksToken, long timestamp);

/**
* Throws a {@link TransactionFailedException} if the immutable timestamp lock or commit locks have expired.
*/
void throwIfImmutableTsOrCommitLocksExpired(@Nullable LockToken commitLocksToken);
void throwIfImmutableTsOrCommitLocksExpired(Set<LockToken> commitLocksToken);
}
1 change: 1 addition & 0 deletions atlasdb-autobatch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation 'com.palantir.conjure.java.api:errors'
implementation 'com.palantir.safe-logging:preconditions'
implementation 'com.palantir.tracing:tracing'
implementation 'com.palantir.tracing:tracing-api'
implementation 'io.dropwizard.metrics:metrics-core'
implementation project(':commons-executors')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.tracing.Observability;
import com.palantir.tracing.Tracer;
import com.palantir.tracing.Tracers;
import com.palantir.tracing.api.SpanType;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -97,7 +100,25 @@ public static <O> AutobatcherBuilder<SupplierKey, O> coalescing(Supplier<O> supp
*/
public static <I, O> AutobatcherBuilder<I, O> independent(Consumer<List<BatchElement<I, O>>> batchFunction) {
return new AutobatcherBuilder<>(parameters -> new IndependentBatchingEventHandler<>(
maybeWrapWithTimeout(batchFunction, parameters), parameters.batchSize()));
maybeWrapWithTimeout(
input -> {
long start = System.nanoTime();
try {
Tracer.initTraceWithSpan(
Observability.SAMPLE,
Tracers.randomId(),
parameters.safeLoggablePurpose(),
SpanType.LOCAL);
batchFunction.accept(input);
} finally {
log.info(
"Finished batch",
SafeArg.of("duration", Duration.ofNanos(System.nanoTime() - start)));
Tracer.fastCompleteSpan();
}
},
parameters),
parameters.batchSize()));
}

private static <I, O> Consumer<List<BatchElement<I, O>>> maybeWrapWithTimeout(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.palantir.atlasdb.keyvalue.api.RowResult;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.transaction.api.ConstraintCheckable;
import com.palantir.atlasdb.transaction.api.DelayedWrite;
import com.palantir.atlasdb.transaction.api.GetRangesQuery;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.TransactionFailedException;
Expand All @@ -37,6 +38,7 @@
import com.palantir.lock.watch.ChangeMetadata;
import com.palantir.util.result.Result;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
Expand Down Expand Up @@ -144,6 +146,11 @@ public void putWithMetadata(TableReference tableRef, Map<Cell, ValueAndChangeMet
delegate().putWithMetadata(tableRef, valuesAndMetadata);
}

@Override
public void putDelayed(List<DelayedWrite> values) {
delegate().putDelayed(values);
}

@Override
public void delete(TableReference tableRef, Set<Cell> keys) {
delegate().delete(tableRef, keys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,11 @@ private static LockAndTimestampServices createRawRemoteServices(
.lock(lockService)
.timestamp(timeService)
.timestampManagement(timestampManagementService)
.timelock(new LegacyTimelockService(timeService, lockService, TransactionManagers.LOCK_CLIENT))
.timelock(new LegacyTimelockService(
timeService,
lockService,
TransactionManagers.LOCK_CLIENT,
TransactionManagers.COMMIT_TIMESTAMP_LOCK_CLIENT))
.build();
}

Expand All @@ -463,7 +467,11 @@ private static LockAndTimestampServices createRawEmbeddedServices(
.lock(lockService)
.timestamp(timeService)
.timestampManagement(timestampManagementService)
.timelock(new LegacyTimelockService(timeService, lockService, TransactionManagers.LOCK_CLIENT))
.timelock(new LegacyTimelockService(
timeService,
lockService,
TransactionManagers.LOCK_CLIENT,
TransactionManagers.COMMIT_TIMESTAMP_LOCK_CLIENT))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
public abstract class TransactionManagers {
private static final SafeLogger log = SafeLoggerFactory.get(TransactionManagers.class);
public static final LockClient LOCK_CLIENT = LockClient.of("atlas instance");
public static final LockClient COMMIT_TIMESTAMP_LOCK_CLIENT = LockClient.of("commitTimestamp");

abstract AtlasDbConfig config();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.tracing.CloseableTracer;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -86,7 +87,9 @@ public void reset() {

@Override
public void createReadOnlyCache(StartTimestamp timestamp, CommitUpdate commitUpdate) {
cacheMap.computeIfPresent(timestamp, (_startTs, cache) -> cache.withReadOnlyCache(commitUpdate));
try (CloseableTracer tracer = CloseableTracer.startSpan("CacheStoreImpl#createReadOnlyCache")) {
cacheMap.computeIfPresent(timestamp, (_startTs, cache) -> cache.withReadOnlyCache(commitUpdate));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import com.palantir.atlasdb.keyvalue.api.AtlasLockDescriptorUtils;
import com.palantir.atlasdb.keyvalue.api.CellReference;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.conjure.java.client.config.ImmutablesStyle;
import com.palantir.lock.LockDescriptor;
import com.palantir.lock.watch.CommitUpdate;
import com.palantir.lock.watch.CommitUpdate.Visitor;
import com.palantir.logsafe.Unsafe;
import com.palantir.tracing.CloseableTracer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -41,7 +44,9 @@ private FilteringValueCacheSnapshot(ValueCacheSnapshot delegate, LockedCells loc
}

static ValueCacheSnapshot create(ValueCacheSnapshot delegate, CommitUpdate commitUpdate) {
return new FilteringValueCacheSnapshot(delegate, toLockedCells(commitUpdate));
try (CloseableTracer tracer = CloseableTracer.startSpan("FilteringValueCacheSnapshot#create")) {
return new FilteringValueCacheSnapshot(delegate, toLockedCells(commitUpdate));
}
}

@Override
Expand Down Expand Up @@ -79,20 +84,23 @@ public String toString() {
}

private static LockedCells toLockedCells(CommitUpdate commitUpdate) {
return commitUpdate.accept(new Visitor<LockedCells>() {
@Override
public LockedCells invalidateAll() {
return LockedCells.invalidateAll();
}

@Override
public LockedCells invalidateSome(Set<LockDescriptor> invalidatedLocks) {
return LockedCells.invalidateSome(invalidatedLocks);
}
});
try (CloseableTracer tracer = CloseableTracer.startSpan("FilteringValueCacheSnapshot#toLockedCells")) {
return commitUpdate.accept(new Visitor<LockedCells>() {
@Override
public LockedCells invalidateAll() {
return LockedCells.invalidateAll();
}

@Override
public LockedCells invalidateSome(Set<LockDescriptor> invalidatedLocks) {
return LockedCells.invalidateSome(invalidatedLocks);
}
});
}
}

@Value.Immutable
@ImmutablesStyle
interface LockedCells {
boolean allLocked();

Expand All @@ -103,13 +111,17 @@ static LockedCells invalidateAll() {
}

static LockedCells invalidateSome(Set<LockDescriptor> descriptors) {
return ImmutableLockedCells.builder()
.allLocked(false)
.lockedCells(descriptors.stream()
.map(AtlasLockDescriptorUtils::candidateCells)
.flatMap(List::stream)
.collect(Collectors.toSet()))
.build();
try (CloseableTracer tracer = CloseableTracer.startSpan(
"FilteringValueCacheSnapshot#invalidateSome",
Map.of("size", Integer.toString(descriptors.size())))) {
return ImmutableLockedCells.builder()
.allLocked(false)
.lockedCells(descriptors.stream()
.map(AtlasLockDescriptorUtils::candidateCells)
.flatMap(List::stream)
.collect(Collectors.toSet()))
.build();
}
}

default boolean isUnlocked(CellReference cellReference) {
Expand Down
Loading

0 comments on commit 885eb3c

Please sign in to comment.