From 50c4a57aad2b9a7db8e32eb44a93ffe472b37e55 Mon Sep 17 00:00:00 2001 From: Jonathan Ellis Date: Thu, 12 Dec 2024 17:10:40 -0600 Subject: [PATCH] don't inject +score unless coordinator requests it; this is a cleaner approach than ignoring it when serialization fails later --- .../cql3/statements/SelectStatement.java | 3 +- src/java/org/apache/cassandra/db/Columns.java | 8 ----- .../db/rows/UnfilteredSerializer.java | 5 ---- .../index/sai/plan/QueryController.java | 5 ++++ .../plan/StorageAttachedIndexSearcher.java | 29 ++++++++++++++----- .../index/sai/plan/TopKProcessor.java | 2 +- 6 files changed, 28 insertions(+), 24 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 847d058cf26b..810dd605577f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -103,9 +103,8 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement { // TODO remove this when we no longer need to downgrade to replicas that don't know about synthetic columns, // and the related code in - // - Columns.Serializer.encodeBitmap - // - UnfilteredSerializer.serializeRowBody) // - StatementRestrictions.addOrderingRestrictions + // - StorageAttachedIndexSearcher.PrimaryKeyIterator constructor public static final boolean ANN_USE_SYNTHETIC_SCORE = Boolean.parseBoolean(System.getProperty("cassandra.sai.ann_use_synthetic_score", "false")); private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class); diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index becd23508d1d..cef03393a1db 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -716,15 +716,7 @@ private static long encodeBitmap(Collection columns, Columns sup for (ColumnMetadata column : columns) { if (iter.next(column) == null) - { - // We can't know for sure whether to add the synthetic score column because WildcardColumnFilter - // just says "yes" to everything; instead, we just skip it here. - // TODO remove this with SelectStatement.ANN_USE_SYNTHETIC_SCORE. - if (column.isSynthetic()) - continue; - throw new IllegalStateException(columns + " is not a subset of " + superset); - } int currentIndex = iter.indexOfCurrent(); int count = currentIndex - expectIndex; diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index c94e3470bb33..a38305e1dd71 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -242,11 +242,6 @@ private void serializeRowBody(Row row, int flags, SerializationHelper helper, Da // with. So we use the ColumnMetadata from the "header" which is "current". Also see #11810 for what // happens if we don't do that. ColumnMetadata column = si.next(cd.column()); - // We can't know for sure whether to add the synthetic score column because WildcardColumnFilter - // just says "yes" to everything; instead, we just skip it here. - // TODO remove this with SelectStatement.ANN_USE_SYNTHETIC_SCORE. - if (column == null) - return; assert column != null : cd.column.toString(); try diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java index fbe4430efc6c..7cd69a2e036b 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java @@ -195,6 +195,11 @@ public TableMetadata metadata() return command.metadata(); } + public ReadCommand command() + { + return command; + } + RowFilter.FilterElement filterOperation() { // NOTE: we cannot remove the order by filter expression here yet because it is used in the FilterTree class diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java index d9f0dfc013b1..683329c3762e 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java @@ -42,6 +42,7 @@ import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.FloatType; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator; @@ -636,7 +637,7 @@ public UnfilteredRowIterator readAndValidatePartition(PrimaryKey pk, List primaryKeysWithScore) + public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfiltered content, List primaryKeysWithScore, ReadCommand command) { super(partition.metadata(), partition.partitionKey(), @@ -668,7 +669,24 @@ public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfilt partition.stats()); assert !primaryKeysWithScore.isEmpty(); - if (!content.isRow() || !(primaryKeysWithScore.get(0) instanceof PrimaryKeyWithScore)) + var isScoredRow = primaryKeysWithScore.get(0) instanceof PrimaryKeyWithScore; + if (!content.isRow() || !isScoredRow) + { + this.row = content; + return; + } + + // When +score is added on the coordinator side, it's represented as a PrecomputedColumnFilter + // even in a 'SELECT *' because WCF is not capable of representing synthetic columns. + // This can be simplified when we remove ANN_USE_SYNTHETIC_SCORE + var tm = metadata(); + var scoreColumn = ColumnMetadata.syntheticColumn(tm.keyspace, + tm.name, + ColumnMetadata.SYNTHETIC_SCORE_ID, + FloatType.instance); + var isScoreFetched = !(command.columnFilter() instanceof ColumnFilter.WildCardColumnFilter) + && command.columnFilter().fetches(scoreColumn); + if (!isScoreFetched) { this.row = content; return; @@ -680,11 +698,6 @@ public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfilt columnData.addAll(originalRow.columnData()); // inject +score as a new column - var tm = metadata(); - var scoreColumn = ColumnMetadata.syntheticColumn(tm.keyspace, - tm.name, - ColumnMetadata.SYNTHETIC_SCORE_ID, - FloatType.instance); var pkWithScore = (PrimaryKeyWithScore) primaryKeysWithScore.get(0); columnData.add(BufferCell.live(scoreColumn, FBUtilities.nowInSeconds(), diff --git a/src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java b/src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java index c52fe19ae3d8..775a92aedf9f 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java +++ b/src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java @@ -364,7 +364,7 @@ private float getScoreForRow(DecoratedKey key, Row row) return FloatType.instance.compose(cell.buffer()); } - // TODO remove this once we enable the scored path for vector queries + // TODO remove this once we enable ANN_USE_SYNTHETIC_SCORE ByteBuffer value = indexContext.getValueOf(key, row, FBUtilities.nowInSeconds()); if (value != null) {