From e27ff0779dfefe8b1ae777be7e7f47f6202bc255 Mon Sep 17 00:00:00 2001 From: Jonathan Ellis Date: Fri, 20 Dec 2024 09:08:01 -0600 Subject: [PATCH 1/9] remove reranking and force rerankK=limit in diskann (sstable) searches. memtables unaffected --- .../cassandra/index/sai/disk/vector/CassandraDiskAnn.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java index c4243fffff2b..612058189e41 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraDiskAnn.java @@ -227,6 +227,7 @@ public CloseableIterator search(VectorFloat queryVector, { VectorValidation.validateIndexable(queryVector, similarityFunction); + rerankK = limit; var graphAccessManager = searchers.get(); var searcher = graphAccessManager.get(); try @@ -236,11 +237,11 @@ public CloseableIterator search(VectorFloat queryVector, if (features.contains(FeatureId.FUSED_ADC)) { var asf = view.approximateScoreFunctionFor(queryVector, similarityFunction); - var rr = view.rerankerFor(queryVector, similarityFunction); - ssp = new SearchScoreProvider(asf, rr); + ssp = new SearchScoreProvider(asf, null); } else if (compressedVectors == null) { + // no PQ, search with full-res vectors from disk ssp = new SearchScoreProvider(view.rerankerFor(queryVector, similarityFunction)); } else @@ -251,8 +252,7 @@ else if (compressedVectors == null) ? VectorSimilarityFunction.COSINE : similarityFunction; var asf = compressedVectors.precomputedScoreFunctionFor(queryVector, sf); - var rr = view.rerankerFor(queryVector, similarityFunction); - ssp = new SearchScoreProvider(asf, rr); + ssp = new SearchScoreProvider(asf, null); } var result = searcher.search(ssp, limit, rerankK, threshold, context.getAnnRerankFloor(), ordinalsMap.ignoringDeleted(acceptBits)); if (V3OnDiskFormat.ENABLE_RERANK_FLOOR) From ccb7316fb983b5854ce1c49b4daa86f14050eb95 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 27 Dec 2024 23:28:12 +0100 Subject: [PATCH 2/9] Cache USE_DSE_COMPATIBLE_HISTOGRAM_BOUNDARIES to save cpu while updating metrics --- .../metrics/DecayingEstimatedHistogramReservoir.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java index caa187c6823f..955a2f5ea590 100644 --- a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java +++ b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java @@ -117,6 +117,7 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir final static int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1; // power of two of half the number of sub-buckets final static long subBucketMask = (long)(subBucketCount - 1) << unitMagnitude; final static int leadingZeroCountBase = 64 - unitMagnitude - subBucketHalfCountMagnitude - 1; + private static final boolean USE_DSE_COMPATIBLE_HISTOGRAM_BOUNDARIES = CassandraRelevantProperties.USE_DSE_COMPATIBLE_HISTOGRAM_BOUNDARIES.getBoolean(); // DSE COMPATIBILITY CHANGES END private static float[] computeTable(int bits) @@ -217,7 +218,7 @@ public DecayingEstimatedHistogramReservoir(Clock clock) if (bucketCount == DEFAULT_BUCKET_COUNT) { - if 
(CassandraRelevantProperties.USE_DSE_COMPATIBLE_HISTOGRAM_BOUNDARIES.getBoolean()) + if (USE_DSE_COMPATIBLE_HISTOGRAM_BOUNDARIES) bucketOffsets = considerZeroes ? DEFAULT_DSE_WITH_ZERO_BUCKET_OFFSETS : DEFAULT_DSE_WITHOUT_ZERO_BUCKET_OFFSETS; else bucketOffsets = considerZeroes ? DEFAULT_WITH_ZERO_BUCKET_OFFSETS : DEFAULT_WITHOUT_ZERO_BUCKET_OFFSETS; @@ -273,7 +274,7 @@ public int stripedIndex(int offsetIndex, int stripe) @VisibleForTesting public static int findIndex(long[] bucketOffsets, long value) { - if (CassandraRelevantProperties.USE_DSE_COMPATIBLE_HISTOGRAM_BOUNDARIES.getBoolean()) + if (USE_DSE_COMPATIBLE_HISTOGRAM_BOUNDARIES) return findIndexDse(bucketOffsets, value); // values below zero are nonsense, but we have never failed when presented them From b7f6789d74f71ab6baa2bdc8500f1e124e3b3be1 Mon Sep 17 00:00:00 2001 From: Branimir Lambov Date: Mon, 30 Dec 2024 13:30:11 +0200 Subject: [PATCH 3/9] Improve typed reads in RandomAccessReader --- .../cassandra/io/util/RandomAccessReader.java | 207 +++++++----------- 1 file changed, 80 insertions(+), 127 deletions(-) diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index c105a4e7bcdb..07679eb1c989 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -93,165 +93,118 @@ public ByteOrder order() @Override public void read(float[] dest, int offset, int count) throws IOException { - var copied = 0; - while (copied < count) + for (int inBuffer = buffer.remaining() / Float.BYTES; + inBuffer < count; + inBuffer = buffer.remaining() / Float.BYTES) { - var bh = bufferHolder; - long position = getPosition(); - - FloatBuffer floatBuffer; - if (bh.offset() == 0 && position % Float.BYTES == 0 && bh.order() == order) - { - // this is a separate code path because buffer() and asFloatBuffer() both allocate - // new and relatively expensive xBuffer objects, so we want to avoid doing that - // twice, where possible. If the BufferHandler has a different underlying - // byte order, we duplicate first because there is not yet a way to configure - // the buffer handler to use the correct byte order. - floatBuffer = bh.floatBuffer(); - floatBuffer.position(Ints.checkedCast(position / Float.BYTES)); - } - else + if (inBuffer >= 1) { - // bufferHolder offset is non-zero, and probably not aligned to Float.BYTES, so - // set the position before converting to FloatBuffer. - var bb = bh.buffer(); - bb.order(order); - bb.position(Ints.checkedCast(position - bh.offset())); - floatBuffer = bb.asFloatBuffer(); + // read as much as we can from the buffer + readFloats(buffer, order, dest, offset, inBuffer); + offset += inBuffer; + count -= inBuffer; } - var remaining = floatBuffer.remaining(); - if (remaining == 0) + if (buffer.remaining() > 0) { - // slow path -- the next float bytes are across a buffer boundary (we never start a loop iteration with - // the "current" buffer fully exhausted, so `remaining == 0` truly means "some bytes remains, but not - // enough for a float"), so we read that float individually (which will read it byte by byte, - // reBuffering as needed). After that we loop, which will switch back to the faster path for any - // remaining floats in the newly reloaded buffer. 
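                    // Editor's note (illustrative sketch, not part of the patch): the replacement code funnels all
                    // bulk copies through small static helpers (see readFloats/readLongs/readInts later in this hunk).
                    // The pattern, assuming `src` is positioned at the first value to copy:
                    //
                    //     static void copyFloats(ByteBuffer src, ByteOrder order, float[] dest, int offset, int count)
                    //     {
                    //         ByteBuffer view = src.order() != order ? src.duplicate().order(order) : src;
                    //         view.asFloatBuffer().get(dest, offset, count);       // bulk copy, no per-element loop
                    //         src.position(src.position() + count * Float.BYTES);  // advance the source by the bytes consumed
                    //     }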
- dest[offset + copied] = readFloat(); - seek(position + Float.BYTES); - copied++; + // read the buffer-spanning value using the slow path + dest[offset++] = readFloat(); + --count; } else - { - var elementsToRead = Math.min(remaining, count - copied); - floatBuffer.get(dest, offset + copied, elementsToRead); - seek(position + ((long) elementsToRead * Float.BYTES)); - copied += elementsToRead; - } + reBuffer(); } + + readFloats(buffer, order, dest, offset, count); } @Override - public void readFully(long[] dest) throws IOException { - int copied = 0; - while (copied < dest.length) - { - var bh = bufferHolder; - long position = getPosition(); + public void readFully(long[] dest) throws IOException + { + read(dest, 0, dest.length); + } - LongBuffer longBuffer; - if (bh.offset() == 0 && position % Long.BYTES == 0 && bh.order() == order) - { - // this is a separate code path because buffer() and asLongBuffer() both allocate - // new and relatively expensive xBuffer objects, so we want to avoid doing that - // twice, where possible. If the BufferHandler has a different underlying - // byte order, we duplicate first because there is not yet a way to configure - // the buffer handler to use the correct byte order. - longBuffer = bh.longBuffer(); - longBuffer.position(Ints.checkedCast(position / Long.BYTES)); - } - else + public void read(long[] dest, int offset, int count) throws IOException + { + for (int inBuffer = buffer.remaining() / Long.BYTES; + inBuffer < count; + inBuffer = buffer.remaining() / Long.BYTES) + { + if (inBuffer >= 1) { - // offset is non-zero, and probably not aligned to Long.BYTES, so - // set the position before converting to LongBuffer. - var bb = bh.buffer(); - bb.order(order); - bb.position(Ints.checkedCast(position - bh.offset())); - longBuffer = bb.asLongBuffer(); + // read as much as we can from the buffer + readLongs(buffer, order, dest, offset, inBuffer); + offset += inBuffer; + count -= inBuffer; } - var remaining = longBuffer.remaining(); - if (remaining == 0) + if (buffer.remaining() > 0) { - // slow path -- the next long bytes are across a buffer boundary (we never start a loop iteration with - // the "current" buffer fully exhausted, so `remaining == 0` truly means "some bytes remains, but not - // enough for a long"), so we read that long individually (which will read it byte by byte, - // reBuffering as needed). After that we loop, which will switch back to the faster path for any - // remaining longs in the newly reloaded buffer. - dest[copied] = readLong(); - seek(position + Long.BYTES); - copied++; + // read the buffer-spanning value using the slow path + dest[offset++] = readLong(); + --count; } else - { - var elementsToRead = Math.min(remaining, dest.length - copied); - longBuffer.get(dest, copied, elementsToRead); - seek(position + ((long) elementsToRead * Long.BYTES)); - copied += elementsToRead; - } + reBuffer(); } + + readLongs(buffer, order, dest, offset, count); } - /** - * Read ints into an int[], starting at the current position. - * - * @param dest the array to read into - * @param offset the offset in the array at which to start writing ints - * @param count the number of ints to read - * - * Will change the buffer position. 
- */ @Override public void read(int[] dest, int offset, int count) throws IOException { - int copied = 0; - while (copied < count) + for (int inBuffer = buffer.remaining() / Integer.BYTES; + inBuffer < count; + inBuffer = buffer.remaining() / Integer.BYTES) { - var bh = bufferHolder; - long position = getPosition(); - - IntBuffer intBuffer; - if (bh.offset() == 0 && position % Integer.BYTES == 0 && bh.order() == order) - { - // this is a separate code path because buffer() and asIntBuffer() both allocate - // new and relatively expensive xBuffer objects, so we want to avoid doing that - // twice, where possible. If the BufferHandler has a different underlying - // byte order, we duplicate first because there is not yet a way to configure - // the buffer handler to use the correct byte order. - intBuffer = bh.intBuffer(); - intBuffer.position(Ints.checkedCast(position / Integer.BYTES)); - } - else + if (inBuffer >= 1) { - // offset is non-zero, and probably not aligned to Integer.BYTES, so - // set the position before converting to IntBuffer. - var bb = bh.buffer(); - bb.order(order); - bb.position(Ints.checkedCast(position - bh.offset())); - intBuffer = bb.asIntBuffer(); + // read as much as we can from the buffer + readInts(buffer, order, dest, offset, inBuffer); + offset += inBuffer; + count -= inBuffer; } - var remaining = intBuffer.remaining(); - if (remaining == 0) + if (buffer.remaining() > 0) { - // slow path -- the next int bytes are across a buffer boundary (we never start a loop iteration with - // the "current" buffer fully exhausted, so `remaining == 0` truly means "some bytes remains, but not - // enough for an int"), so we read that int individually (which will read it byte by byte, - // reBuffering as needed). After that we loop, which will switch back to the faster path for any - // remaining ints in the newly reloaded buffer. - dest[offset + copied] = readInt(); - seek(position + Integer.BYTES); - copied++; + // read the buffer-spanning value using the slow path + dest[offset++] = readInt(); + --count; } else - { - var elementsToRead = Math.min(remaining, count - copied); - intBuffer.get(dest, offset + copied, elementsToRead); - seek(position + ((long) elementsToRead * Integer.BYTES)); - copied += elementsToRead; - } + reBuffer(); } + + readInts(buffer, order, dest, offset, count); + } + + private static void readFloats(ByteBuffer buffer, ByteOrder order, float[] dest, int offset, int count) + { + FloatBuffer floatBuffer = updateBufferByteOrderIfNeeded(buffer, order).asFloatBuffer(); + floatBuffer.get(dest, offset, count); + buffer.position(buffer.position() + count * Float.BYTES); + } + + private static void readLongs(ByteBuffer buffer, ByteOrder order, long[] dest, int offset, int count) + { + LongBuffer longBuffer = updateBufferByteOrderIfNeeded(buffer, order).asLongBuffer(); + longBuffer.get(dest, offset, count); + buffer.position(buffer.position() + count * Long.BYTES); + } + + private static void readInts(ByteBuffer buffer, ByteOrder order, int[] dest, int offset, int count) + { + IntBuffer intBuffer = updateBufferByteOrderIfNeeded(buffer, order).asIntBuffer(); + intBuffer.get(dest, offset, count); + buffer.position(buffer.position() + count * Integer.BYTES); + } + + private static ByteBuffer updateBufferByteOrderIfNeeded(ByteBuffer buffer, ByteOrder order) + { + return buffer.order() != order + ? 
buffer.duplicate().order(order) + : buffer; // Note: ?: rather than if to hit one-liner inlining path } @Override From 83263e4078915157ea5e40cfc97f9ef164c0028b Mon Sep 17 00:00:00 2001 From: Jonathan Ellis Date: Mon, 30 Dec 2024 08:52:57 -0600 Subject: [PATCH 4/9] remove reranking from orderByBruteForce --- .../sai/disk/v2/V2VectorIndexSearcher.java | 27 ++++++++++++++++--- .../disk/vector/BruteForceRowIdIterator.java | 6 ++--- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java index 62748e3332c4..f2f6dba96ce8 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java @@ -21,7 +21,9 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; @@ -291,7 +293,7 @@ private CloseableIterator orderByBruteForce(CompressedVectors cv VectorFloat queryVector, IntIntPairArray segmentOrdinalPairs, int limit, - int rerankK) throws IOException + int rerankK) { var approximateScores = new SortingIterator.Builder(segmentOrdinalPairs.size()); var similarityFunction = indexContext.getIndexWriterConfig().getSimilarityFunction(); @@ -302,8 +304,27 @@ private CloseableIterator orderByBruteForce(CompressedVectors cv approximateScores.add(new BruteForceRowIdIterator.RowWithApproximateScore(segmentRowId, ordinal, score)); }); var approximateScoresQueue = approximateScores.build(BruteForceRowIdIterator.RowWithApproximateScore::compare); - var reranker = new CloseableReranker(similarityFunction, queryVector, graph.getView()); - return new BruteForceRowIdIterator(approximateScoresQueue, reranker, limit, rerankK); + var transformed = new Iterator() { + int consumed = 0; + + @Override + public boolean hasNext() + { + if (consumed++ >= limit) + return false; + return approximateScoresQueue.hasNext(); + } + + @Override + public RowIdWithScore next() + { + if (!hasNext()) + throw new NoSuchElementException(); + var approximated = approximateScoresQueue.next(); + return new RowIdWithScore(approximated.rowId, approximated.appoximateScore); + } + }; + return CloseableIterator.wrap(transformed); } /** diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIterator.java b/src/java/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIterator.java index 1edf0ebed587..3b364404ef8c 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIterator.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/BruteForceRowIdIterator.java @@ -51,9 +51,9 @@ public class BruteForceRowIdIterator extends AbstractIterator { public static class RowWithApproximateScore { - private final int rowId; - private final int ordinal; - private final float appoximateScore; + public final int rowId; + public final int ordinal; + public final float appoximateScore; public RowWithApproximateScore(int rowId, int ordinal, float appoximateScore) { From 53910c0a98d540db9407c1d09871775110e71535 Mon Sep 17 00:00:00 2001 From: Joel Knighton Date: Mon, 30 Dec 2024 13:41:34 -0600 Subject: [PATCH 5/9] Fix wrapping non-reranked brute force to increment in `next` rather than `hasNext`, so that `hasNext` isn't 
side-effecting --- .../cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java index f2f6dba96ce8..7394db7edf0f 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java @@ -310,7 +310,7 @@ private CloseableIterator orderByBruteForce(CompressedVectors cv @Override public boolean hasNext() { - if (consumed++ >= limit) + if (consumed >= limit) return false; return approximateScoresQueue.hasNext(); } @@ -320,6 +320,7 @@ public RowIdWithScore next() { if (!hasNext()) throw new NoSuchElementException(); + consumed++; var approximated = approximateScoresQueue.next(); return new RowIdWithScore(approximated.rowId, approximated.appoximateScore); } From aaf3d8ae0d9a51e27c3aacb2898f53c3f5954414 Mon Sep 17 00:00:00 2001 From: Sylvain Lebresne Date: Fri, 3 Jan 2025 14:45:22 +0100 Subject: [PATCH 6/9] Experimental trivial vectorized reads implementation for RAR --- build.xml | 2 +- .../apache/cassandra/cache/ChunkCache.java | 6 + .../config/CassandraRelevantProperties.java | 8 +- .../apache/cassandra/io/util/ChunkReader.java | 5 + .../io/util/CompressedChunkReader.java | 6 + .../cassandra/io/util/RandomAccessReader.java | 113 +++++++++++++++ .../apache/cassandra/io/util/Rebufferer.java | 5 + .../cassandra/io/util/SimpleChunkReader.java | 6 + .../io/util/RandomAccessReaderTest.java | 130 ++++++++++++++++++ 9 files changed, 279 insertions(+), 2 deletions(-) diff --git a/build.xml b/build.xml index bbee9df83fc0..bbdf17b5ab01 100644 --- a/build.xml +++ b/build.xml @@ -717,7 +717,7 @@ - + diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java index 308f709c2352..46fed8589fdd 100644 --- a/src/java/org/apache/cassandra/cache/ChunkCache.java +++ b/src/java/org/apache/cassandra/cache/ChunkCache.java @@ -385,6 +385,12 @@ public CachingRebufferer(ChunkReader file) alignmentMask = -chunkSize; } + @Override + public boolean supportsConcurrentRebuffer() + { + return source.supportsReadingChunksConcurrently(); + } + @Override public BufferHolder rebuffer(long position) { diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 955429aa408b..266de7383de2 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -568,7 +568,13 @@ public enum CassandraRelevantProperties * Do not wait for gossip to be enabled before starting stabilisation period. This is required especially for tests * which do not enable gossip at all. */ - CLUSTER_VERSION_PROVIDER_SKIP_WAIT_FOR_GOSSIP("cassandra.test.cluster_version_provider.skip_wait_for_gossip"); + CLUSTER_VERSION_PROVIDER_SKIP_WAIT_FOR_GOSSIP("cassandra.test.cluster_version_provider.skip_wait_for_gossip"), + + + /** + * (Experimental) Thread pool size for RandomAccessReader "vectored" reads. 
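     * (Editor's note: when unset, the pool size defaults to half the number of available processors --
     * see VECTORED_READS_POOL_SIZE in RandomAccessReader.)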
+ */ + RAR_VECTORED_READS_THREAD_POOL_SIZE("cassandra.rar.vectored_reads_thread_pool_size"); CassandraRelevantProperties(String key, String defaultVal) { diff --git a/src/java/org/apache/cassandra/io/util/ChunkReader.java b/src/java/org/apache/cassandra/io/util/ChunkReader.java index a0be78876995..01b805bf607d 100644 --- a/src/java/org/apache/cassandra/io/util/ChunkReader.java +++ b/src/java/org/apache/cassandra/io/util/ChunkReader.java @@ -42,6 +42,11 @@ public interface ChunkReader extends RebuffererFactory */ void readChunk(long position, ByteBuffer buffer); + default boolean supportsReadingChunksConcurrently() + { + return false; + } + /** * Buffer size required for this rebufferer. Must be power of 2 if alignment is required. */ diff --git a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java index b46d4a25153c..cfdb87a981c1 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java +++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java @@ -186,6 +186,12 @@ public void readChunk(long position, ByteBuffer uncompressed) } } + @Override + public boolean supportsReadingChunksConcurrently() + { + return true; + } + @Override public void invalidateIfCached(long position) { diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index 07679eb1c989..64cb4d29409f 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -23,16 +23,31 @@ import java.nio.FloatBuffer; import java.nio.IntBuffer; import java.nio.LongBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.concurrent.NotThreadSafe; import com.google.common.primitives.Ints; +import io.netty.util.concurrent.FastThreadLocal; +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.Rebufferer.BufferHolder; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Throwables; @NotThreadSafe public class RandomAccessReader extends RebufferingInputStream implements FileDataInput, io.github.jbellis.jvector.disk.RandomAccessReader { + private static final int VECTORED_READS_POOL_SIZE = CassandraRelevantProperties.RAR_VECTORED_READS_THREAD_POOL_SIZE.getInt(FBUtilities.getAvailableProcessors() / 2); + // The default buffer size when the client doesn't specify it public static final int DEFAULT_BUFFER_SIZE = 4096; @@ -43,6 +58,8 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa private final ByteOrder order; private BufferHolder bufferHolder; + private VectoredReadsPool vectoredReadsPool; + /** * Only created through Builder * @@ -179,6 +196,55 @@ public void read(int[] dest, int offset, int count) throws IOException readInts(buffer, order, dest, offset, count); } + @Override + public void read(int[][] ints, long[] positions) throws IOException + { + if (!rebufferer.supportsConcurrentRebuffer()) + { + 
io.github.jbellis.jvector.disk.RandomAccessReader.super.read(ints, positions); + return; + } + + if (ints.length != positions.length) + throw new IllegalArgumentException(String.format("ints.length %d != positions.length %d", ints.length, positions.length)); + + if (ints.length == 0) + return; + + maybeInitVectoredReadsPool(); + List> futures = new ArrayList<>(ints.length - 1); + for (int i = 0; i < ints.length - 1; i++) + { + if (ints[i].length == 0) + continue; + + futures.add(vectoredReadsPool.readAsync(ints[i], positions[i])); + } + + // Read last array in the current thread both because "why not" and also so this RAR is set "as if" the read + // was done synchronously. + seek(positions[ints.length - 1]); + read(ints[ints.length - 1], 0, ints[ints.length - 1].length); + + for (CompletableFuture future : futures) + { + try + { + future.get(); + } + catch (Exception e) + { + throw Throwables.cleaned(e); + } + } + } + + private void maybeInitVectoredReadsPool() + { + if (vectoredReadsPool == null) + vectoredReadsPool = new VectoredReadsPool(VECTORED_READS_POOL_SIZE, rebufferer, order); + } + private static void readFloats(ByteBuffer buffer, ByteOrder order, float[] dest, int offset, int count) { FloatBuffer floatBuffer = updateBufferByteOrderIfNeeded(buffer, order).asFloatBuffer(); @@ -478,4 +544,51 @@ public static RandomAccessReader open(File file) } } + private static class VectoredReadsPool + { + private final ThreadPoolExecutor executor; + private final FastThreadLocal perThreadReaders; + protected static final AtomicInteger id = new AtomicInteger(1); + + VectoredReadsPool(int size, Rebufferer rebufferer, ByteOrder order) + { + this.executor = new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("rar-vectored-reads-" + id.getAndIncrement())); + this.perThreadReaders = new FastThreadLocal<>() + { + @Override + protected RandomAccessReader initialValue() + { + return new RandomAccessReader(rebufferer, order, Rebufferer.EMPTY); + } + + @Override + protected void onRemoval(RandomAccessReader reader) + { + reader.close(); + } + }; + } + + CompletableFuture readAsync(int[] ints, long position) + { + return CompletableFuture.runAsync(() -> { + try + { + RandomAccessReader reader = perThreadReaders.get(); + reader.seek(position); + reader.read(ints, 0, ints.length); + } + catch (Exception e) + { + throw Throwables.cleaned(e); + } + }, executor); + } + + void close() + { + executor.shutdown(); + } + } + } diff --git a/src/java/org/apache/cassandra/io/util/Rebufferer.java b/src/java/org/apache/cassandra/io/util/Rebufferer.java index a1f894676964..4513e295ef76 100644 --- a/src/java/org/apache/cassandra/io/util/Rebufferer.java +++ b/src/java/org/apache/cassandra/io/util/Rebufferer.java @@ -37,6 +37,11 @@ public interface Rebufferer extends ReaderFileProxy */ BufferHolder rebuffer(long position); + default boolean supportsConcurrentRebuffer() + { + return false; + } + /** * Called when a reader is closed. Should clean up reader-specific data. 
*/ diff --git a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java index 810d6c653369..1414ee740cda 100644 --- a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java +++ b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java @@ -49,6 +49,12 @@ public void readChunk(long position, ByteBuffer buffer) buffer.flip(); } + @Override + public boolean supportsReadingChunksConcurrently() + { + return true; + } + @Override public int chunkSize() { diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java index 26b086b3a803..5d9ba65c1efd 100644 --- a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; import org.junit.Assert; import org.junit.BeforeClass; @@ -47,6 +48,7 @@ import static org.junit.Assert.*; +import org.apache.cassandra.cache.ChunkCache; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.ByteBufferUtil; @@ -508,6 +510,134 @@ private void testReadFullyIntArray(int shift, ByteOrder order) throws IOExceptio } } + // read vectored array tests - ints + + private static final class VectoredIntReadArrayCase + { + private final int[][] expected; + private final long totalBytes; + + static int counter; + + VectoredIntReadArrayCase(int... numElements) + { + this.expected = new int[numElements.length][]; + long bytes = 0; + for (int i = 0; i < numElements.length; i++) + { + int num = numElements[i]; + bytes += ((long)num) * Integer.BYTES; + this.expected[i] = new int[num]; + for (int j = 0; j < num; j++) + { + this.expected[i][j] = counter++; + } + } + this.totalBytes = bytes; + } + + int[][] createSizedArray() + { + int[][] arr = new int[expected.length][]; + for (int i = 0; i < expected.length; i++) + arr[i] = new int[expected[i].length]; + return arr; + } + + long[] computePositions(long initial) + { + long[] positions = new long[expected.length]; + positions[0] = initial; + for (int i = 1; i < expected.length; i++) + positions[i] = positions[i - 1] + ((long)expected[i - 1].length) * Integer.BYTES; + return positions; + } + + @Override + public String toString() + { + return Arrays.stream(expected).map(arr -> Integer.toString(arr.length)).collect(Collectors.joining(", ", "[", "]")); + } + } + + @Test + public void testVectoredIntArray() throws IOException + { + testVectoredReadIntArray(ByteOrder.BIG_ENDIAN); + testVectoredReadIntArray(ByteOrder.LITTLE_ENDIAN); + } + + private void testVectoredReadIntArray(ByteOrder order) throws IOException + { + // Note: using a small buffer would not work; the code in FileHandle enforce at least 4096 in the case + // we care about (simpl chunk reader), so reflecting this here. 
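        // Editor's note (illustrative values, not part of the patch): the call shape exercised by this test --
        // each dest[i] is filled with ints read starting at file offset positions[i]:
        //
        //     int[][] dest      = { new int[128], new int[64] };
        //     long[]  positions = { 0L, 4096L };
        //     reader.read(dest, positions);   // may fan the per-array reads out across the vectored-reads pool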
+ int bufferSize = 4096; + + List cases = new ArrayList<>(); + cases.add(new VectoredIntReadArrayCase(0, 0, 0)); + cases.add(new VectoredIntReadArrayCase(10, 0, 10)); + cases.add(new VectoredIntReadArrayCase(17, 100, 4, 12)); + cases.add(new VectoredIntReadArrayCase(100, 100, 100)); + cases.add(new VectoredIntReadArrayCase(121)); + cases.add(new VectoredIntReadArrayCase(10000, 1)); + cases.add(new VectoredIntReadArrayCase(1000, 20000, 10000, 30000)); + + File file = writeFile(writer -> { + try + { + writer.order(order); + for (VectoredIntReadArrayCase testCase : cases) + { + for (int[] array : testCase.expected) + for (int f : array) + writer.writeInt(f); + } + return false; + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + + try (ChannelProxy channel = new ChannelProxy(file); + FileHandle.Builder builder = new FileHandle.Builder(channel) + .order(order) + .bufferType(BufferType.OFF_HEAP) + .mmapped(false) + .withChunkCache(ChunkCache.instance) + .bufferSize(bufferSize); + FileHandle fh = builder.complete(); + RandomAccessReader reader = fh.createReader()) + { + assertEquals(channel.size(), reader.length()); + assertEquals(channel.size(), reader.bytesRemaining()); + assertEquals(file.length(), reader.available()); + + // Running twice: first run will mostly read from the file; the 2nd run will use the chunk cache + doTestVectoredReadIntArray(reader, cases); + doTestVectoredReadIntArray(reader, cases); + } + } + + private void doTestVectoredReadIntArray(RandomAccessReader reader, List cases) throws IOException + { + long position = 0; + for (VectoredIntReadArrayCase testCase : cases) + { + int[][] readArray = testCase.createSizedArray(); + long[] positions = testCase.computePositions(position); + reader.read(readArray, positions); + + assertArrayEquals(testCase.expected, readArray); + + position += testCase.totalBytes; + } + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + } + /** A fake file channel that simply increments the position and doesn't * actually read anything. We use it to simulate very large files, > 2G. */ From f81933473114dab41f79ca2d02cf0fd0aba98b57 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 6 Jan 2025 16:30:14 -0600 Subject: [PATCH 7/9] CNDB-12304: Parallelize ORDER BY row materialization/validation (cherry picked from commit fb43993bdc5b024375ce871e66708b858ce721a6) --- .../cassandra/db/ColumnFamilyStore.java | 28 +++ .../db/SinglePartitionReadCommand.java | 6 +- .../cassandra/index/sai/QueryContext.java | 7 +- .../cassandra/index/sai/plan/QueryView.java | 7 +- .../plan/StorageAttachedIndexSearcher.java | 234 ++++++++++++------ .../index/sai/plan/TopKProcessor.java | 111 ++------- 6 files changed, 221 insertions(+), 172 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index f9618c45e582..c4096664b832 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -3242,6 +3242,34 @@ public ViewFragment(List sstables, Iterable memtables) this.sstables = sstables; this.memtables = memtables; } + + public void sortSSTablesByMaxTimestampDescending() + { + sstables.sort(SSTableReader.maxTimestampDescending); + } + } + + // A RefViewFragment that is sorted by max timestamp descending, which makes this object safe to reuse and to + // share across threads. 
+ public static class SortedRefViewFragment extends RefViewFragment + { + private SortedRefViewFragment(RefViewFragment view) + { + // Copy sstable list to an immutable list to ensure there is not a future change that modifies the list + super(List.copyOf(view.sstables), view.memtables, view.refs); + } + + // sstables are expected to be pre-sorted + @Override + public void sortSSTablesByMaxTimestampDescending() + { + } + + public static SortedRefViewFragment sortThenCreateFrom(RefViewFragment view) + { + view.sortSSTablesByMaxTimestampDescending(); + return new SortedRefViewFragment(view); + } } public static class RefViewFragment extends ViewFragment implements AutoCloseable diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 4b6663f7a294..df24f68904de 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -632,7 +632,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs if (Tracing.traceSinglePartitions()) Tracing.trace("Acquiring sstable references"); - view.sstables.sort(SSTableReader.maxTimestampDescending); + view.sortSSTablesByMaxTimestampDescending(); ClusteringIndexFilter filter = clusteringIndexFilter(); long minTimestamp = Long.MAX_VALUE; long mostRecentPartitionTombstone = Long.MIN_VALUE; @@ -673,7 +673,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs * In other words, iterating in descending maxTimestamp order allow to do our mostRecentPartitionTombstone * elimination in one pass, and minimize the number of sstables for which we read a partition tombstone. */ - view.sstables.sort(SSTableReader.maxTimestampDescending); + view.sortSSTablesByMaxTimestampDescending(); int nonIntersectingSSTables = 0; int includedDueToTombstones = 0; @@ -903,7 +903,7 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam } /* add the SSTables on disk */ - view.sstables.sort(SSTableReader.maxTimestampDescending); + view.sortSSTablesByMaxTimestampDescending(); // read sorted sstables SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); for (SSTableReader sstable : view.sstables) diff --git a/src/java/org/apache/cassandra/index/sai/QueryContext.java b/src/java/org/apache/cassandra/index/sai/QueryContext.java index a0b1719049ec..f3af9b5a2f83 100644 --- a/src/java/org/apache/cassandra/index/sai/QueryContext.java +++ b/src/java/org/apache/cassandra/index/sai/QueryContext.java @@ -211,9 +211,14 @@ public FilterSortOrder filterSortOrder() return filterSortOrder; } + public long approximateRemainingTimeNs() + { + return executionQuotaNano - totalQueryTimeNs(); + } + public void checkpoint() { - if (totalQueryTimeNs() >= executionQuotaNano && !DISABLE_TIMEOUT) + if (approximateRemainingTimeNs() >= 0 && !DISABLE_TIMEOUT) { addQueryTimeouts(1); throw new AbortedOperationException(); diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryView.java b/src/java/org/apache/cassandra/index/sai/plan/QueryView.java index 45f2481b99ca..e1f444fcdda1 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryView.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryView.java @@ -47,12 +47,13 @@ public class QueryView implements AutoCloseable { - final ColumnFamilyStore.RefViewFragment view; + // We use a SortedRefViewFragment because it can be safely shared across multiple threads 
when materializing rows. + final ColumnFamilyStore.SortedRefViewFragment view; final Set referencedIndexes; final Set memtableIndexes; final IndexContext indexContext; - public QueryView(ColumnFamilyStore.RefViewFragment view, + public QueryView(ColumnFamilyStore.SortedRefViewFragment view, Set referencedIndexes, Set memtableIndexes, IndexContext indexContext) @@ -180,7 +181,7 @@ else if (MonotonicClock.approxTime.now() - failingSince > TimeUnit.MILLISECONDS. // freeze referencedIndexes and memtableIndexes, so we can safely give access to them // without risking something messes them up // (this was added after KeyRangeTermIterator messed them up which led to a bug) - return new QueryView(refViewFragment, + return new QueryView(ColumnFamilyStore.SortedRefViewFragment.sortThenCreateFrom(refViewFragment), Collections.unmodifiableSet(referencedIndexes), Collections.unmodifiableSet(memtableIndexes), indexContext); diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java index cee2d69978fd..452af7d446f6 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java @@ -21,12 +21,13 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -38,6 +39,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ImmediateExecutor; +import org.apache.cassandra.concurrent.LocalAwareExecutorService; +import org.apache.cassandra.concurrent.SharedExecutorPool; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.DecoratedKey; @@ -58,11 +63,13 @@ import org.apache.cassandra.index.sai.disk.format.IndexFeatureSet; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.metrics.TableQueryMetrics; +import org.apache.cassandra.index.sai.utils.AbortedOperationException; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.RangeUtil; import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; @@ -438,6 +445,31 @@ public void close() */ public static class ScoreOrderedResultRetriever extends AbstractIterator implements UnfilteredPartitionIterator { + private static final LocalAwareExecutorService PARALLEL_EXECUTOR = getExecutor(); + + /** + * Executor to use for parallel index reads. + * Defined by -Dcassandra.index_read.parallele=true/false, true by default. 
+ * + * INDEX_READ uses 2 * cpus threads by default but can be overridden with -Dcassandra.index_read.parallel_thread_num= + * + * @return stage to use, default INDEX_READ + */ + private static LocalAwareExecutorService getExecutor() + { + boolean isParallel = CassandraRelevantProperties.USE_PARALLEL_INDEX_READ.getBoolean(); + + if (isParallel) + { + int numThreads = CassandraRelevantProperties.PARALLEL_INDEX_READ_NUM_THREADS.isPresent() + ? CassandraRelevantProperties.PARALLEL_INDEX_READ_NUM_THREADS.getInt() + : FBUtilities.getAvailableProcessors() * 2; + return SharedExecutorPool.SHARED.newExecutor(numThreads, maximumPoolSize -> {}, "request", "IndexParallelRead"); + } + else + return ImmediateExecutor.INSTANCE; + } + private final ColumnFamilyStore.RefViewFragment view; private final List> keyRanges; private final boolean coversFullRing; @@ -447,7 +479,7 @@ public static class ScoreOrderedResultRetriever extends AbstractIterator processedKeys; + private final ConcurrentHashMap processedKeys; private final Queue pendingRows; // The limit requested by the query. We cannot load more than softLimit rows in bulk because we only want @@ -475,7 +507,7 @@ private ScoreOrderedResultRetriever(CloseableIterator sco this.executionController = executionController; this.queryContext = queryContext; - this.processedKeys = new HashSet<>(limit); + this.processedKeys = new ConcurrentHashMap<>(limit); this.pendingRows = new ArrayDeque<>(limit); this.softLimit = limit; } @@ -492,56 +524,111 @@ public UnfilteredRowIterator computeNext() /** * Fills the pendingRows queue to generate a queue of row iterators for the supplied keys by repeatedly calling - * {@link #readAndValidatePartition} until it gives enough non-null results. + * {@link #fillRowsAsync} until it gives enough results or the source iterator is exhausted. */ private void fillPendingRows() { // We always want to get at least 1. int rowsToRetrieve = Math.max(1, softLimit - returnedRowCount); - var keys = new HashMap>(); - // We want to get the first unique `rowsToRetrieve` keys to materialize - // Don't pass the priority queue here because it is more efficient to add keys in bulk - fillKeys(keys, rowsToRetrieve, null); - // Sort the primary keys by PrK order, just in case that helps with cache and disk efficiency - var primaryKeyPriorityQueue = new PriorityQueue<>(keys.keySet()); - - while (!keys.isEmpty()) + + while (pendingRows.size() < rowsToRetrieve) { - var primaryKey = primaryKeyPriorityQueue.poll(); - var primaryKeyWithSortKeys = keys.remove(primaryKey); - var partitionIterator = readAndValidatePartition(primaryKey, primaryKeyWithSortKeys); - if (partitionIterator != null) - pendingRows.add(partitionIterator); - else - // The current primaryKey did not produce a partition iterator. We know the caller will need - // `rowsToRetrieve` rows, so we get the next unique key and add it to the queue. - fillKeys(keys, 1, primaryKeyPriorityQueue); + queryContext.checkpoint(); + // We want to get the first unique `rowsToRetrieve` keys to materialize + // Don't pass the priority queue here because it is more efficient to add keys in bulk + var isSourceExchausted = fillRowsAsync(rowsToRetrieve - pendingRows.size()); + if (isSourceExchausted) + return; } } /** - * Fills the keys map with the next `count` unique primary keys that are in the keys produced by calling - * {@link #nextSelectedKeyInRange()}. We map PrimaryKey to List because the same - * primary key can be in the result set multiple times, but with different source tables. 
- * @param keys the map to fill + * Consumes the next `count` unique primary keys produced {@link #nextSelectedKeyInRange()}, then materializes + * and validates the associated rows for each key using the configured executor. The method updates the + * {@link #pendingRows} queue with the materialized rows and the {@link #processedKeys} map with the keys that + * have been processed and can safely be skipped if seen again, which is possible because the score ordered + * iterator does not deduplicate keys. + * * @param count the number of unique PrimaryKeys to consume from the iterator - * @param primaryKeyPriorityQueue the priority queue to add new keys to. If the queue is null, we do not add - * keys to the queue. + * @return true if the source iterator is exhausted, false otherwise. We pass this information to the caller + * to skip an unnecessary call to {@link Iterator#hasNext()}, which likely triggers a disk read. */ - private void fillKeys(Map> keys, int count, PriorityQueue primaryKeyPriorityQueue) + private boolean fillRowsAsync(int count) { - int initialSize = keys.size(); - while (keys.size() - initialSize < count) + // We store a mapping of primary key to the collection of PrimaryKeyWithSortKey objects that will be used + // to validate the row. + var keysToMaterialize = new HashMap>(); + while (keysToMaterialize.size() < count) { var primaryKeyWithSortKey = nextSelectedKeyInRange(); + // The iterator is exhausted. if (primaryKeyWithSortKey == null) - return; + break; + // If we've already processed the key, we can skip it. Because the score ordered iterator does not + // deduplicate rows, we could see dupes if a row is in the ordering index multiple times. This happens + // in the case of dupes and of overwrites. + if (processedKeys.containsKey(primaryKeyWithSortKey.primaryKey())) + continue; var nextPrimaryKey = primaryKeyWithSortKey.primaryKey(); - var accumulator = keys.computeIfAbsent(nextPrimaryKey, k -> new ArrayList<>()); - if (primaryKeyPriorityQueue != null && accumulator.isEmpty()) - primaryKeyPriorityQueue.add(nextPrimaryKey); - accumulator.add(primaryKeyWithSortKey); + keysToMaterialize.computeIfAbsent(nextPrimaryKey, k -> new ArrayList<>()).add(primaryKeyWithSortKey); } + + if (keysToMaterialize.isEmpty()) + return true; + + List> rowFutures = new ArrayList<>(keysToMaterialize.size()); + + // Iterate through the keys and materialize the partitions using the configured executor. + for (var entry : keysToMaterialize.entrySet()) + { + var primaryKey = entry.getKey(); + var primaryKeyValidators = entry.getValue(); + CompletableFuture future = new CompletableFuture<>(); + rowFutures.add(future); + PARALLEL_EXECUTOR.maybeExecuteImmediately(() -> { + // We cancel the future if the query has timed out, so it is worth a quick read to the + // volatile field to possibly avoid getting the partition unnecessarily. + if (future.isDone()) + return; + try (var partition = controller.getPartition(primaryKey, view, executionController)) + { + // Validates that the parition's row is valid for the query predicates. It returns null + // if the row is not valid. Reasons for invalidity include: + // 1. The row is a range or row tombstone or is expired. + // 2. The row does not satisfy the query predicates + // 3. 
The row does not satisfy the ORDER BY clause + var partitionIterator = validatePartition(primaryKey, primaryKeyValidators, partition); + future.complete(partitionIterator); + } + catch (Throwable t) + { + future.completeExceptionally(t); + } + }); + } + + var rowsFuturesIterator = rowFutures.iterator(); + try + { + while (rowsFuturesIterator.hasNext()) + { + var nanosRemaining = queryContext.approximateRemainingTimeNs(); + UnfilteredRowIterator partitionIterator = rowsFuturesIterator.next().get(nanosRemaining, TimeUnit.NANOSECONDS); + if (partitionIterator != null) + pendingRows.add(partitionIterator); + } + } + catch (Throwable t) + { + // Instead of cancelling the futures, we complete them with null to avoid extra work associated with + // building a stack trace. + rowsFuturesIterator.forEachRemaining(f -> f.complete(null)); + if (t.getCause() instanceof AbortedOperationException) + throw (AbortedOperationException) t.getCause(); + throw new CompletionException(t); + } + + return false; } /** @@ -577,48 +664,53 @@ private boolean isInRange(DecoratedKey key) return null; } - public UnfilteredRowIterator readAndValidatePartition(PrimaryKey key, List primaryKeys) + public UnfilteredRowIterator validatePartition(PrimaryKey key, List primaryKeys, UnfilteredRowIterator partition) { - // If we've already processed the key, we can skip it. Because the score ordered iterator does not - // deduplicate rows, we could see dupes if a row is in the ordering index multiple times. This happens - // in the case of dupes and of overwrites. - if (processedKeys.contains(key)) - return null; + queryContext.addPartitionsRead(1); + queryContext.checkpoint(); + var staticRow = partition.staticRow(); + UnfilteredRowIterator clusters = applyIndexFilter(partition, filterTree, queryContext); - try (UnfilteredRowIterator partition = controller.getPartition(key, view, executionController)) + if (clusters == null || !clusters.hasNext()) { - queryContext.addPartitionsRead(1); - queryContext.checkpoint(); - var staticRow = partition.staticRow(); - UnfilteredRowIterator clusters = applyIndexFilter(partition, filterTree, queryContext); - - if (clusters == null || !clusters.hasNext()) - { - processedKeys.add(key); - return null; - } + processedKeys.put(key, Boolean.TRUE); + return null; + } - var now = FBUtilities.nowInSeconds(); - boolean isRowValid = false; - var row = clusters.next(); - assert !clusters.hasNext() : "Expected only one row per partition"; - if (!row.isRangeTombstoneMarker()) + var now = FBUtilities.nowInSeconds(); + boolean isRowValid = false; + var row = clusters.next(); + assert !clusters.hasNext() : "Expected only one row per partition"; + if (!row.isRangeTombstoneMarker()) + { + for (PrimaryKeyWithSortKey primaryKeyWithSortKey : primaryKeys) { - for (PrimaryKeyWithSortKey primaryKeyWithSortKey : primaryKeys) + // Each of these primary keys are equal, but they have different source tables. Therefore, + // we check to see if the row is valid for any of them, and if it is, we return the row. + if (primaryKeyWithSortKey.isIndexDataValid((Row) row, now)) { - // Each of these primary keys are equal, but they have different source tables. Therefore, - // we check to see if the row is valid for any of them, and if it is, we return the row. - if (primaryKeyWithSortKey.isIndexDataValid((Row) row, now)) - { - isRowValid = true; - // We can only count the key as processed once we know it was valid for one of the - // primary keys. 
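                        // Editor's note (illustrative sketch, not part of the patch): the fan-out performed by
                        // fillRowsAsync above, reduced to its essentials. The patch creates the futures manually and
                        // completes them from tasks handed to the executor, which also lets it "cancel" leftover work
                        // after a timeout by completing the remaining futures with null.
                        //
                        //     List<CompletableFuture<UnfilteredRowIterator>> futures = new ArrayList<>();
                        //     for (PrimaryKey key : keys)                      // keys: the batch to materialize
                        //         futures.add(CompletableFuture.supplyAsync(() -> materialize(key), executor));
                        //     for (CompletableFuture<UnfilteredRowIterator> f : futures)
                        //     {
                        //         var rows = f.get(remainingNanos, TimeUnit.NANOSECONDS);  // bounded by the query budget
                        //         if (rows != null)
                        //             pendingRows.add(rows);
                        //     }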
- processedKeys.add(key); - break; - } + isRowValid = true; + // We can only count the key as processed once we know it was valid for one of the + // primary keys. + processedKeys.put(key, Boolean.TRUE); + break; } } - return isRowValid ? new PrimaryKeyIterator(partition, staticRow, row) : null; + } + return isRowValid ? new PrimaryKeyIterator(partition, staticRow, row) : null; + } + + private static class PrimaryKeyResult + { + final PrimaryKey primaryKey; + final List primaryKeyWithSortKeys; + final UnfilteredRowIterator partition; + + PrimaryKeyResult(PrimaryKey primaryKey, List primaryKeyWithSortKeys, UnfilteredRowIterator partition) + { + this.primaryKey = primaryKey; + this.primaryKeyWithSortKeys = primaryKeyWithSortKeys; + this.partition = partition; } } diff --git a/src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java b/src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java index 147acf41eaef..dafe0229c398 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java +++ b/src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java @@ -27,8 +27,6 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import javax.annotation.Nullable; import org.apache.commons.lang3.tuple.Triple; @@ -38,10 +36,6 @@ import io.github.jbellis.jvector.vector.VectorizationProvider; import io.github.jbellis.jvector.vector.types.VectorFloat; import io.github.jbellis.jvector.vector.types.VectorTypeSupport; -import org.apache.cassandra.concurrent.ImmediateExecutor; -import org.apache.cassandra.concurrent.LocalAwareExecutorService; -import org.apache.cassandra.concurrent.SharedExecutorPool; -import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; @@ -49,7 +43,6 @@ import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.partitions.BasePartitionIterator; -import org.apache.cassandra.db.partitions.ParallelCommandProcessor; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.BaseRowIterator; @@ -59,7 +52,6 @@ import org.apache.cassandra.index.SecondaryIndexManager; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.StorageAttachedIndex; -import org.apache.cassandra.index.sai.utils.AbortedOperationException; import org.apache.cassandra.index.sai.utils.InMemoryPartitionIterator; import org.apache.cassandra.index.sai.utils.InMemoryUnfilteredPartitionIterator; import org.apache.cassandra.index.sai.utils.PartitionInfo; @@ -92,7 +84,6 @@ public class TopKProcessor public static final String INDEX_MAY_HAVE_BEEN_DROPPED = "An index may have been dropped. Ordering on non-clustering " + "column requires the column to be indexed"; protected static final Logger logger = LoggerFactory.getLogger(TopKProcessor.class); - private static final LocalAwareExecutorService PARALLEL_EXECUTOR = getExecutor(); private static final VectorTypeSupport vts = VectorizationProvider.getInstance().getVectorTypeSupport(); private final ReadCommand command; @@ -120,28 +111,7 @@ public TopKProcessor(ReadCommand command) this.limit = command.limits().count(); } - /** - * Executor to use for parallel index reads. 
- * Defined by -Dcassandra.index_read.parallele=true/false, true by default. - * - * INDEX_READ uses 2 * cpus threads by default but can be overridden with -Dcassandra.index_read.parallel_thread_num= - * - * @return stage to use, default INDEX_READ - */ - private static LocalAwareExecutorService getExecutor() - { - boolean isParallel = CassandraRelevantProperties.USE_PARALLEL_INDEX_READ.getBoolean(); - if (isParallel) - { - int numThreads = CassandraRelevantProperties.PARALLEL_INDEX_READ_NUM_THREADS.isPresent() - ? CassandraRelevantProperties.PARALLEL_INDEX_READ_NUM_THREADS.getInt() - : FBUtilities.getAvailableProcessors() * 2; - return SharedExecutorPool.SHARED.newExecutor(numThreads, maximumPoolSize -> {}, "request", "IndexParallelRead"); - } - else - return ImmediateExecutor.INSTANCE; - } /** * Filter given partitions and keep the rows with highest scores. In case of {@link UnfilteredPartitionIterator}, @@ -161,67 +131,10 @@ public , P extends BasePartit private , P extends BasePartitionIterator> BasePartitionIterator filterInternal(P partitions) { - // priority queue ordered by score in descending order - Comparator> comparator; - if (queryVector != null) - comparator = Comparator.comparing((Triple t) -> (Float) t.getRight()).reversed(); - else - { - comparator = Comparator.comparing(t -> (ByteBuffer) t.getRight(), indexContext.getValidator()); - if (expression.operator() == Operator.ORDER_BY_DESC) - comparator = comparator.reversed(); - } - var topK = new TopKSelector<>(comparator, limit); // to store top-k results in primary key order TreeMap> unfilteredByPartition = new TreeMap<>(Comparator.comparing(p -> p.key)); - if (PARALLEL_EXECUTOR != ImmediateExecutor.INSTANCE && partitions instanceof ParallelCommandProcessor) { - ParallelCommandProcessor pIter = (ParallelCommandProcessor) partitions; - var commands = pIter.getUninitializedCommands(); - List> results = new ArrayList<>(commands.size()); - - int count = commands.size(); - for (var command: commands) { - CompletableFuture future = new CompletableFuture<>(); - results.add(future); - - // run last command immediately, others in parallel (if possible) - count--; - var executor = count == 0 ? ImmediateExecutor.INSTANCE : PARALLEL_EXECUTOR; - - executor.maybeExecuteImmediately(() -> { - try (var partitionRowIterator = pIter.commandToIterator(command.left(), command.right())) - { - future.complete(partitionRowIterator == null ? null : processPartition(partitionRowIterator)); - } - catch (Throwable t) - { - future.completeExceptionally(t); - } - }); - } - - for (CompletableFuture triplesFuture: results) { - PartitionResults pr; - try - { - pr = triplesFuture.join(); - } - catch (CompletionException t) - { - if (t.getCause() instanceof AbortedOperationException) - throw (AbortedOperationException) t.getCause(); - throw t; - } - if (pr == null) - continue; - topK.addAll(pr.rows); - for (var uf: pr.tombstones) - addUnfiltered(unfilteredByPartition, pr.partitionInfo, uf); - } - } else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRetriever) { - // FilteredPartitions does not implement ParallelizablePartitionIterator. - // Realistically, this won't benefit from parallelizm as these are coming from in-memory/memtable data. + if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRetriever) { int rowsMatched = 0; // Check rowsMatched first to prevent fetching one more partition than needed. 
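            // Editor's note: keeping the limit check on the left of the && short-circuits the loop condition, so
            // hasNext() -- which may materialize another partition -- is never invoked once `limit` rows have matched.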
while (rowsMatched < limit && partitions.hasNext()) @@ -233,8 +146,19 @@ private , P extends BaseParti } } } else { - // FilteredPartitions does not implement ParallelizablePartitionIterator. - // Realistically, this won't benefit from parallelizm as these are coming from in-memory/memtable data. + // Only the coordinator reaches this block when doing postprocessing on query results, which are all in + // memory, so we do not parallelize the processing. + // priority queue ordered by score in descending order + Comparator> comparator; + if (queryVector != null) + comparator = Comparator.comparing((Triple t) -> (Float) t.getRight()).reversed(); + else + { + comparator = Comparator.comparing(t -> (ByteBuffer) t.getRight(), indexContext.getValidator()); + if (expression.operator() == Operator.ORDER_BY_DESC) + comparator = comparator.reversed(); + } + var topK = new TopKSelector<>(comparator, limit); while (partitions.hasNext()) { // have to close to move to the next partition, otherwise hasNext() fails @@ -258,12 +182,11 @@ private , P extends BaseParti } } + // reorder rows in partition/clustering order + for (var triple : topK.getUnsortedShared()) + addUnfiltered(unfilteredByPartition, triple.getLeft(), triple.getMiddle()); } - // reorder rows in partition/clustering order - for (var triple : topK.getUnsortedShared()) - addUnfiltered(unfilteredByPartition, triple.getLeft(), triple.getMiddle()); - if (partitions instanceof PartitionIterator) return new InMemoryPartitionIterator(command, unfilteredByPartition); return new InMemoryUnfilteredPartitionIterator(command, unfilteredByPartition); From b2dfada00fc5b43be64e2fc64ed4f06fca3f1283 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 6 Jan 2025 19:20:15 -0600 Subject: [PATCH 8/9] Fix QueryContext#checkpoint bug introduce in last commit --- src/java/org/apache/cassandra/index/sai/QueryContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/index/sai/QueryContext.java b/src/java/org/apache/cassandra/index/sai/QueryContext.java index f3af9b5a2f83..0a888a40ab36 100644 --- a/src/java/org/apache/cassandra/index/sai/QueryContext.java +++ b/src/java/org/apache/cassandra/index/sai/QueryContext.java @@ -218,7 +218,7 @@ public long approximateRemainingTimeNs() public void checkpoint() { - if (approximateRemainingTimeNs() >= 0 && !DISABLE_TIMEOUT) + if (approximateRemainingTimeNs() < 0 && !DISABLE_TIMEOUT) { addQueryTimeouts(1); throw new AbortedOperationException(); From 573aa8352b6683f9cc2e086dd54c47ed598bfe89 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 8 Jan 2025 09:10:42 +0100 Subject: [PATCH 9/9] StorageAttachedIndexSearcher: do not use maybeExecuteImmediately in fillRowsAsync --- .../cassandra/index/sai/plan/StorageAttachedIndexSearcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java index 452af7d446f6..1efd88d18af4 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java @@ -585,7 +585,7 @@ private boolean fillRowsAsync(int count) var primaryKeyValidators = entry.getValue(); CompletableFuture future = new CompletableFuture<>(); rowFutures.add(future); - PARALLEL_EXECUTOR.maybeExecuteImmediately(() -> { + PARALLEL_EXECUTOR.submit(() -> { // We cancel the future if the query 
has timed out, so it is worth a quick read to the // volatile field to possibly avoid getting the partition unnecessarily. if (future.isDone())
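// Editor's note (illustrative sketch, not part of the patch): the final commit swaps maybeExecuteImmediately for
// submit when scheduling the per-key materialization tasks, presumably because maybeExecuteImmediately can run the
// task synchronously on the calling thread, serializing the reads this series sets out to parallelize, whereas
// submit always hands the work to a pool thread. The submit-side shape, with illustrative helper names:
//
//     CompletableFuture<UnfilteredRowIterator> future = new CompletableFuture<>();
//     PARALLEL_EXECUTOR.submit(() -> {
//         if (future.isDone())                 // already completed with null by the caller on timeout
//             return;
//         try (UnfilteredRowIterator partition = readPartition(primaryKey))
//         {
//             future.complete(validate(primaryKey, partition));   // null when the row fails the query predicates
//         }
//         catch (Throwable t)
//         {
//             future.completeExceptionally(t);
//         }
//     });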