Skip to content

Commit

Permalink
handle eq in an analyzed index by transforming it into a Match restri…
Browse files Browse the repository at this point in the history
…ction in SingleColumnRelation.newEQRestriction. This eliminates the need for skipMerge and special cases in doMergeWith, and moves the issuing of warnings next to the place where the transformation occurs instead of doing it much later in RowFilterValidator (which is no longer needed)
  • Loading branch information
jbellis committed Dec 11, 2024
1 parent cfe204a commit cef71e3
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 188 deletions.
13 changes: 12 additions & 1 deletion src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Objects;

import org.apache.cassandra.db.marshal.VectorType;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.index.sai.analyzer.AnalyzerEqOperatorSupport;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.cql3.Term.Raw;
Expand All @@ -33,6 +35,7 @@
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.ClientWarn;

import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
Expand Down Expand Up @@ -191,7 +194,15 @@ protected Restriction newEQRestriction(TableMetadata table, VariableSpecificatio
if (mapKey == null)
{
Term term = toTerm(toReceivers(columnDef), value, table.keyspace, boundNames);
return new SingleColumnRestriction.EQRestriction(columnDef, term);
var analyzedIndex = IndexRegistry.obtain(table).supportsAnalyzedEq(columnDef);
if (analyzedIndex == null)
return new SingleColumnRestriction.EQRestriction(columnDef, term);

ClientWarn.instance.warn(String.format(AnalyzerEqOperatorSupport.EQ_RESTRICTION_ON_ANALYZED_WARNING,
columnDef.toString(),
analyzedIndex.getIndexMetadata().name),
columnDef);
return new SingleColumnRestriction.AnalyzerMatchesRestriction(columnDef, term);
}
List<? extends ColumnSpecification> receivers = toReceivers(columnDef);
Term entryKey = toTerm(Collections.singletonList(receivers.get(0)), mapKey, table.keyspace, boundNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public ClusteringColumnRestrictions.Builder addRestriction(Restriction restricti
SingleRestriction lastRestriction = restrictions.lastRestriction();
ColumnMetadata lastRestrictionStart = lastRestriction.getFirstColumn();
ColumnMetadata newRestrictionStart = newRestriction.getFirstColumn();
restrictions.addRestriction(newRestriction, isDisjunction, indexRegistry);
restrictions.addRestriction(newRestriction, isDisjunction);

checkFalse(lastRestriction.isSlice() && newRestrictionStart.position() > lastRestrictionStart.position(),
"Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
Expand All @@ -203,7 +203,7 @@ public ClusteringColumnRestrictions.Builder addRestriction(Restriction restricti
}
else
{
restrictions.addRestriction(newRestriction, isDisjunction, indexRegistry);
restrictions.addRestriction(newRestriction, isDisjunction);
}

return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public PartitionKeyRestrictions build(IndexRegistry indexRegistry, boolean isDis
if (restriction.isOnToken())
return buildWithTokens(restrictionSet, i, indexRegistry);

restrictionSet.addRestriction((SingleRestriction) restriction, isDisjunction, indexRegistry);
restrictionSet.addRestriction((SingleRestriction) restriction, isDisjunction);
}

return buildPartitionKeyRestrictions(restrictionSet);
Expand Down
36 changes: 13 additions & 23 deletions src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,45 +413,35 @@ private Builder()
{
}

public void addRestriction(SingleRestriction restriction, boolean isDisjunction, IndexRegistry indexRegistry)
public void addRestriction(SingleRestriction restriction, boolean isDisjunction)
{
List<ColumnMetadata> columnDefs = restriction.getColumnDefs();

if (isDisjunction)
{
// If this restriction is part of a disjunction query then we don't want
// to merge the restrictions (if that is possible), we just add the
// restriction to the set of restrictions for the column.
// to merge the restrictions, we just add the new restriction
addRestrictionForColumns(columnDefs, restriction, null);
}
else
{
// In some special cases such as EQ in analyzed index we need to skip merging the restriction,
// so we can send multiple EQ restrictions to the index.
if (restriction.skipMerge(indexRegistry))
// ANDed together restrictions against the same columns should be merged.
Set<SingleRestriction> existingRestrictions = getRestrictions(newRestrictions, columnDefs);
// Trivial case of no existing restrictions
if (existingRestrictions.isEmpty())
{
addRestrictionForColumns(columnDefs, restriction, null);
return;
}
// Since we merge new restrictions into the existing ones at each pass, there should only be
// at most one existing restriction across the same columnDefs
assert existingRestrictions.size() == 1 : existingRestrictions;

// If this restriction isn't part of a disjunction then we need to get
// the set of existing restrictions for the column and merge them with the
// new restriction
Set<SingleRestriction> existingRestrictions = getRestrictions(newRestrictions, columnDefs);

SingleRestriction merged = restriction;
Set<SingleRestriction> replacedRestrictions = new HashSet<>();

for (SingleRestriction existing : existingRestrictions)
{
if (!existing.skipMerge(indexRegistry))
{
merged = existing.mergeWith(merged);
replacedRestrictions.add(existing);
}
}
// Perform the merge
SingleRestriction existing = existingRestrictions.iterator().next();
var merged = existing.mergeWith(restriction);

addRestrictionForColumns(merged.getColumnDefs(), merged, replacedRestrictions);
addRestrictionForColumns(merged.getColumnDefs(), merged, Set.of(existing));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,24 +198,6 @@ public String toString()
return String.format("EQ(%s)", term);
}

@Override
public boolean skipMerge(IndexRegistry indexRegistry)
{
// We should skip merging this EQ if there is an analyzed index for this column that supports EQ,
// so there can be multiple EQs for the same column.

if (indexRegistry == null)
return false;

for (Index index : indexRegistry.listIndexes())
{
if (index.supportsExpression(columnDef, Operator.ANALYZER_MATCHES) &&
index.supportsExpression(columnDef, Operator.EQ))
return true;
}
return false;
}

@Override
public SingleRestriction doMergeWith(SingleRestriction otherRestriction)
{
Expand Down Expand Up @@ -1402,10 +1384,12 @@ public String toString()
@Override
public SingleRestriction doMergeWith(SingleRestriction otherRestriction)
{
if (!(otherRestriction.isAnalyzerMatches()))
if (!otherRestriction.isAnalyzerMatches())
throw invalidRequest(CANNOT_BE_MERGED_ERROR, columnDef.name);

List<Term> otherValues = ((AnalyzerMatchesRestriction) otherRestriction).getValues();
List<Term> otherValues = otherRestriction instanceof AnalyzerMatchesRestriction
? ((AnalyzerMatchesRestriction) otherRestriction).getValues()
: List.of(((EQRestriction) otherRestriction).term);
List<Term> newValues = new ArrayList<>(values.size() + otherValues.size());
newValues.addAll(values);
newValues.addAll(otherValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.Bound;
import org.apache.cassandra.db.MultiClusteringBuilder;
import org.apache.cassandra.index.IndexRegistry;

/**
* A single restriction/clause on one or multiple column.
Expand Down Expand Up @@ -97,17 +96,6 @@ public default boolean isInclusive(Bound b)
return true;
}

/**
* Checks if this restriction shouldn't be merged with other restrictions.
*
* @param indexRegistry the index registry
* @return {@code true} if this shouldn't be merged with other restrictions
*/
default boolean skipMerge(IndexRegistry indexRegistry)
{
return false;
}

/**
* Merges this restriction with the specified one.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ else if (def.isClusteringColumn() && nestingLevel == 0)
}
else
{
nonPrimaryKeyRestrictionSet.addRestriction((SingleRestriction) restriction, element.isDisjunction(), indexRegistry);
nonPrimaryKeyRestrictionSet.addRestriction((SingleRestriction) restriction, element.isDisjunction());
}
}
}
Expand Down Expand Up @@ -699,7 +699,7 @@ else if (indexOrderings.size() == 1)
throw new InvalidRequestException(String.format(NON_CLUSTER_ORDERING_REQUIRES_INDEX_MESSAGE,
restriction.getFirstColumn()));
}
receiver.addRestriction(restriction, false, indexRegistry);
receiver.addRestriction(restriction, false);
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/java/org/apache/cassandra/db/ReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,6 @@ static Index.QueryPlan findIndexQueryPlan(TableMetadata table, RowFilter rowFilt
@Override
public void maybeValidateIndexes()
{
IndexRegistry.obtain(metadata()).validate(rowFilter());

if (null != indexQueryPlan)
indexQueryPlan.validate(this);
}
Expand Down
25 changes: 11 additions & 14 deletions src/java/org/apache/cassandra/index/IndexRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,6 @@ public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
public void validate(PartitionUpdate update)
{
}

@Override
public void validate(RowFilter filter)
{
// no-op since it's an empty registry
}
};

/**
Expand Down Expand Up @@ -295,12 +289,6 @@ public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
public void validate(PartitionUpdate update)
{
}

@Override
public void validate(RowFilter filter)
{
// no-op since it's an empty registry
}
};

default void registerIndex(Index index)
Expand Down Expand Up @@ -341,8 +329,6 @@ default Optional<Index.Analyzer> getAnalyzerFor(ColumnMetadata column, Operator
*/
void validate(PartitionUpdate update);

void validate(RowFilter filter);

/**
* Returns the {@code IndexRegistry} associated to the specified table.
*
Expand All @@ -356,4 +342,15 @@ public static IndexRegistry obtain(TableMetadata table)

return table.isVirtual() ? EMPTY : Keyspace.openAndGetStore(table).indexManager;
}

default Index supportsAnalyzedEq(ColumnMetadata cm)
{
for (Index index : listIndexes())
{
if (index.supportsExpression(cm, Operator.ANALYZER_MATCHES) &&
index.supportsExpression(cm, Operator.EQ))
return index;
}
return null;
}
}
103 changes: 0 additions & 103 deletions src/java/org/apache/cassandra/index/RowFilterValidator.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1277,12 +1277,6 @@ public void validate(PartitionUpdate update) throws InvalidRequestException
index.validate(update);
}

@Override
public void validate(RowFilter filter)
{
RowFilterValidator.validate(filter, indexes.values());
}

/*
* IndexRegistry methods
*/
Expand Down
Loading

0 comments on commit cef71e3

Please sign in to comment.