Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-196 Index Cardinality #657

Merged
merged 4 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.aerospike.query.cache.IndexesCache;
import org.springframework.data.aerospike.query.model.Index;
import org.springframework.data.aerospike.query.model.IndexedField;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;

/**
Expand Down Expand Up @@ -59,13 +64,40 @@ public Statement build(String namespace, String set, @Nullable Query query, Stri
}

private void setStatementFilterFromQualifiers(Statement stmt, Qualifier qualifier) {
/*
* query with qualifier
*/
if (qualifier == null) return;
// No qualifier, no need to set statement filter
if (qualifier == null) {
return;
}

// Multiple qualifiers
// No sense to use secondary index in case of OR as it requires to enlarge selection to more than 1 field
if (qualifier.getOperation() == FilterOperation.AND) {
// no sense to use secondary index in case of OR
// as it requires to enlarge selection to more than 1 field
setFilterFromMultipleQualifiers(stmt, qualifier);
} else if (isIndexedBin(stmt, qualifier)) { // Single qualifier
setFilterFromSingleQualifier(stmt, qualifier);
}
}

private void setFilterFromMultipleQualifiers(Statement stmt, Qualifier qualifier) {
int minBinValuesRatio = Integer.MAX_VALUE;
Qualifier minBinValuesRatioQualifier = null;

for (Qualifier innerQualifier : qualifier.getQualifiers()) {
if (innerQualifier != null && isIndexedBin(stmt, innerQualifier)) {
int currBinValuesRatio = getMinBinValuesRatioForQualifier(stmt, innerQualifier);
// Compare the cardinality of each qualifier and select the qualifier that has the index with
// the lowest bin values ratio
if (currBinValuesRatio != 0 && currBinValuesRatio < minBinValuesRatio) {
minBinValuesRatio = currBinValuesRatio;
minBinValuesRatioQualifier = innerQualifier;
}
}
}

// If index with min bin values ratio found, set filter with the matching qualifier
if (minBinValuesRatioQualifier != null) {
setFilterFromSingleQualifier(stmt, minBinValuesRatioQualifier);
} else { // No index with bin values ratio found, do not consider cardinality when setting a filter
for (Qualifier innerQualifier : qualifier.getQualifiers()) {
if (innerQualifier != null && isIndexedBin(stmt, innerQualifier)) {
Filter filter = innerQualifier.setQueryAsFilter();
Expand All @@ -76,13 +108,14 @@ private void setStatementFilterFromQualifiers(Statement stmt, Qualifier qualifie
}
}
}
} else if (isIndexedBin(stmt, qualifier)) {
Filter filter = qualifier.setQueryAsFilter();
if (filter != null) {
stmt.setFilter(filter);
// the filter from the first processed qualifier becomes statement's sIndex filter
qualifier.setQueryAsFilter(true);
}
}
}

private void setFilterFromSingleQualifier(Statement stmt, Qualifier qualifier) {
Filter filter = qualifier.setQueryAsFilter();
if (filter != null) {
stmt.setFilter(filter);
qualifier.setQueryAsFilter(true);
}
}

Expand All @@ -101,4 +134,17 @@ private boolean isIndexedBin(Statement stmt, Qualifier qualifier) {
}
return hasIndex;
}

