
Commit 3967e7c

don't inject +score unless coordinator requests it; this is a cleaner approach than ignoring it when serialization fails later
jbellis committed Dec 13, 2024
1 parent c0de416 commit 3967e7c
Showing 7 changed files with 43 additions and 24 deletions.
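The gist of the change: a replica now injects the synthetic +score column into result rows only when the coordinator's column filter names that column explicitly; a wildcard filter, which nominally fetches everything, no longer triggers the injection. The toy model below is a standalone illustration of that decision, using made-up stand-in types rather than the Cassandra classes touched in this commit:

```java
// Standalone illustration only: simplified stand-ins for Cassandra's
// ColumnFilter hierarchy, not the classes changed in this commit.
import java.util.Set;

public class ScoreInjectionSketch
{
    interface ColumnFilter
    {
        boolean fetches(String column);            // "would this filter fetch the column?"
        boolean fetchesExplicitly(String column);  // "did the coordinator name it explicitly?"
    }

    // A wildcard filter says "yes" to everything it is asked about, so it cannot
    // tell us whether the coordinator really asked for a synthetic column.
    static final ColumnFilter WILDCARD = new ColumnFilter()
    {
        public boolean fetches(String column)           { return true; }
        public boolean fetchesExplicitly(String column) { return false; }
    };

    // A precomputed filter carries the exact set of fetched columns, so an
    // explicit request for +score is visible to the replica.
    static ColumnFilter precomputed(Set<String> fetched)
    {
        return new ColumnFilter()
        {
            public boolean fetches(String column)           { return fetched.contains(column); }
            public boolean fetchesExplicitly(String column) { return fetched.contains(column); }
        };
    }

    // The decision this commit moves to: inject +score only when it was explicitly
    // requested, instead of injecting it and skipping it later when serialization
    // cannot represent it.
    static boolean shouldInjectScore(ColumnFilter filter)
    {
        return filter.fetchesExplicitly("+score");
    }

    public static void main(String[] args)
    {
        System.out.println(shouldInjectScore(WILDCARD));                       // false
        System.out.println(shouldInjectScore(precomputed(Set.of("+score")))); // true
        System.out.println(shouldInjectScore(precomputed(Set.of("v"))));      // false
    }
}
```

In the real change the wildcard case is additionally short-circuited with an explicit instanceof check, as the PrimaryKeyIterator hunk further down shows.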
@@ -103,9 +103,8 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement
{
// TODO remove this when we no longer need to downgrade to replicas that don't know about synthetic columns,
// and the related code in
// - Columns.Serializer.encodeBitmap
// - UnfilteredSerializer.serializeRowBody)
// - StatementRestrictions.addOrderingRestrictions
// - StorageAttachedIndexSearcher.PrimaryKeyIterator constructor
public static final boolean ANN_USE_SYNTHETIC_SCORE = Boolean.parseBoolean(System.getProperty("cassandra.sai.ann_use_synthetic_score", "false"));

private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
8 changes: 0 additions & 8 deletions src/java/org/apache/cassandra/db/Columns.java
@@ -716,15 +716,7 @@ private static long encodeBitmap(Collection<ColumnMetadata> columns, Columns sup
for (ColumnMetadata column : columns)
{
if (iter.next(column) == null)
{
// We can't know for sure whether to add the synthetic score column because WildcardColumnFilter
// just says "yes" to everything; instead, we just skip it here.
// TODO remove this with SelectStatement.ANN_USE_SYNTHETIC_SCORE.
if (column.isSynthetic())
continue;

throw new IllegalStateException(columns + " is not a subset of " + superset);
}

int currentIndex = iter.indexOfCurrent();
int count = currentIndex - expectIndex;
15 changes: 15 additions & 0 deletions src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -75,6 +75,9 @@ public abstract class ColumnFilter

public static final Serializer serializer = new Serializer();

// TODO remove this with ANN_USE_SYNTHETIC_SCORE
public abstract boolean fetchesExplicitly(ColumnMetadata column);

/**
* The fetching strategy for the different queries.
*/
@@ -669,6 +672,12 @@ public boolean fetches(ColumnMetadata column)
return true;
}

@Override
public boolean fetchesExplicitly(ColumnMetadata column)
{
return false;
}

@Override
public boolean fetchedColumnIsQueried(ColumnMetadata column)
{
@@ -817,6 +826,12 @@ public boolean fetches(ColumnMetadata column)
return fetchingStrategy.fetchesAllColumns(column.isStatic()) || fetched.contains(column);
}

@Override
public boolean fetchesExplicitly(ColumnMetadata column)
{
return fetched.contains(column);
}

/**
* Whether the provided complex cell (identified by its column and path), which is assumed to be _fetched_ by
* this filter, is also _queried_ by the user.
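For context, the new fetchesExplicitly hook separates "would be fetched" from "was explicitly requested": the wildcard implementation always answers false, while the precomputed selection consults its fetched set. Below is a condensed restatement of how the replica side uses it, mirroring the PrimaryKeyIterator change further down; the helper name coordinatorRequestedScore is made up for illustration and the code assumes this branch's classes:

```java
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.FloatType;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;

final class ScoreRequestCheck
{
    // Hypothetical helper: true only when the coordinator's filter is a precomputed
    // selection that explicitly names the synthetic +score column.
    static boolean coordinatorRequestedScore(ReadCommand command)
    {
        TableMetadata tm = command.metadata();
        ColumnMetadata scoreColumn = ColumnMetadata.syntheticColumn(tm.keyspace,
                                                                    tm.name,
                                                                    ColumnMetadata.SYNTHETIC_SCORE_ID,
                                                                    FloatType.instance);
        ColumnFilter filter = command.columnFilter();
        // Wildcard filters cannot represent synthetic columns, so they never count
        // as an explicit request.
        return !(filter instanceof ColumnFilter.WildCardColumnFilter)
               && filter.fetchesExplicitly(scoreColumn);
    }
}
```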
@@ -242,11 +242,6 @@ private void serializeRowBody(Row row, int flags, SerializationHelper helper, Da
// with. So we use the ColumnMetadata from the "header" which is "current". Also see #11810 for what
// happens if we don't do that.
ColumnMetadata column = si.next(cd.column());
// We can't know for sure whether to add the synthetic score column because WildcardColumnFilter
// just says "yes" to everything; instead, we just skip it here.
// TODO remove this with SelectStatement.ANN_USE_SYNTHETIC_SCORE.
if (column == null)
return;
assert column != null : cd.column.toString();

try
@@ -195,6 +195,11 @@ public TableMetadata metadata()
return command.metadata();
}

public ReadCommand command()
{
return command;
}

RowFilter.FilterElement filterOperation()
{
// NOTE: we cannot remove the order by filter expression here yet because it is used in the FilterTree class
@@ -42,6 +42,7 @@
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.FloatType;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
@@ -636,7 +637,7 @@ public UnfilteredRowIterator readAndValidatePartition(PrimaryKey pk, List<Primar
}
}
}
return isRowValid ? new PrimaryKeyIterator(partition, staticRow, row, sourceKeys) : null;
return isRowValid ? new PrimaryKeyIterator(partition, staticRow, row, sourceKeys, controller.command()) : null;
}
}

@@ -657,7 +658,7 @@ public static class PrimaryKeyIterator extends AbstractUnfilteredRowIterator
private boolean consumed = false;
private final Unfiltered row;

public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfiltered content, List<PrimaryKeyWithSortKey> primaryKeysWithScore)
public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfiltered content, List<PrimaryKeyWithSortKey> primaryKeysWithScore, ReadCommand command)
{
super(partition.metadata(),
partition.partitionKey(),
@@ -668,7 +669,24 @@ public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfilt
partition.stats());

assert !primaryKeysWithScore.isEmpty();
if (!content.isRow() || !(primaryKeysWithScore.get(0) instanceof PrimaryKeyWithScore))
var isScoredRow = primaryKeysWithScore.get(0) instanceof PrimaryKeyWithScore;
if (!content.isRow() || !isScoredRow)
{
this.row = content;
return;
}

// When +score is added on the coordinator side, it's represented as a PrecomputedColumnFilter
// even in a 'SELECT *' because WCF is not capable of representing synthetic columns.
// This can be simplified when we remove ANN_USE_SYNTHETIC_SCORE
var tm = metadata();
var scoreColumn = ColumnMetadata.syntheticColumn(tm.keyspace,
tm.name,
ColumnMetadata.SYNTHETIC_SCORE_ID,
FloatType.instance);
var isScoreFetched = !(command.columnFilter() instanceof ColumnFilter.WildCardColumnFilter)
&& command.columnFilter().fetchesExplicitly(scoreColumn);
if (!isScoreFetched)
{
this.row = content;
return;
@@ -680,11 +698,6 @@ public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfilt
columnData.addAll(originalRow.columnData());

// inject +score as a new column
var tm = metadata();
var scoreColumn = ColumnMetadata.syntheticColumn(tm.keyspace,
tm.name,
ColumnMetadata.SYNTHETIC_SCORE_ID,
FloatType.instance);
var pkWithScore = (PrimaryKeyWithScore) primaryKeysWithScore.get(0);
columnData.add(BufferCell.live(scoreColumn,
FBUtilities.nowInSeconds(),
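Once the constructor has decided that the coordinator asked for +score, the injection itself is just appending one more live cell to the copied column data, as in the hunk above. A trimmed sketch of that step follows; the score value is taken as a parameter here because the field read from PrimaryKeyWithScore is cut off in the diff view, and the final Row rebuild from columnData is likewise outside the visible hunk:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.db.marshal.FloatType;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.utils.FBUtilities;

final class ScoreCellSketch
{
    // Sketch of the injection step: copy the row's existing column data and append
    // a synthetic +score cell. The real constructor then rebuilds the Row from this
    // list (that part is elided in the diff above).
    static List<ColumnData> withScore(Row originalRow, ColumnMetadata scoreColumn, float score)
    {
        List<ColumnData> columnData = new ArrayList<>(originalRow.columnData().size() + 1);
        columnData.addAll(originalRow.columnData());
        columnData.add(BufferCell.live(scoreColumn,
                                       FBUtilities.nowInSeconds(),
                                       FloatType.instance.decompose(score)));
        return columnData;
    }
}
```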
@@ -364,7 +364,7 @@ private float getScoreForRow(DecoratedKey key, Row row)
return FloatType.instance.compose(cell.buffer());
}

// TODO remove this once we enable the scored path for vector queries
// TODO remove this once we enable ANN_USE_SYNTHETIC_SCORE
ByteBuffer value = indexContext.getValueOf(key, row, FBUtilities.nowInSeconds());
if (value != null)
{
