Skip to content

Commit

Permalink
FMWK-196 Index Cardinality (#657)
Browse files Browse the repository at this point in the history
* Support choosing a qualifier based on index cardinality

* 1. Remove wrong TODO comment, sindex-stat is only supported per index
2. Reuse methods, avoid code duplication

* Polish: use primitives where possible, @Slf4j rather than Logger, simplify conditions

* Remove redundant initialization on a primitive field
  • Loading branch information
roimenashe authored Nov 16, 2023
1 parent 579a204 commit 070b2a9
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.aerospike.query.cache.IndexesCache;
import org.springframework.data.aerospike.query.model.Index;
import org.springframework.data.aerospike.query.model.IndexedField;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;

/**
Expand Down Expand Up @@ -59,13 +64,40 @@ public Statement build(String namespace, String set, @Nullable Query query, Stri
}

/**
 * Sets the statement's secondary index filter based on the given qualifier, when one applies.
 * <p>
 * A qualifier whose operation is {@link FilterOperation#AND} is handed to
 * {@code setFilterFromMultipleQualifiers}; any other qualifier is used directly, but only if
 * {@code isIndexedBin} reports that it targets an indexed bin. In every other case the statement
 * is left untouched.
 *
 * @param stmt      statement to set the filter on
 * @param qualifier qualifier to derive the filter from, may be {@code null}
 */
private void setStatementFilterFromQualifiers(Statement stmt, Qualifier qualifier) {
    // No qualifier, no need to set statement filter
    if (qualifier == null) {
        return;
    }

    if (qualifier.getOperation() == FilterOperation.AND) {
        // Multiple qualifiers.
        // There is no point in using a secondary index for OR, as that would require widening
        // the selection to more than one field.
        setFilterFromMultipleQualifiers(stmt, qualifier);
    } else if (isIndexedBin(stmt, qualifier)) {
        // Single qualifier
        setFilterFromSingleQualifier(stmt, qualifier);
    }
}

private void setFilterFromMultipleQualifiers(Statement stmt, Qualifier qualifier) {
int minBinValuesRatio = Integer.MAX_VALUE;
Qualifier minBinValuesRatioQualifier = null;

for (Qualifier innerQualifier : qualifier.getQualifiers()) {
if (innerQualifier != null && isIndexedBin(stmt, innerQualifier)) {
int currBinValuesRatio = getMinBinValuesRatioForQualifier(stmt, innerQualifier);
// Compare the cardinality of each qualifier and select the qualifier that has the index with
// the lowest bin values ratio
if (currBinValuesRatio != 0 && currBinValuesRatio < minBinValuesRatio) {
minBinValuesRatio = currBinValuesRatio;
minBinValuesRatioQualifier = innerQualifier;
}
}
}

// If index with min bin values ratio found, set filter with the matching qualifier
if (minBinValuesRatioQualifier != null) {
setFilterFromSingleQualifier(stmt, minBinValuesRatioQualifier);
} else { // No index with bin values ratio found, do not consider cardinality when setting a filter
for (Qualifier innerQualifier : qualifier.getQualifiers()) {
if (innerQualifier != null && isIndexedBin(stmt, innerQualifier)) {
Filter filter = innerQualifier.setQueryAsFilter();
Expand All @@ -76,13 +108,14 @@ private void setStatementFilterFromQualifiers(Statement stmt, Qualifier qualifie
}
}
}
} else if (isIndexedBin(stmt, qualifier)) {
Filter filter = qualifier.setQueryAsFilter();
if (filter != null) {
stmt.setFilter(filter);
// the filter from the first processed qualifier becomes statement's sIndex filter
qualifier.setQueryAsFilter(true);
}
}
}

/**
 * Uses the given qualifier as the statement's secondary index filter, provided the qualifier
 * can produce one.
 *
 * @param stmt      statement to set the filter on
 * @param qualifier qualifier to convert into a filter
 */
private void setFilterFromSingleQualifier(Statement stmt, Qualifier qualifier) {
    Filter sIndexFilter = qualifier.setQueryAsFilter();
    if (sIndexFilter == null) {
        // The qualifier did not yield a filter; leave the statement as it is
        return;
    }

    stmt.setFilter(sIndexFilter);
    // Flag the qualifier as the one whose filter was set on the statement
    qualifier.setQueryAsFilter(true);
}

Expand All @@ -101,4 +134,17 @@ private boolean isIndexedBin(Statement stmt, Qualifier qualifier) {
}
return hasIndex;
}

/**
 * Finds the lowest non-zero bin values ratio among all cached indexes that use the qualifier's field.
 *
 * @param stmt      statement providing the namespace and the set name
 * @param qualifier qualifier providing the field name
 * @return the lowest non-zero bin values ratio, or 0 if no index for the field has a non-zero ratio
 */