private int getMinBinValuesRatioForQualifier(Statement stmt, Qualifier qualifier) {
// Get all indexes that uses this field
List<Index> indexList = indexesCache.getAllIndexesForField(
new IndexedField(stmt.getNamespace(), stmt.getSetName(), qualifier.getField()));

// Return the lowest bin values ratio of the indexes in indexList
Optional<Index> minBinValuesRatio = indexList.stream()
.filter(index -> index.getBinValuesRatio() != 0)
.min(Comparator.comparing(Index::getBinValuesRatio));

return minBinValuesRatio.map(Index::getBinValuesRatio).orElse(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ public void refreshIndexes() {
.findAny() // we do want to send info request to the random node (sending request to the first node may
// lead to uneven request distribution)
.map(node -> Info.request(infoPolicy, node, indexOperations.buildGetIndexesCommand()))
.map(indexOperations::parseIndexesInfo)
.map(response -> {
IndexesInfo indexesInfo = indexOperations.parseIndexesInfo(response);
indexOperations.enrichIndexesWithCardinality(client, indexesInfo.indexes);
return indexesInfo;
})
.orElse(IndexesInfo.empty());
log.debug("Loaded indexes: {}", cache.indexes);
this.indexesCacheUpdater.update(cache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,32 @@
import org.springframework.data.aerospike.query.model.IndexKey;
import org.springframework.data.aerospike.query.model.IndexedField;

import java.util.List;
import java.util.Optional;

public interface IndexesCache {

/**
* @param indexKey to search by
* Get index from the indexes cache by providing an index key.
*
* @param indexKey Index key to search by.
* @return Optional {@link Index}
*/
Optional<Index> getIndex(IndexKey indexKey);

/**
* @param indexedField to search by
* @return true if there is an index for the given indexed field
* Get all indexes for a given indexed field object.
*
* @param indexedField Indexed field to search by.
* @return List of indexes matching the indexed field properties.
*/
List<Index> getAllIndexesForField(IndexedField indexedField);

/**
* Boolean indication of whether an index exists for the given indexed field
*
* @param indexedField Indexed field to search by
* @return True if there is an index for the given indexed field.
*/
boolean hasIndexFor(IndexedField indexedField);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.springframework.data.aerospike.query.model.IndexedField;
import org.springframework.data.aerospike.query.model.IndexesInfo;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
Expand All @@ -34,6 +37,21 @@ public Optional<Index> getIndex(IndexKey indexKey) {
return Optional.ofNullable(cache.indexes.get(indexKey));
}

@Override
public List<Index> getAllIndexesForField(IndexedField indexedField) {
List<Index> indexList = new ArrayList<>();

cache.indexes.forEach((key, value) -> {
if (Objects.equals(key.getNamespace(), indexedField.getNamespace()) &&
Objects.equals(key.getSet(), indexedField.getSet()) &&
Objects.equals(key.getField(), indexedField.getField())
) {
indexList.add(value);
}
});
return indexList;
}

@Override
public boolean hasIndexFor(IndexedField indexedField) {
return cache.indexedFields.contains(indexedField);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
*/
package org.springframework.data.aerospike.query.cache;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.aerospike.query.model.Index;
import org.springframework.data.aerospike.query.model.IndexKey;
import org.springframework.data.aerospike.query.model.IndexesInfo;
import org.springframework.data.aerospike.utility.ServerVersionUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toMap;
Expand All @@ -30,6 +36,7 @@
*
* @author Sergii Karpenko
*/
@Slf4j
public class InternalIndexOperations {

// Base64 will return index context as a base64 response
Expand Down Expand Up @@ -60,4 +67,34 @@ public IndexesInfo parseIndexesInfo(String infoResponse) {
public String buildGetIndexesCommand() {
return SINDEX_WITH_BASE64;
}

public void enrichIndexesWithCardinality(IAerospikeClient client, Map<IndexKey, Index> indexes) {
log.debug("Enriching secondary indexes with cardinality");
indexes.values().forEach(
index -> index.setBinValuesRatio(getIndexBinValuesRatio(client, index.getNamespace(), index.getName()))
);
}

public int getIndexBinValuesRatio(IAerospikeClient client, String namespace, String indexName) {
if (ServerVersionUtils.isSIndexCardinalitySupported(client)) {

try {
String indexStatData = Info.request(null, client.getCluster().getRandomNode(),
String.format("sindex-stat:ns=%s;indexname=%s", namespace, indexName));

return Integer.parseInt(
Arrays.stream(indexStatData.split(";"))
.map(String::trim)
.toList().stream()
.map(stat -> Arrays.stream(stat.split("="))
.map(String::trim)
.collect(Collectors.toList()))
.collect(Collectors.toMap(t -> t.get(0), t -> t.get(1)))
.get("entries_per_bval"));
} catch (Exception e) {
log.warn("Failed to fetch secondary index {} cardinality", indexName, e);
}
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public Mono<Void> refreshIndexes() {
.doOnSubscribe(subscription -> log.trace("Loading indexes"))
.doOnNext(indexInfo -> {
IndexesInfo cache = indexOperations.parseIndexesInfo(indexInfo);
indexOperations.enrichIndexesWithCardinality(client.getAerospikeClient(), cache.indexes);
this.indexesCacheUpdater.update(cache);
log.debug("Loaded indexes: {}", cache.indexes);
}).then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,35 @@
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.Setter;

/**
* This class represents a Secondary Index created in the cluster.
*
* @author Peter Milne
* @author Anastasiia Smirnova
*/
@Value
@Builder
@RequiredArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@Getter
public class Index {

String name;
String namespace;
String set;
String bin;
IndexType indexType;
IndexCollectionType indexCollectionType;
CTX[] ctx;
private final String name;
private final String namespace;
private final String set;
private final String bin;
private final IndexType indexType;
private final IndexCollectionType indexCollectionType;
private final CTX[] ctx;
@Setter
private int binValuesRatio;

public Index(String name, String namespace, String set, String bin, IndexType indexType,
IndexCollectionType indexCollectionType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.experimental.UtilityClass;

import java.lang.module.ModuleDescriptor;

@UtilityClass
public class ServerVersionUtils {

private static final ModuleDescriptor.Version SERVER_VERSION_6_0_0_0 = ModuleDescriptor.Version.parse("6.0.0.0");
private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_0 = ModuleDescriptor.Version.parse("6.1.0.0");
private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_1 = ModuleDescriptor.Version.parse("6.1.0.1");
private static final ModuleDescriptor.Version SERVER_VERSION_6_3_0_0 = ModuleDescriptor.Version.parse("6.3.0.0");

Expand Down Expand Up @@ -37,4 +40,9 @@ public static boolean isBatchWriteSupported(IAerospikeClient client) {
return ModuleDescriptor.Version.parse(ServerVersionUtils.getServerVersion(client))
.compareTo(SERVER_VERSION_6_0_0_0) >= 0;
}

public static boolean isSIndexCardinalitySupported(IAerospikeClient client) {
return ModuleDescriptor.Version.parse(ServerVersionUtils.getServerVersion(client))
.compareTo(SERVER_VERSION_6_1_0_0) >= 0;
}
}
Loading