From eff4f948f703ca0ab3a55d09bf64dfa48bf3358a Mon Sep 17 00:00:00 2001 From: Will Dey Date: Sat, 2 Mar 2024 22:47:05 -0500 Subject: [PATCH 01/10] @Output is unidiomatic and unnecessary in SnapshotTransaction --- .../cell/api/TransactionKeyValueService.java | 7 + .../atlasdb/futures/AtlasFutures.java | 67 +++ .../atlasdb/futures/FutureSpliterator.java | 76 +++ .../impl/GetRowsColumnRangeIterator.java | 3 +- .../transaction/impl/SnapshotTransaction.java | 457 ++++++++---------- 5 files changed, 353 insertions(+), 257 deletions(-) create mode 100644 atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/FutureSpliterator.java diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java index 72c139df4f8..3a2c2935ddb 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java @@ -18,6 +18,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.errorprone.annotations.MustBeClosed; +import com.palantir.atlasdb.futures.AtlasFutures; import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.api.ColumnRangeSelection; @@ -32,6 +33,7 @@ import com.palantir.processors.AutoDelegate; import com.palantir.util.paging.TokenBackedBasicResultsPage; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** * Key-Value API to be used with user data tables. @@ -67,6 +69,11 @@ RowColumnRangeIterator getRowsColumnRange( ListenableFuture> getAsync(TableReference tableRef, Map timestampByCell); + default CompletableFuture> getCompletableAsync( + TableReference tableRef, Map timestampByCell) { + return AtlasFutures.toCompletableFuture(getAsync(tableRef, timestampByCell)); + } + Map getLatestTimestamps(TableReference tableRef, Map timestampByCell); void multiPut(Map> valuesByTable, long timestamp) diff --git a/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/AtlasFutures.java b/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/AtlasFutures.java index 55a18f58a6f..660581a6f95 100644 --- a/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/AtlasFutures.java +++ b/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/AtlasFutures.java @@ -16,18 +16,24 @@ package com.palantir.atlasdb.futures; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.palantir.common.base.Throwables; import com.palantir.common.streams.KeyedStream; import com.palantir.tracing.DeferredTracer; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.jetbrains.annotations.NotNull; public final class AtlasFutures { private AtlasFutures() {} @@ -109,6 +115,67 @@ public static R getUnchecked(Future future) { } } + public static CompletableFuture toCompletableFuture(ListenableFuture listenableFuture) { + CompletableFuture completableFuture = new CompletableFuture() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean result = listenableFuture.cancel(mayInterruptIfRunning); + super.cancel(mayInterruptIfRunning); + return result; + } + }; + Futures.addCallback( + listenableFuture, + new FutureCallback() { + @Override + public void onSuccess(R result) { + completableFuture.complete(result); + } + + @Override + public void onFailure(Throwable t) { + completableFuture.completeExceptionally(t); + } + }, + MoreExecutors.directExecutor()); + return completableFuture; + } + + public static ListenableFuture toListenableFuture(CompletableFuture completableFuture) { + return new ListenableFuture() { + @Override + public void addListener(Runnable listener, Executor executor) { + completableFuture.thenRunAsync(listener, executor); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return completableFuture.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return completableFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return completableFuture.isDone(); + } + + @Override + public R get() throws InterruptedException, ExecutionException { + return completableFuture.get(); + } + + @Override + public R get(long timeout, @NotNull TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return completableFuture.get(timeout, unit); + } + }; + } + private static Executor traceRestoringExecutor(Executor executor, String operation) { DeferredTracer deferredTracer = new DeferredTracer(operation); return command -> executor.execute(() -> deferredTracer.withTrace(() -> { diff --git a/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/FutureSpliterator.java b/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/FutureSpliterator.java new file mode 100644 index 00000000000..49eb6a6f12a --- /dev/null +++ b/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/FutureSpliterator.java @@ -0,0 +1,76 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.futures; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Map; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +public class FutureSpliterator implements Spliterator { + private final Future> future; + + public FutureSpliterator(Future> future) { + this.future = future; + } + + @Override + public long estimateSize() { + if (future.isDone()) { + try { + return future.get().estimateSize(); + } catch (Exception e) { + // pass + } + } + return Long.MAX_VALUE; + } + + @Override + public int characteristics() { + return Spliterator.ORDERED; + } + + @Override + public boolean tryAdvance(Consumer consumer) { + return AtlasFutures.getUnchecked(future).tryAdvance(consumer); + } + + @Override + public void forEachRemaining(Consumer consumer) { + AtlasFutures.getUnchecked(future).forEachRemaining(consumer); + } + + @Override + public Spliterator trySplit() { + return null; + } + + public static FutureSpliterator> ofEntries(ListenableFuture> futureMap) { + return new FutureSpliterator<>( + Futures.transform(futureMap, map -> map.entrySet().spliterator(), MoreExecutors.directExecutor())); + } + + public static FutureSpliterator> ofEntries(CompletableFuture> futureMap) { + return new FutureSpliterator<>(futureMap.thenApply(Map::entrySet).thenApply(Set::spliterator)); + } +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/GetRowsColumnRangeIterator.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/GetRowsColumnRangeIterator.java index 70c80f012c5..08e190af66e 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/GetRowsColumnRangeIterator.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/GetRowsColumnRangeIterator.java @@ -27,7 +27,6 @@ import com.palantir.atlasdb.keyvalue.api.Value; import com.palantir.common.base.ClosableIterator; import com.palantir.common.base.ClosableIterators; -import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.Map; @@ -110,6 +109,6 @@ protected Iterator> computeNext() { @FunctionalInterface interface PostFilterer { - Collection> postFilter(Map rawResults); + Map postFilter(Map rawResults); } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index 05d7468fac0..668fd17fa7f 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -55,6 +55,7 @@ import com.palantir.atlasdb.debug.ConflictTracer; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.futures.AtlasFutures; +import com.palantir.atlasdb.futures.FutureSpliterator; import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.api.ColumnRangeSelection; @@ -164,21 +165,24 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; +import java.util.Spliterator; import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -186,10 +190,12 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nullable; +import one.util.streamex.EntryStream; +import one.util.streamex.MoreCollectors; +import one.util.streamex.StreamEx; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.collections.api.LongIterable; @@ -720,7 +726,7 @@ private Iterator> getRowColumnRangePostFiltered( return Collections.emptyIterator(); } SortedMap postFiltered = ImmutableSortedMap.copyOf( - getWithPostFilteringSync(tableRef, raw, x -> x), preserveInputRowOrder(batch)); + getWithPostFiltering(tableRef, raw, x -> x), preserveInputRowOrder(batch)); return postFiltered.entrySet().iterator(); })); } @@ -741,7 +747,7 @@ private ClosableIterator> getRowColumnRangePostFiltered( // we can't skip lock checks on range scans validatePreCommitRequirementsOnNonExhaustiveReadIfNecessary(tableRef, getStartTimestamp()); }, - raw -> getWithPostFilteringSync(tableRef, raw, Value.GET_VALUE)); + raw -> getWithPostFiltering(tableRef, raw, Value.GET_VALUE)); } private Iterator> getRowColumnRangePostFilteredWithoutSorting( @@ -756,7 +762,7 @@ private Iterator> getRowColumnRangePostFilteredWithoutSor } SortedMap postFiltered = - ImmutableSortedMap.copyOf(getWithPostFilteringSync(tableRef, raw, x -> x), cellComparator); + ImmutableSortedMap.copyOf(getWithPostFiltering(tableRef, raw, x -> x), cellComparator); return postFiltered.entrySet().iterator(); })); } @@ -881,7 +887,7 @@ public SortedMap> getRowsIgnoringLocalWrites( private NavigableMap> filterRowResults( TableReference tableRef, Map rawResults, ImmutableMap.Builder resultCollector) { ImmutableMap collected = resultCollector - .putAll(getWithPostFilteringSync(tableRef, rawResults, Value.GET_VALUE)) + .putAll(getWithPostFiltering(tableRef, rawResults, Value.GET_VALUE)) .buildOrThrow(); Map filterDeletedValues = removeEmptyColumns(collected, tableRef); return RowResults.viewOfSortedMap(Cells.breakCellsUpByRow(filterDeletedValues)); @@ -990,52 +996,50 @@ ListenableFuture> getInternal( } hasReads = true; - Map result = new HashMap<>(); - Map writes = localWriteBuffer.getLocalWrites().get(tableRef); - long numberOfNonDeleteLocalWrites = 0; - if (writes != null && !writes.isEmpty()) { - for (Cell cell : cells) { - byte[] value = writes.get(cell); - if (value != null) { - result.put(cell, value); - if (value != PtBytes.EMPTY_BYTE_ARRAY) { - numberOfNonDeleteLocalWrites++; - } - } - } - } + Map localWrites = Optional.ofNullable( + localWriteBuffer.getLocalWrites().get(tableRef)) + .map(writes -> + StreamEx.of(cells).mapToEntry(writes::get).nonNull().toImmutableMap()) + .orElseGet(ImmutableMap::of); + long numberOfNonDeleteLocalWrites = + StreamEx.ofValues(localWrites).count(Predicates.not(PtBytes.EMPTY_BYTE_ARRAY::equals)); // We don't need to read any cells that were written locally. long expectedNumberOfPresentCellsToFetch = numberOfExpectedPresentCells - numberOfNonDeleteLocalWrites; - return Futures.transform( - getFromKeyValueService( + return AtlasFutures.toListenableFuture(getFromKeyValueService( tableRef, - Sets.difference(cells, result.keySet()), + Sets.difference(cells, localWrites.keySet()), asyncKeyValueService, - asyncTransactionService), - fromKeyValueService -> { - result.putAll(fromKeyValueService); - - long getMillis = TimeUnit.NANOSECONDS.toMillis(timer.stop()); + asyncTransactionService) + .whenComplete((maybeFromKeyValueService, _ex) -> Optional.ofNullable(maybeFromKeyValueService) + .ifPresent(fromKeyValueService -> { + CellCountValidator.validateFetchedLessOrEqualToExpected( + expectedNumberOfPresentCellsToFetch, fromKeyValueService); + boolean allPossibleCellsReadAndPresent = + fromKeyValueService.size() == expectedNumberOfPresentCellsToFetch; + validatePreCommitRequirementsOnReadIfNecessary( + tableRef, getStartTimestamp(), allPossibleCellsReadAndPresent); + })) + .thenApply(fromKeyValueService -> ImmutableMap.builder() + .putAll(localWrites) + .putAll(fromKeyValueService) + .buildKeepingLast()) + .whenComplete((result, _e) -> { if (perfLogger.isDebugEnabled()) { perfLogger.debug( "Snapshot transaction get cells (some possibly deleted)", LoggingArgs.tableRef(tableRef), SafeArg.of("numberOfCells", cells.size()), - SafeArg.of("numberOfCellsRetrieved", result.size()), + SafeArg.of( + "numberOfCellsRetrieved", + Optional.ofNullable(result) + .map(Map::size) + .orElse(0)), SafeArg.of("getOperation", operationName), - SafeArg.of("durationMillis", getMillis)); + SafeArg.of("durationMillis", TimeUnit.NANOSECONDS.toMillis(timer.stop()))); } - - CellCountValidator.validateFetchedLessOrEqualToExpected( - expectedNumberOfPresentCellsToFetch, fromKeyValueService); - boolean allPossibleCellsReadAndPresent = - fromKeyValueService.size() == expectedNumberOfPresentCellsToFetch; - validatePreCommitRequirementsOnReadIfNecessary( - tableRef, getStartTimestamp(), allPossibleCellsReadAndPresent); - return removeEmptyColumns(result, tableRef); - }, - MoreExecutors.directExecutor()); + }) + .thenApply(result -> removeEmptyColumns(result, tableRef))); } @Override @@ -1046,7 +1050,7 @@ public Map getIgnoringLocalWrites(TableReference tableRef, Set> result = getFromKeyValueService( + Future> result = getFromKeyValueService( tableRef, cells, immediateTransactionKeyValueService, immediateTransactionService); Map unfiltered = Futures.getUnchecked(result); @@ -1065,19 +1069,17 @@ public Map getIgnoringLocalWrites(TableReference tableRef, Set> getFromKeyValueService( + private CompletableFuture> getFromKeyValueService( TableReference tableRef, Set cells, TransactionKeyValueService asyncKeyValueService, AsyncTransactionService asyncTransactionService) { Map toRead = Cells.constantValueMap(cells, getStartTimestamp()); - ListenableFuture>> postFilteredResults = Futures.transformAsync( - asyncKeyValueService.getAsync(tableRef, toRead), - rawResults -> getWithPostFilteringAsync( - tableRef, rawResults, Value.GET_VALUE, asyncKeyValueService, asyncTransactionService), - MoreExecutors.directExecutor()); - - return Futures.transform(postFilteredResults, ImmutableMap::copyOf, MoreExecutors.directExecutor()); + return asyncKeyValueService + .getCompletableAsync(tableRef, toRead) + .thenApply(rawResults -> getWithPostFiltering( + tableRef, rawResults, Value.GET_VALUE, asyncKeyValueService, asyncTransactionService)) + .thenApply(ImmutableMap::copyOf); } private static byte[] getNextStartRowName( @@ -1506,7 +1508,7 @@ private SortedMap postFilterRows( } } - return ImmutableSortedMap.copyOf(getWithPostFilteringSync(tableRef, rawResults, transformer)); + return ImmutableSortedMap.copyOf(getWithPostFiltering(tableRef, rawResults, transformer)); } private int estimateSize(List> rangeRows) { @@ -1517,22 +1519,24 @@ private int estimateSize(List> rangeRows) { return estimatedSize; } - private Collection> getWithPostFilteringSync( + private Map getWithPostFiltering( TableReference tableRef, Map rawResults, Function transformer) { - return AtlasFutures.getUnchecked(getWithPostFilteringAsync( - tableRef, rawResults, transformer, immediateTransactionKeyValueService, immediateTransactionService)); + return getWithPostFiltering( + tableRef, rawResults, transformer, immediateTransactionKeyValueService, immediateTransactionService); } - private ListenableFuture>> getWithPostFilteringAsync( + private Map getWithPostFiltering( TableReference tableRef, Map rawResults, Function transformer, TransactionKeyValueService asyncKeyValueService, AsyncTransactionService asyncTransactionService) { - long bytes = 0; - for (Map.Entry entry : rawResults.entrySet()) { - bytes += entry.getValue().getContents().length + Cells.getApproxSizeOfCell(entry.getKey()); - } + long bytes = EntryStream.of(rawResults) + .mapKeys(Cells::getApproxSizeOfCell) + .mapValues(value -> value.getContents().length) + .mapKeyValue(Long::sum) + .mapToLong(Long::longValue) + .sum(); if (bytes > TransactionConstants.WARN_LEVEL_FOR_QUEUED_BYTES && log.isWarnEnabled()) { log.warn( "A single get had quite a few bytes: {} for table {}. The number of results was {}. " @@ -1552,67 +1556,21 @@ private ListenableFuture>> getWithPostFilterin getCounter(AtlasDbMetricNames.SNAPSHOT_TRANSACTION_CELLS_READ, tableRef).inc(rawResults.size()); - Collection> resultsAccumulator = new ArrayList<>(); - if (AtlasDbConstants.HIDDEN_TABLES.contains(tableRef)) { Preconditions.checkState(allowHiddenTableAccess, "hidden tables cannot be read in this transaction"); // hidden tables are used outside of the transaction protocol, and in general have invalid timestamps, // so do not apply post-filtering as post-filtering would rollback (actually delete) the data incorrectly // this case is hit when reading a hidden table from console - for (Map.Entry e : rawResults.entrySet()) { - resultsAccumulator.add(Maps.immutableEntry(e.getKey(), transformer.apply(e.getValue()))); - } - return Futures.immediateFuture(resultsAccumulator); + return EntryStream.of(rawResults).mapValues(transformer).toImmutableMap(); } - return Futures.transformAsync( - Futures.immediateFuture(rawResults), - resultsToPostFilter -> getWithPostFilteringIterate( - tableRef, - resultsToPostFilter, - resultsAccumulator, - transformer, - asyncKeyValueService, - asyncTransactionService), - MoreExecutors.directExecutor()); - } - - private ListenableFuture>> getWithPostFilteringIterate( - TableReference tableReference, - Map resultsToPostFilter, - Collection> resultsAccumulator, - Function transformer, - TransactionKeyValueService asyncKeyValueService, - AsyncTransactionService asyncTransactionService) { - return Futures.transformAsync( - Futures.immediateFuture(resultsToPostFilter), - results -> { - int iterations = 0; - Map remainingResultsToPostFilter = results; - while (!remainingResultsToPostFilter.isEmpty()) { - remainingResultsToPostFilter = AtlasFutures.getUnchecked(getWithPostFilteringInternal( - tableReference, - remainingResultsToPostFilter, - resultsAccumulator, - transformer, - asyncKeyValueService, - asyncTransactionService)); - Preconditions.checkState( - ++iterations < MAX_POST_FILTERING_ITERATIONS, - "Unable to filter cells to find correct result after " - + "reaching max iterations. This is likely due to aborted cells lying around," - + " or in the very rare case, could be due to transactions which constantly " - + "conflict but never commit. These values will be cleaned up eventually, but" - + " if the issue persists, ensure that sweep is caught up.", - SafeArg.of("table", tableReference), - SafeArg.of("maxIterations", MAX_POST_FILTERING_ITERATIONS)); - } - getCounter(AtlasDbMetricNames.SNAPSHOT_TRANSACTION_CELLS_RETURNED, tableReference) - .inc(resultsAccumulator.size()); - - return Futures.immediateFuture(resultsAccumulator); - }, - MoreExecutors.directExecutor()); + Map postFiltered = getWithPostFilteringInternal( + tableRef, rawResults, asyncKeyValueService, asyncTransactionService, 1) + .mapValues(transformer) + .toImmutableMap(); + getCounter(AtlasDbMetricNames.SNAPSHOT_TRANSACTION_CELLS_RETURNED, tableRef) + .inc(postFiltered.size()); + return postFiltered; } /** @@ -1620,52 +1578,37 @@ private ListenableFuture>> getWithPostFilterin * it was truncated. In this case, there is a chance that we end up with a sentinel with no valid AtlasDB cell * covering it. In this case, we ignore it. */ - private Set findOrphanedSweepSentinels(TableReference table, Map rawResults) { - Set sweepSentinels = Maps.filterValues(rawResults, SnapshotTransaction::isSweepSentinel) - .keySet(); + private Set findOrphanedSweepSentinels(TableReference table, Set sweepSentinels) { if (sweepSentinels.isEmpty()) { return Collections.emptySet(); } + Set actualOrphanedSentinels = findOrphanedSweepSentinelsInternal( + ImmutableMap.copyOf(transactionKeyValueService.getLatestTimestamps( + table, Maps.asMap(sweepSentinels, x -> Long.MAX_VALUE)))) + .toImmutableSet(); + deleteOrphanedSentinelsAsync(table, actualOrphanedSentinels); + return actualOrphanedSentinels; + } + private StreamEx findOrphanedSweepSentinelsInternal(Map timestampCandidates) { // for each sentinel, start at long max. Then iterate down with each found uncommitted value. // if committed value seen, stop: the sentinel is not orphaned // if we get back -1, the sentinel is orphaned - Map timestampCandidates = new HashMap<>( - transactionKeyValueService.getLatestTimestamps(table, Maps.asMap(sweepSentinels, x -> Long.MAX_VALUE))); - Set actualOrphanedSentinels = new HashSet<>(); - - while (!timestampCandidates.isEmpty()) { - Map> sentinelTypeToTimestamps = timestampCandidates.entrySet().stream() - .collect(Collectors.groupingBy( - entry -> entry.getValue() == Value.INVALID_VALUE_TIMESTAMP - ? SentinelType.DEFINITE_ORPHANED - : SentinelType.INDETERMINATE, - Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); - - Map definiteOrphans = sentinelTypeToTimestamps.get(SentinelType.DEFINITE_ORPHANED); - if (definiteOrphans != null) { - actualOrphanedSentinels.addAll(definiteOrphans.keySet()); - } - - Map cellsToQuery = sentinelTypeToTimestamps.get(SentinelType.INDETERMINATE); - if (cellsToQuery == null) { - break; - } - Set committedStartTimestamps = KeyedStream.stream( - defaultTransactionService.get(cellsToQuery.values())) - .filter(Objects::nonNull) - .keys() - .collect(Collectors.toSet()); - - Map nextTimestampCandidates = KeyedStream.stream(cellsToQuery) - .filter(cellStartTimestamp -> !committedStartTimestamps.contains(cellStartTimestamp)) - .collectToMap(); - timestampCandidates = transactionKeyValueService.getLatestTimestamps(table, nextTimestampCandidates); - } - - deleteOrphanedSentinelsAsync(table, actualOrphanedSentinels); - - return actualOrphanedSentinels; + Map> sentinelTypeToTimestamps = EntryStream.of(timestampCandidates) + .collect(MoreCollectors.partitioningBy( + entry -> entry.getValue() == Value.INVALID_VALUE_TIMESTAMP, MoreCollectors.entriesToMap())); + Map definiteOrphans = sentinelTypeToTimestamps.getOrDefault(true, ImmutableMap.of()); + StreamEx resultStream = StreamEx.ofKeys(definiteOrphans); + Optional.ofNullable(sentinelTypeToTimestamps.get(false)) + .map(cellsToQuery -> EntryStream.of(cellsToQuery) + .removeValues(EntryStream.of(defaultTransactionService.get(cellsToQuery.values())) + .nonNullValues() + .keys() + .toImmutableSet()::contains) + .toImmutableMap()) + .map(this::findOrphanedSweepSentinelsInternal) + .ifPresent(resultStream::append); + return resultStream; } private void deleteOrphanedSentinelsAsync(TableReference table, Set actualOrphanedSentinels) { @@ -1685,121 +1628,126 @@ private static boolean isSweepSentinel(Value value) { * This will return all the key-value pairs that still need to be postFiltered. It will output properly post * filtered keys to the {@code resultsCollector} output param. */ - private ListenableFuture> getWithPostFilteringInternal( + private EntryStream getWithPostFilteringInternal( TableReference tableRef, Map rawResults, - @Output Collection> resultsCollector, - Function transformer, TransactionKeyValueService asyncKeyValueService, - AsyncTransactionService asyncTransactionService) { - Set orphanedSentinels = findOrphanedSweepSentinels(tableRef, rawResults); + AsyncTransactionService asyncTransactionService, + int iterations) { + Preconditions.checkState( + iterations < MAX_POST_FILTERING_ITERATIONS, + "Unable to filter cells to find correct result after " + + "reaching max iterations. This is likely due to aborted cells lying around," + + " or in the very rare case, could be due to transactions which constantly " + + "conflict but never commit. These values will be cleaned up eventually, but" + + " if the issue persists, ensure that sweep is caught up.", + SafeArg.of("table", tableRef), + SafeArg.of("maxIterations", MAX_POST_FILTERING_ITERATIONS)); + + Set sweepSentinels = Maps.filterValues(rawResults, SnapshotTransaction::isSweepSentinel) + .keySet(); + Set orphanedSentinels = findOrphanedSweepSentinels(tableRef, sweepSentinels); LongSet valuesStartTimestamps = getStartTimestampsForValues(rawResults.values()); - return Futures.transformAsync( + getCounter(AtlasDbMetricNames.CellFilterMetrics.INVALID_START_TS, tableRef) + .inc(sweepSentinels.size()); + // This means that this transaction started too long ago. When we do garbage collection, + // we clean up old values, and this transaction started at a timestamp before the garbage + // collection. + if (getReadSentinelBehavior().equals(TransactionReadSentinelBehavior.THROW_EXCEPTION) + && StreamEx.ofKeys(rawResults).anyMatch(orphanedSentinels::contains)) { + throw new TransactionFailedRetriableException("Tried to read a value that has been " + + "deleted. This can be caused by hard delete transactions using the type " + + TransactionType.AGGRESSIVE_HARD_DELETE + + ". It can also be caused by transactions taking too long, or" + + " its locks expired. Retrying it should work."); + } + + return streamFutureEntries(Futures.transform( getCommitTimestamps(tableRef, valuesStartTimestamps, true, asyncTransactionService), commitTimestamps -> collectCellsToPostFilter( - tableRef, - rawResults, - resultsCollector, - transformer, - asyncKeyValueService, - orphanedSentinels, - commitTimestamps), - MoreExecutors.directExecutor()); + tableRef, + Maps.filterValues(rawResults, Predicates.not(SnapshotTransaction::isSweepSentinel)), + asyncKeyValueService, + asyncTransactionService, + commitTimestamps, + iterations) + .spliterator(), + MoreExecutors.directExecutor())); } - private ListenableFuture> collectCellsToPostFilter( + private EntryStream collectCellsToPostFilter( TableReference tableRef, Map rawResults, - @Output Collection> resultsCollector, - Function transformer, TransactionKeyValueService asyncKeyValueService, - Set orphanedSentinels, - LongLongMap commitTimestamps) { - Map keysToReload = Maps.newHashMapWithExpectedSize(0); - Map keysToDelete = Maps.newHashMapWithExpectedSize(0); - ImmutableSet.Builder keysAddedBuilder = ImmutableSet.builder(); - - for (Map.Entry e : rawResults.entrySet()) { - Cell key = e.getKey(); - Value value = e.getValue(); - - if (isSweepSentinel(value)) { - getCounter(AtlasDbMetricNames.CellFilterMetrics.INVALID_START_TS, tableRef) - .inc(); - - // This means that this transaction started too long ago. When we do garbage collection, - // we clean up old values, and this transaction started at a timestamp before the garbage collection. - switch (getReadSentinelBehavior()) { - case IGNORE: - break; - case THROW_EXCEPTION: - if (!orphanedSentinels.contains(key)) { - throw new TransactionFailedRetriableException("Tried to read a value that has been " - + "deleted. This can be caused by hard delete transactions using the type " - + TransactionType.AGGRESSIVE_HARD_DELETE - + ". It can also be caused by transactions taking too long, or" - + " its locks expired. Retrying it should work."); - } - break; - default: - throw new IllegalStateException("Invalid read sentinel behavior " + getReadSentinelBehavior()); - } - } else { - long theirCommitTimestamp = - commitTimestamps.getIfAbsent(value.getTimestamp(), TransactionConstants.FAILED_COMMIT_TS); - if (theirCommitTimestamp == TransactionConstants.FAILED_COMMIT_TS) { - keysToReload.put(key, value.getTimestamp()); - if (shouldDeleteAndRollback()) { - // This is from a failed transaction so we can roll it back and then reload it. - keysToDelete.put(key, value.getTimestamp()); - getCounter(AtlasDbMetricNames.CellFilterMetrics.INVALID_COMMIT_TS, tableRef) - .inc(); - } - } else if (theirCommitTimestamp > getStartTimestamp()) { - // The value's commit timestamp is after our start timestamp. - // This means the value is from a transaction which committed - // after our transaction began. We need to try reading at an - // earlier timestamp. - keysToReload.put(key, value.getTimestamp()); - getCounter(AtlasDbMetricNames.CellFilterMetrics.COMMIT_TS_GREATER_THAN_TRANSACTION_TS, tableRef) - .inc(); - } else { - // The value has a commit timestamp less than our start timestamp, and is visible and valid. - if (value.getContents().length != 0) { - resultsCollector.add(Maps.immutableEntry(key, transformer.apply(value))); - keysAddedBuilder.add(key); - } - } - } - } - Set keysAddedToResults = keysAddedBuilder.build(); - - if (!keysToDelete.isEmpty()) { + AsyncTransactionService asyncTransactionService, + LongLongMap commitTimestamps, + int iterations) { + Map keysToDelete = EntryStream.of(rawResults) + .mapValues(Value::getTimestamp) + .filterKeyValue((key, ts) -> commitTimestamps.getIfAbsent(ts, TransactionConstants.FAILED_COMMIT_TS) + == TransactionConstants.FAILED_COMMIT_TS) + .toImmutableMap(); + Map keysToReload = EntryStream.of(rawResults) + .removeKeys(keysToDelete::containsKey) + .mapValues(Value::getTimestamp) + .filterKeyValue((key, ts) -> + // The value's commit timestamp is after our start timestamp. + // This means the value is from a transaction which committed + // after our transaction began. We need to try reading at an + // earlier timestamp. + commitTimestamps.get(ts) > getStartTimestamp()) + .toImmutableMap(); + getCounter(AtlasDbMetricNames.CellFilterMetrics.COMMIT_TS_GREATER_THAN_TRANSACTION_TS, tableRef) + .inc(keysToReload.size()); + + Map filteredResults = EntryStream.of(rawResults) + .removeKeys(keysToDelete::containsKey) + .removeKeys(keysToReload::containsKey) + .removeValues(value -> value.getContents().length == 0) + // The value has a commit timestamp less than our start timestamp, and is visible and valid. + .collect(MoreCollectors.entriesToCustomMap(LinkedHashMap::new)); + keysToReload.putAll(keysToDelete); + + EntryStream resultStream = EntryStream.of(filteredResults); + if (shouldDeleteAndRollback() + && !keysToDelete.isEmpty() + && !rollbackFailedTransactions(tableRef, keysToDelete, commitTimestamps, defaultTransactionService)) { + // These are from a failed transaction so we can roll it back and then reload it. + getCounter(AtlasDbMetricNames.CellFilterMetrics.INVALID_COMMIT_TS, tableRef) + .inc(keysToDelete.size()); // if we can't roll back the failed transactions, we should just try again - if (!rollbackFailedTransactions(tableRef, keysToDelete, commitTimestamps, defaultTransactionService)) { - return Futures.immediateFuture(getRemainingResults(rawResults, keysAddedToResults)); - } - } - - if (!keysToReload.isEmpty()) { - return Futures.transform( - asyncKeyValueService.getAsync(tableRef, keysToReload), - nextRawResults -> { - boolean allPossibleCellsReadAndPresent = nextRawResults.size() == keysToReload.size(); + return resultStream.append(getWithPostFilteringInternal( + tableRef, + Maps.difference(rawResults, filteredResults).entriesOnlyOnLeft(), + asyncKeyValueService, + asyncTransactionService, + iterations + 1)); + } + + Map keysToReloadOrDelete = ImmutableMap.builderWithExpectedSize( + keysToReload.size() + keysToDelete.size()) + .putAll(keysToReload) + .putAll(keysToDelete) + .buildOrThrow(); + if (!keysToReloadOrDelete.isEmpty()) { + return resultStream.append(streamFutureEntries(asyncKeyValueService + .getCompletableAsync(tableRef, keysToReloadOrDelete) + .thenApply(nextRawResults -> { validatePreCommitRequirementsOnReadIfNecessary( - tableRef, getStartTimestamp(), allPossibleCellsReadAndPresent); - return getRemainingResults(nextRawResults, keysAddedToResults); - }, - MoreExecutors.directExecutor()); + tableRef, getStartTimestamp(), nextRawResults.size() == keysToReloadOrDelete.size()); + return Maps.difference(rawResults, filteredResults).entriesOnlyOnLeft(); + }) + .thenApply(remainingRawResults -> getWithPostFilteringInternal( + tableRef, + remainingRawResults, + asyncKeyValueService, + asyncTransactionService, + iterations + 1)) + .thenApply(EntryStream::spliterator))); } - return Futures.immediateFuture(ImmutableMap.of()); - } - private Map getRemainingResults(Map rawResults, Set keysAddedToResults) { - Map remainingResults = new HashMap<>(rawResults); - remainingResults.keySet().removeAll(keysAddedToResults); - return remainingResults; + return resultStream; } /** @@ -2914,11 +2862,6 @@ private Counter getCounter(String name, TableReference tableRef) { return tableLevelMetricsController.createAndRegisterCounter(SnapshotTransaction.class, name, tableRef); } - private enum SentinelType { - DEFINITE_ORPHANED, - INDETERMINATE; - } - private final class SuccessCallbackManager { private final List callbacks = new CopyOnWriteArrayList<>(); @@ -2972,4 +2915,8 @@ private ListenableFuture scopeToTransaction(ListenableFuture transacti }, MoreExecutors.directExecutor()); } + + private static EntryStream streamFutureEntries(Future>> future) { + return EntryStream.of(new FutureSpliterator<>(future)); + } } From 743b02ae67e16b0c80ac439d11fc220edc550d2c Mon Sep 17 00:00:00 2001 From: Will Dey Date: Sat, 2 Mar 2024 22:59:22 -0500 Subject: [PATCH 02/10] @DoNotDelegate --- .../palantir/atlasdb/cell/api/TransactionKeyValueService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java index 3a2c2935ddb..a3de259a92e 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java @@ -31,6 +31,7 @@ import com.palantir.atlasdb.keyvalue.api.Value; import com.palantir.common.base.ClosableIterator; import com.palantir.processors.AutoDelegate; +import com.palantir.processors.DoNotDelegate; import com.palantir.util.paging.TokenBackedBasicResultsPage; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -69,6 +70,7 @@ RowColumnRangeIterator getRowsColumnRange( ListenableFuture> getAsync(TableReference tableRef, Map timestampByCell); + @DoNotDelegate default CompletableFuture> getCompletableAsync( TableReference tableRef, Map timestampByCell) { return AtlasFutures.toCompletableFuture(getAsync(tableRef, timestampByCell)); From 09439e7b91aab57af7299e7a950d87728c9ab305 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Sat, 2 Mar 2024 23:03:27 -0500 Subject: [PATCH 03/10] cleanup --- .../atlasdb/transaction/impl/SnapshotTransaction.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index 668fd17fa7f..ba0212bfc79 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -1599,7 +1599,7 @@ private StreamEx findOrphanedSweepSentinelsInternal(Map timest entry -> entry.getValue() == Value.INVALID_VALUE_TIMESTAMP, MoreCollectors.entriesToMap())); Map definiteOrphans = sentinelTypeToTimestamps.getOrDefault(true, ImmutableMap.of()); StreamEx resultStream = StreamEx.ofKeys(definiteOrphans); - Optional.ofNullable(sentinelTypeToTimestamps.get(false)) + return Optional.ofNullable(sentinelTypeToTimestamps.get(false)) .map(cellsToQuery -> EntryStream.of(cellsToQuery) .removeValues(EntryStream.of(defaultTransactionService.get(cellsToQuery.values())) .nonNullValues() @@ -1607,8 +1607,8 @@ private StreamEx findOrphanedSweepSentinelsInternal(Map timest .toImmutableSet()::contains) .toImmutableMap()) .map(this::findOrphanedSweepSentinelsInternal) - .ifPresent(resultStream::append); - return resultStream; + .map(resultStream::append) + .orElse(resultStream); } private void deleteOrphanedSentinelsAsync(TableReference table, Set actualOrphanedSentinels) { @@ -1628,7 +1628,7 @@ private static boolean isSweepSentinel(Value value) { * This will return all the key-value pairs that still need to be postFiltered. It will output properly post * filtered keys to the {@code resultsCollector} output param. */ - private EntryStream getWithPostFilteringInternal( + private EntryStream getWithPostFilteringInternal( TableReference tableRef, Map rawResults, TransactionKeyValueService asyncKeyValueService, @@ -1676,7 +1676,7 @@ private EntryStream getWithPostFilteringInternal( MoreExecutors.directExecutor())); } - private EntryStream collectCellsToPostFilter( + private EntryStream collectCellsToPostFilter( TableReference tableRef, Map rawResults, TransactionKeyValueService asyncKeyValueService, From 784415873fb3b1a3e422ac2c9996f179b7eb5403 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Sat, 2 Mar 2024 23:07:45 -0500 Subject: [PATCH 04/10] concision --- .../atlasdb/transaction/impl/SnapshotTransaction.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index ba0212bfc79..b89c78b1a0f 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -1527,8 +1527,7 @@ private Map getWithPostFiltering( private Map getWithPostFiltering( TableReference tableRef, - Map rawResults, - Function transformer, + Map rawResults,f TransactionKeyValueService asyncKeyValueService, AsyncTransactionService asyncTransactionService) { long bytes = EntryStream.of(rawResults) @@ -1561,7 +1560,7 @@ private Map getWithPostFiltering( // hidden tables are used outside of the transaction protocol, and in general have invalid timestamps, // so do not apply post-filtering as post-filtering would rollback (actually delete) the data incorrectly // this case is hit when reading a hidden table from console - return EntryStream.of(rawResults).mapValues(transformer).toImmutableMap(); + return Maps.transformValues(rawResults, transformer::apply); } Map postFiltered = getWithPostFilteringInternal( From f2135a10e60083beeff1d4b8e2512a3a1ac410d0 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Sat, 2 Mar 2024 23:10:22 -0500 Subject: [PATCH 05/10] fix test --- .../atlasdb/transaction/impl/SnapshotTransaction.java | 5 +++-- .../transaction/impl/GetRowsColumnRangeIteratorTest.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index b89c78b1a0f..3fac7ca7a97 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -1527,7 +1527,8 @@ private Map getWithPostFiltering( private Map getWithPostFiltering( TableReference tableRef, - Map rawResults,f + Map rawResults, + Function transformer, TransactionKeyValueService asyncKeyValueService, AsyncTransactionService asyncTransactionService) { long bytes = EntryStream.of(rawResults) @@ -1566,7 +1567,7 @@ private Map getWithPostFiltering( Map postFiltered = getWithPostFilteringInternal( tableRef, rawResults, asyncKeyValueService, asyncTransactionService, 1) .mapValues(transformer) - .toImmutableMap(); + .toCustomMap(LinkedHashMap::new); getCounter(AtlasDbMetricNames.SNAPSHOT_TRANSACTION_CELLS_RETURNED, tableRef) .inc(postFiltered.size()); return postFiltered; diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/GetRowsColumnRangeIteratorTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/GetRowsColumnRangeIteratorTest.java index bb28da786dc..8632da8b0ae 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/GetRowsColumnRangeIteratorTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/GetRowsColumnRangeIteratorTest.java @@ -149,6 +149,6 @@ private Iterator> createIteratorUnderTest(Runnable valid getInitialIterator(), COLUMN_RANGE_SELECTION, validationStep, - results -> Maps.transformValues(results, Value::getContents).entrySet()); + results -> Maps.transformValues(results, Value::getContents)); } } From 221eb11d6f8eecafdd14269e21828805192153bc Mon Sep 17 00:00:00 2001 From: Will Dey Date: Sat, 2 Mar 2024 23:45:10 -0500 Subject: [PATCH 06/10] deps --- .../main/java/com/palantir/atlasdb/futures/AtlasFutures.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/AtlasFutures.java b/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/AtlasFutures.java index 660581a6f95..3bb90625c20 100644 --- a/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/AtlasFutures.java +++ b/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/AtlasFutures.java @@ -33,7 +33,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.jetbrains.annotations.NotNull; public final class AtlasFutures { private AtlasFutures() {} @@ -169,7 +168,7 @@ public R get() throws InterruptedException, ExecutionException { } @Override - public R get(long timeout, @NotNull TimeUnit unit) + public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return completableFuture.get(timeout, unit); } From e5e454a2de0f1b72104674ee0d8252b67f2279d5 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Sat, 2 Mar 2024 23:48:10 -0500 Subject: [PATCH 07/10] nonNullValues --- .../atlasdb/transaction/impl/SnapshotTransaction.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index 3fac7ca7a97..c46fb68f713 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -998,8 +998,10 @@ ListenableFuture> getInternal( Map localWrites = Optional.ofNullable( localWriteBuffer.getLocalWrites().get(tableRef)) - .map(writes -> - StreamEx.of(cells).mapToEntry(writes::get).nonNull().toImmutableMap()) + .map(writes -> StreamEx.of(cells) + .mapToEntry(writes::get) + .nonNullValues() + .toImmutableMap()) .orElseGet(ImmutableMap::of); long numberOfNonDeleteLocalWrites = StreamEx.ofValues(localWrites).count(Predicates.not(PtBytes.EMPTY_BYTE_ARRAY::equals)); From 8eb187edc66d021ec7eaf307ea230e2626b0957a Mon Sep 17 00:00:00 2001 From: Will Dey Date: Sun, 3 Mar 2024 11:37:45 -0500 Subject: [PATCH 08/10] more cleanup --- .../atlasdb/futures/FutureSpliterator.java | 76 ------------------- .../transaction/impl/SnapshotTransaction.java | 41 ++++++++-- 2 files changed, 35 insertions(+), 82 deletions(-) delete mode 100644 atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/FutureSpliterator.java diff --git a/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/FutureSpliterator.java b/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/FutureSpliterator.java deleted file mode 100644 index 49eb6a6f12a..00000000000 --- a/atlasdb-commons/src/main/java/com/palantir/atlasdb/futures/FutureSpliterator.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.futures; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import java.util.Map; -import java.util.Set; -import java.util.Spliterator; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.function.Consumer; - -public class FutureSpliterator implements Spliterator { - private final Future> future; - - public FutureSpliterator(Future> future) { - this.future = future; - } - - @Override - public long estimateSize() { - if (future.isDone()) { - try { - return future.get().estimateSize(); - } catch (Exception e) { - // pass - } - } - return Long.MAX_VALUE; - } - - @Override - public int characteristics() { - return Spliterator.ORDERED; - } - - @Override - public boolean tryAdvance(Consumer consumer) { - return AtlasFutures.getUnchecked(future).tryAdvance(consumer); - } - - @Override - public void forEachRemaining(Consumer consumer) { - AtlasFutures.getUnchecked(future).forEachRemaining(consumer); - } - - @Override - public Spliterator trySplit() { - return null; - } - - public static FutureSpliterator> ofEntries(ListenableFuture> futureMap) { - return new FutureSpliterator<>( - Futures.transform(futureMap, map -> map.entrySet().spliterator(), MoreExecutors.directExecutor())); - } - - public static FutureSpliterator> ofEntries(CompletableFuture> futureMap) { - return new FutureSpliterator<>(futureMap.thenApply(Map::entrySet).thenApply(Set::spliterator)); - } -} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index c46fb68f713..d88ee423580 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -55,7 +55,6 @@ import com.palantir.atlasdb.debug.ConflictTracer; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.futures.AtlasFutures; -import com.palantir.atlasdb.futures.FutureSpliterator; import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.api.ColumnRangeSelection; @@ -168,6 +167,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.Optional; @@ -187,6 +187,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -1626,10 +1627,6 @@ private static boolean isSweepSentinel(Value value) { return value.getTimestamp() == Value.INVALID_VALUE_TIMESTAMP; } - /** - * This will return all the key-value pairs that still need to be postFiltered. It will output properly post - * filtered keys to the {@code resultsCollector} output param. - */ private EntryStream getWithPostFilteringInternal( TableReference tableRef, Map rawResults, @@ -2919,6 +2916,38 @@ private ListenableFuture scopeToTransaction(ListenableFuture transacti } private static EntryStream streamFutureEntries(Future>> future) { - return EntryStream.of(new FutureSpliterator<>(future)); + return EntryStream.of(new Spliterator<>() { + @Override + public boolean tryAdvance(Consumer> action) { + return AtlasFutures.getUnchecked(future).tryAdvance(action); + } + + @Override + public void forEachRemaining(Consumer> action) { + AtlasFutures.getUnchecked(future).forEachRemaining(action); + } + + @Override + public Spliterator> trySplit() { + return null; + } + + @Override + public long estimateSize() { + if (future.isDone()) { + try { + return future.get().estimateSize(); + } catch (Exception e) { + // pass + } + } + return Long.MAX_VALUE; + } + + @Override + public int characteristics() { + return Spliterator.IMMUTABLE + Spliterator.NONNULL + Spliterator.ORDERED; + } + }); } } From f32e045cddabe031cca5fc60b3d2858abd8f2343 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Sun, 3 Mar 2024 11:49:45 -0500 Subject: [PATCH 09/10] update delete & rollback conditional --- .../transaction/impl/SnapshotTransaction.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index d88ee423580..991a23201c2 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -1709,19 +1709,20 @@ private EntryStream collectCellsToPostFilter( keysToReload.putAll(keysToDelete); EntryStream resultStream = EntryStream.of(filteredResults); - if (shouldDeleteAndRollback() - && !keysToDelete.isEmpty() - && !rollbackFailedTransactions(tableRef, keysToDelete, commitTimestamps, defaultTransactionService)) { + if (shouldDeleteAndRollback()) { // These are from a failed transaction so we can roll it back and then reload it. getCounter(AtlasDbMetricNames.CellFilterMetrics.INVALID_COMMIT_TS, tableRef) .inc(keysToDelete.size()); - // if we can't roll back the failed transactions, we should just try again - return resultStream.append(getWithPostFilteringInternal( - tableRef, - Maps.difference(rawResults, filteredResults).entriesOnlyOnLeft(), - asyncKeyValueService, - asyncTransactionService, - iterations + 1)); + if (!keysToDelete.isEmpty() + && !rollbackFailedTransactions( + tableRef, keysToDelete, commitTimestamps, defaultTransactionService)) { + return resultStream.append(getWithPostFilteringInternal( + tableRef, + Maps.difference(rawResults, filteredResults).entriesOnlyOnLeft(), + asyncKeyValueService, + asyncTransactionService, + iterations + 1)); + } } Map keysToReloadOrDelete = ImmutableMap.builderWithExpectedSize( From bc561e72df4c86768edbd6e2cc539ee4f3fd6f14 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Mon, 4 Mar 2024 11:53:27 -0500 Subject: [PATCH 10/10] unnecessary putAll --- .../palantir/atlasdb/transaction/impl/SnapshotTransaction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index 991a23201c2..7aa38b28507 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -1706,7 +1706,6 @@ private EntryStream collectCellsToPostFilter( .removeValues(value -> value.getContents().length == 0) // The value has a commit timestamp less than our start timestamp, and is visible and valid. .collect(MoreCollectors.entriesToCustomMap(LinkedHashMap::new)); - keysToReload.putAll(keysToDelete); EntryStream resultStream = EntryStream.of(filteredResults); if (shouldDeleteAndRollback()) {