private int getMinBinValuesRatioForQualifier(Statement stmt, Qualifier qualifier) {
    // Get all indexes that use this field
    IndexedField indexedField =
        new IndexedField(stmt.getNamespace(), stmt.getSetName(), qualifier.getField());

    // A ratio of 0 is skipped, so it never wins the minimum; 0 is also the fallback result
    return indexesCache.getAllIndexesForField(indexedField).stream()
        .mapToInt(Index::getBinValuesRatio)
        .filter(ratio -> ratio != 0)
        .min()
        .orElse(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ public void refreshIndexes() {
.findAny() // we do want to send info request to the random node (sending request to the first node may
// lead to uneven request distribution)
.map(node -> Info.request(infoPolicy, node, indexOperations.buildGetIndexesCommand()))
.map(indexOperations::parseIndexesInfo)
.map(response -> {
IndexesInfo indexesInfo = indexOperations.parseIndexesInfo(response);
indexOperations.enrichIndexesWithCardinality(client, indexesInfo.indexes);
return indexesInfo;
})
.orElse(IndexesInfo.empty());
log.debug("Loaded indexes: {}", cache.indexes);
this.indexesCacheUpdater.update(cache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,32 @@
import org.springframework.data.aerospike.query.model.IndexKey;
import org.springframework.data.aerospike.query.model.IndexedField;

import java.util.List;
import java.util.Optional;

/**
* Lookup operations over the indexes cache.
*/
public interface IndexesCache {

/**
* Get index from the indexes cache by providing an index key.
*
* @param indexKey Index key to search by.
* @return Optional {@link Index}
*/
Optional<Index> getIndex(IndexKey indexKey);

/**
* Get all indexes for a given indexed field object.
*
* @param indexedField Indexed field to search by.
* @return List of indexes matching the indexed field properties.
*/
List<Index> getAllIndexesForField(IndexedField indexedField);

/**
* Boolean indication of whether an index exists for the given indexed field.
*
* @param indexedField Indexed field to search by.
* @return True if there is an index for the given indexed field.
*/
boolean hasIndexFor(IndexedField indexedField);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.springframework.data.aerospike.query.model.IndexedField;
import org.springframework.data.aerospike.query.model.IndexesInfo;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
Expand All @@ -34,6 +37,21 @@ public Optional<Index> getIndex(IndexKey indexKey) {
return Optional.ofNullable(cache.indexes.get(indexKey));
}

/**
 * Collects every cached index whose key has the same namespace, set and field as the given
 * indexed field.
 *
 * @param indexedField indexed field to match cached index keys against
 * @return a new list with the matching indexes, empty if there are none
 */
@Override
public List<Index> getAllIndexesForField(IndexedField indexedField) {
    List<Index> matchingIndexes = new ArrayList<>();

    cache.indexes.forEach((indexKey, index) -> {
        // Skip the entry as soon as one of the key properties differs
        if (!Objects.equals(indexKey.getNamespace(), indexedField.getNamespace())) {
            return;
        }
        if (!Objects.equals(indexKey.getSet(), indexedField.getSet())) {
            return;
        }
        if (!Objects.equals(indexKey.getField(), indexedField.getField())) {
            return;
        }
        matchingIndexes.add(index);
    });

    return matchingIndexes;
}

@Override
public boolean hasIndexFor(IndexedField indexedField) {
return cache.indexedFields.contains(indexedField);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
*/
package org.springframework.data.aerospike.query.cache;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.aerospike.query.model.Index;
import org.springframework.data.aerospike.query.model.IndexKey;
import org.springframework.data.aerospike.query.model.IndexesInfo;
import org.springframework.data.aerospike.utility.ServerVersionUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toMap;
Expand All @@ -30,6 +36,7 @@
*
* @author Sergii Karpenko
*/
@Slf4j
public class InternalIndexOperations {

// Base64 will return index context as a base64 response
Expand Down Expand Up @@ -60,4 +67,34 @@ public IndexesInfo parseIndexesInfo(String infoResponse) {
/**
* Returns the info command used to request the secondary indexes information.
*
* @return the {@code SINDEX_WITH_BASE64} command string
*/
public String buildGetIndexesCommand() {
return SINDEX_WITH_BASE64;
}

/**
 * Sets the bin values ratio on every index in the given map, using the value returned by
 * {@link #getIndexBinValuesRatio(IAerospikeClient, String, String)} for that index.
 *
 * @param client  client used to query the cluster
 * @param indexes indexes to update in place
 */
public void enrichIndexesWithCardinality(IAerospikeClient client, Map<IndexKey, Index> indexes) {
    log.debug("Enriching secondary indexes with cardinality");
    for (Index index : indexes.values()) {
        int binValuesRatio = getIndexBinValuesRatio(client, index.getNamespace(), index.getName());
        index.setBinValuesRatio(binValuesRatio);
    }
}

/**
 * Requests the statistics of a secondary index from a random cluster node and returns the value
 * of its {@code entries_per_bval} statistic.
 * <p>
 * This is best effort: any failure to fetch or parse the statistic is logged as a warning and
 * reported as 0 instead of being propagated.
 *
 * @param client    client used to check the server version and to pick a cluster node
 * @param namespace namespace of the index
 * @param indexName name of the index
 * @return the {@code entries_per_bval} value, or 0 if
 * {@link ServerVersionUtils#isSIndexCardinalitySupported(IAerospikeClient)} returns false,
 * the statistic is missing from the response, or the request or parsing failed
 */
public int getIndexBinValuesRatio(IAerospikeClient client, String namespace, String indexName) {
    if (!ServerVersionUtils.isSIndexCardinalitySupported(client)) {
        return 0;
    }

    try {
        String indexStatData = Info.request(null, client.getCluster().getRandomNode(),
            String.format("sindex-stat:ns=%s;indexname=%s", namespace, indexName));

        // The response is a ';'-separated list of key=value pairs. Only the pair of interest is
        // inspected, so unrelated malformed or repeated entries cannot discard a valid value.
        for (String stat : indexStatData.split(";")) {
            String[] keyValue = stat.split("=");
            if (keyValue.length >= 2 && "entries_per_bval".equals(keyValue[0].trim())) {
                return Integer.parseInt(keyValue[1].trim());
            }
        }
        log.warn("Failed to fetch secondary index {} cardinality: entries_per_bval is missing in the response",
            indexName);
    } catch (Exception e) {
        // Deliberately broad: cardinality is optional information and must not fail the caller
        log.warn("Failed to fetch secondary index {} cardinality", indexName, e);
    }
    return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public Mono<Void> refreshIndexes() {
.doOnSubscribe(subscription -> log.trace("Loading indexes"))
.doOnNext(indexInfo -> {
IndexesInfo cache = indexOperations.parseIndexesInfo(indexInfo);
indexOperations.enrichIndexesWithCardinality(client.getAerospikeClient(), cache.indexes);
this.indexesCacheUpdater.update(cache);
log.debug("Loaded indexes: {}", cache.indexes);
}).then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,35 @@
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.Setter;

/**
* This class represents a Secondary Index created in the cluster.
*
* @author Peter Milne
* @author Anastasiia Smirnova
*/
@Value
@Builder
@RequiredArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@Getter
public class Index {

String name;
String namespace;
String set;
String bin;
IndexType indexType;
IndexCollectionType indexCollectionType;
CTX[] ctx;
private final String name;
private final String namespace;
private final String set;
private final String bin;
private final IndexType indexType;
private final IndexCollectionType indexCollectionType;
private final CTX[] ctx;
@Setter
private int binValuesRatio;

public Index(String name, String namespace, String set, String bin, IndexType indexType,
IndexCollectionType indexCollectionType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.experimental.UtilityClass;

import java.lang.module.ModuleDescriptor;

@UtilityClass
public class ServerVersionUtils {

private static final ModuleDescriptor.Version SERVER_VERSION_6_0_0_0 = ModuleDescriptor.Version.parse("6.0.0.0");
private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_0 = ModuleDescriptor.Version.parse("6.1.0.0");
private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_1 = ModuleDescriptor.Version.parse("6.1.0.1");
private static final ModuleDescriptor.Version SERVER_VERSION_6_3_0_0 = ModuleDescriptor.Version.parse("6.3.0.0");

Expand Down Expand Up @@ -37,4 +40,9 @@ public static boolean isBatchWriteSupported(IAerospikeClient client) {
return ModuleDescriptor.Version.parse(ServerVersionUtils.getServerVersion(client))
.compareTo(SERVER_VERSION_6_0_0_0) >= 0;
}

/**
 * Checks whether the server version is at least 6.1.0.0, the version this class uses as the
 * threshold for secondary index cardinality support.
 *
 * @param client client used to obtain the server version
 * @return true if the server version is 6.1.0.0 or later
 */
public static boolean isSIndexCardinalitySupported(IAerospikeClient client) {
    String rawVersion = ServerVersionUtils.getServerVersion(client);
    ModuleDescriptor.Version serverVersion = ModuleDescriptor.Version.parse(rawVersion);
    return serverVersion.compareTo(SERVER_VERSION_6_1_0_0) >= 0;
}
}

0 comments on commit 070b2a9

Please sign in to comment.