From 3750e4d5890c2f757855c69ba7107f8f5fb9b4ca Mon Sep 17 00:00:00 2001 From: roimenashe Date: Tue, 14 Nov 2023 11:49:21 +0200 Subject: [PATCH 1/4] Support choosing a qualifier based on index cardinality --- .../aerospike/query/StatementBuilder.java | 80 ++++++++++++++++--- .../aerospike/query/cache/IndexRefresher.java | 6 +- .../aerospike/query/cache/IndexesCache.java | 19 ++++- .../query/cache/IndexesCacheHolder.java | 18 +++++ .../query/cache/InternalIndexOperations.java | 40 ++++++++++ .../query/cache/ReactorIndexRefresher.java | 1 + .../data/aerospike/query/model/Index.java | 24 +++--- .../aerospike/utility/ServerVersionUtils.java | 8 ++ 8 files changed, 170 insertions(+), 26 deletions(-) rename src/{test => main}/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java (81%) diff --git a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java index d6dca3447..c92f8cb81 100644 --- a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java +++ b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java @@ -20,11 +20,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.aerospike.query.cache.IndexesCache; +import org.springframework.data.aerospike.query.model.Index; import org.springframework.data.aerospike.query.model.IndexedField; import org.springframework.data.aerospike.repository.query.Query; import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; + import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull; /** @@ -59,13 +64,44 @@ public Statement build(String namespace, String set, @Nullable Query query, Stri } private void setStatementFilterFromQualifiers(Statement stmt, Qualifier qualifier) { - /* - * query with qualifier - */ - if (qualifier == null) return; + // No qualifier, no need to set statement filter + if (qualifier == null) { + return; + } + + // Multiple qualifiers + // No sense to use secondary index in case of OR as it requires to enlarge selection to more than 1 field if (qualifier.getOperation() == FilterOperation.AND) { - // no sense to use secondary index in case of OR - // as it requires to enlarge selection to more than 1 field + setFilterFromMultipleQualifiers(stmt, qualifier); + } else if (isIndexedBin(stmt, qualifier)) { // Single qualifier + setFilterFromSingleQualifier(stmt, qualifier); + } + } + + private void setFilterFromMultipleQualifiers(Statement stmt, Qualifier qualifier) { + int minBinValuesRatio = Integer.MAX_VALUE; + Qualifier minBinValuesRatioQualifier = null; + + for (Qualifier innerQualifier : qualifier.getQualifiers()) { + if (innerQualifier != null && isIndexedBin(stmt, innerQualifier)) { + int currBinValuesRatio = getMinBinValuesRatioForQualifier(stmt, innerQualifier); + // Compare the cardinality of each qualifier and select the qualifier that has the index with + // the lowest bin values ratio + if (currBinValuesRatio < minBinValuesRatio && currBinValuesRatio != 0) { + minBinValuesRatio = currBinValuesRatio; + minBinValuesRatioQualifier = innerQualifier; + } + } + } + + // If index with min bin values ratio found, set filter with the matching qualifier + if (minBinValuesRatioQualifier != null) { + Filter filter = minBinValuesRatioQualifier.setQueryAsFilter(); + if (filter != null) { + stmt.setFilter(filter); + minBinValuesRatioQualifier.setQueryAsFilter(true); + } + } else { // No index with bin values ratio found, do not consider cardinality when setting a filter for (Qualifier innerQualifier : qualifier.getQualifiers()) { if (innerQualifier != null && isIndexedBin(stmt, innerQualifier)) { Filter filter = innerQualifier.setQueryAsFilter(); @@ -76,13 +112,15 @@ private void setStatementFilterFromQualifiers(Statement stmt, Qualifier qualifie } } } - } else if (isIndexedBin(stmt, qualifier)) { - Filter filter = qualifier.setQueryAsFilter(); - if (filter != null) { - stmt.setFilter(filter); - // the filter from the first processed qualifier becomes statement's sIndex filter - qualifier.setQueryAsFilter(true); - } + } + } + + private void setFilterFromSingleQualifier(Statement stmt, Qualifier qualifier) { + Filter filter = qualifier.setQueryAsFilter(); + if (filter != null) { + stmt.setFilter(filter); + // the filter from the first processed qualifier becomes statement's sIndex filter + qualifier.setQueryAsFilter(true); } } @@ -101,4 +139,20 @@ private boolean isIndexedBin(Statement stmt, Qualifier qualifier) { } return hasIndex; } + + private int getMinBinValuesRatioForQualifier(Statement stmt, Qualifier qualifier) { + // Get all indexes that uses this field + List indexList = indexesCache.getAllIndexesForField( + new IndexedField(stmt.getNamespace(), stmt.getSetName(), qualifier.getField())); + + // Return the lowest bin values ratio of the indexes in indexList + Optional minBinValuesRatio = indexList.stream() + .filter(index -> index.getBinValuesRatio() != 0) + .min(Comparator.comparing(Index::getBinValuesRatio)); + + if (minBinValuesRatio.isPresent()) { + return minBinValuesRatio.get().getBinValuesRatio(); + } + return 0; + } } diff --git a/src/main/java/org/springframework/data/aerospike/query/cache/IndexRefresher.java b/src/main/java/org/springframework/data/aerospike/query/cache/IndexRefresher.java index 8f3b33224..48285cf26 100644 --- a/src/main/java/org/springframework/data/aerospike/query/cache/IndexRefresher.java +++ b/src/main/java/org/springframework/data/aerospike/query/cache/IndexRefresher.java @@ -62,7 +62,11 @@ public void refreshIndexes() { .findAny() // we do want to send info request to the random node (sending request to the first node may // lead to uneven request distribution) .map(node -> Info.request(infoPolicy, node, indexOperations.buildGetIndexesCommand())) - .map(indexOperations::parseIndexesInfo) + .map(response -> { + IndexesInfo indexesInfo = indexOperations.parseIndexesInfo(response); + indexOperations.enrichIndexesWithCardinality(client, indexesInfo.indexes); + return indexesInfo; + }) .orElse(IndexesInfo.empty()); log.debug("Loaded indexes: {}", cache.indexes); this.indexesCacheUpdater.update(cache); diff --git a/src/main/java/org/springframework/data/aerospike/query/cache/IndexesCache.java b/src/main/java/org/springframework/data/aerospike/query/cache/IndexesCache.java index 70618d576..257506ac0 100644 --- a/src/main/java/org/springframework/data/aerospike/query/cache/IndexesCache.java +++ b/src/main/java/org/springframework/data/aerospike/query/cache/IndexesCache.java @@ -19,19 +19,32 @@ import org.springframework.data.aerospike.query.model.IndexKey; import org.springframework.data.aerospike.query.model.IndexedField; +import java.util.List; import java.util.Optional; public interface IndexesCache { /** - * @param indexKey to search by + * Get index from the indexes cache by providing an index key. + * + * @param indexKey Index key to search by. * @return Optional {@link Index} */ Optional getIndex(IndexKey indexKey); /** - * @param indexedField to search by - * @return true if there is an index for the given indexed field + * Get all indexes for a given indexed field object. + * + * @param indexedField Indexed field to search by. + * @return List of indexes matching the indexed field properties. + */ + List getAllIndexesForField(IndexedField indexedField); + + /** + * Boolean indication of whether an index exists for the given indexed field + * + * @param indexedField Indexed field to search by + * @return True if there is an index for the given indexed field. */ boolean hasIndexFor(IndexedField indexedField); } diff --git a/src/main/java/org/springframework/data/aerospike/query/cache/IndexesCacheHolder.java b/src/main/java/org/springframework/data/aerospike/query/cache/IndexesCacheHolder.java index ba4846ccb..244f58fb4 100644 --- a/src/main/java/org/springframework/data/aerospike/query/cache/IndexesCacheHolder.java +++ b/src/main/java/org/springframework/data/aerospike/query/cache/IndexesCacheHolder.java @@ -20,6 +20,9 @@ import org.springframework.data.aerospike.query.model.IndexedField; import org.springframework.data.aerospike.query.model.IndexesInfo; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import java.util.Optional; /** @@ -34,6 +37,21 @@ public Optional getIndex(IndexKey indexKey) { return Optional.ofNullable(cache.indexes.get(indexKey)); } + @Override + public List getAllIndexesForField(IndexedField indexedField) { + List indexList = new ArrayList<>(); + + cache.indexes.forEach((key, value) -> { + if (Objects.equals(key.getNamespace(), indexedField.getNamespace()) && + Objects.equals(key.getSet(), indexedField.getSet()) && + Objects.equals(key.getField(), indexedField.getField()) + ) { + indexList.add(value); + } + }); + return indexList; + } + @Override public boolean hasIndexFor(IndexedField indexedField) { return cache.indexedFields.contains(indexedField); diff --git a/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java b/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java index 657fdd88c..6fee3cb5c 100644 --- a/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java +++ b/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java @@ -15,12 +15,19 @@ */ package org.springframework.data.aerospike.query.cache; +import com.aerospike.client.IAerospikeClient; +import com.aerospike.client.Info; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.data.aerospike.query.model.Index; import org.springframework.data.aerospike.query.model.IndexKey; import org.springframework.data.aerospike.query.model.IndexesInfo; +import org.springframework.data.aerospike.utility.ServerVersionUtils; import java.util.Arrays; import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toMap; @@ -37,6 +44,8 @@ public class InternalIndexOperations { private final IndexInfoParser indexInfoParser; + private final Logger log = LoggerFactory.getLogger(InternalIndexOperations.class); + public InternalIndexOperations(IndexInfoParser indexInfoParser) { this.indexInfoParser = indexInfoParser; } @@ -60,4 +69,35 @@ public IndexesInfo parseIndexesInfo(String infoResponse) { public String buildGetIndexesCommand() { return SINDEX_WITH_BASE64; } + + public void enrichIndexesWithCardinality(IAerospikeClient client, Map indexes) { + log.debug("Enriching secondary indexes with cardinality"); + // TODO: can improve by fetching index stats with 1 request instead of per index + indexes.values().forEach( + index -> index.setBinValuesRatio(getIndexBinValuesRatio(client, index.getNamespace(), index.getName())) + ); + } + + public Integer getIndexBinValuesRatio(IAerospikeClient client, String namespace, String indexName) { + if (ServerVersionUtils.isSIndexCardinalitySupported(client)) { + + try { + String indexStatData = Info.request(null, client.getCluster().getRandomNode(), + String.format("sindex-stat:ns=%s;indexname=%s", namespace, indexName)); + + return Integer.valueOf( + Arrays.stream(indexStatData.split(";")) + .map(String::trim) + .toList().stream() + .map(stat -> Arrays.stream(stat.split("=")) + .map(String::trim) + .collect(Collectors.toList())) + .collect(Collectors.toMap(t -> t.get(0), t -> t.get(1))) + .get("entries_per_bval")); + } catch (Exception e) { + log.warn("Failed to fetch secondary index %s cardinality".formatted(indexName), e); + } + } + return 0; + } } diff --git a/src/main/java/org/springframework/data/aerospike/query/cache/ReactorIndexRefresher.java b/src/main/java/org/springframework/data/aerospike/query/cache/ReactorIndexRefresher.java index 92096f645..77a5581b6 100644 --- a/src/main/java/org/springframework/data/aerospike/query/cache/ReactorIndexRefresher.java +++ b/src/main/java/org/springframework/data/aerospike/query/cache/ReactorIndexRefresher.java @@ -47,6 +47,7 @@ public Mono refreshIndexes() { .doOnSubscribe(subscription -> log.trace("Loading indexes")) .doOnNext(indexInfo -> { IndexesInfo cache = indexOperations.parseIndexesInfo(indexInfo); + indexOperations.enrichIndexesWithCardinality(client.getAerospikeClient(), cache.indexes); this.indexesCacheUpdater.update(cache); log.debug("Loaded indexes: {}", cache.indexes); }).then(); diff --git a/src/main/java/org/springframework/data/aerospike/query/model/Index.java b/src/main/java/org/springframework/data/aerospike/query/model/Index.java index a6b92f346..4dc49de2b 100644 --- a/src/main/java/org/springframework/data/aerospike/query/model/Index.java +++ b/src/main/java/org/springframework/data/aerospike/query/model/Index.java @@ -19,10 +19,12 @@ import com.aerospike.client.cdt.CTX; import com.aerospike.client.query.IndexCollectionType; import com.aerospike.client.query.IndexType; +import lombok.AllArgsConstructor; import lombok.Builder; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.Value; +import lombok.Setter; /** * This class represents a Secondary Index created in the cluster. @@ -30,19 +32,22 @@ * @author Peter Milne * @author Anastasiia Smirnova */ -@Value @Builder @RequiredArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode @Getter public class Index { - String name; - String namespace; - String set; - String bin; - IndexType indexType; - IndexCollectionType indexCollectionType; - CTX[] ctx; + private final String name; + private final String namespace; + private final String set; + private final String bin; + private final IndexType indexType; + private final IndexCollectionType indexCollectionType; + private final CTX[] ctx; + @Setter + private Integer binValuesRatio; public Index(String name, String namespace, String set, String bin, IndexType indexType, IndexCollectionType indexCollectionType) { @@ -53,5 +58,6 @@ public Index(String name, String namespace, String set, String bin, IndexType in this.indexType = indexType; this.indexCollectionType = indexCollectionType; this.ctx = null; + this.binValuesRatio = 0; } } diff --git a/src/test/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java b/src/main/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java similarity index 81% rename from src/test/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java rename to src/main/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java index e5ceac184..03ef72dfb 100644 --- a/src/test/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java +++ b/src/main/java/org/springframework/data/aerospike/utility/ServerVersionUtils.java @@ -2,12 +2,15 @@ import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Info; +import lombok.experimental.UtilityClass; import java.lang.module.ModuleDescriptor; +@UtilityClass public class ServerVersionUtils { private static final ModuleDescriptor.Version SERVER_VERSION_6_0_0_0 = ModuleDescriptor.Version.parse("6.0.0.0"); + private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_0 = ModuleDescriptor.Version.parse("6.1.0.0"); private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_1 = ModuleDescriptor.Version.parse("6.1.0.1"); private static final ModuleDescriptor.Version SERVER_VERSION_6_3_0_0 = ModuleDescriptor.Version.parse("6.3.0.0"); @@ -37,4 +40,9 @@ public static boolean isBatchWriteSupported(IAerospikeClient client) { return ModuleDescriptor.Version.parse(ServerVersionUtils.getServerVersion(client)) .compareTo(SERVER_VERSION_6_0_0_0) >= 0; } + + public static boolean isSIndexCardinalitySupported(IAerospikeClient client) { + return ModuleDescriptor.Version.parse(ServerVersionUtils.getServerVersion(client)) + .compareTo(SERVER_VERSION_6_1_0_0) >= 0; + } } From acc3c09bf0db72dd510c233a743dc2fbed06642b Mon Sep 17 00:00:00 2001 From: roimenashe Date: Tue, 14 Nov 2023 16:46:01 +0200 Subject: [PATCH 2/4] 1. Remove wrong TODO comment, sindex-stat is only supported per index 2. Reuse methods, avoid code duplication --- .../data/aerospike/query/StatementBuilder.java | 7 +------ .../aerospike/query/cache/InternalIndexOperations.java | 1 - 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java index c92f8cb81..3e3628fde 100644 --- a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java +++ b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java @@ -96,11 +96,7 @@ private void setFilterFromMultipleQualifiers(Statement stmt, Qualifier qualifier // If index with min bin values ratio found, set filter with the matching qualifier if (minBinValuesRatioQualifier != null) { - Filter filter = minBinValuesRatioQualifier.setQueryAsFilter(); - if (filter != null) { - stmt.setFilter(filter); - minBinValuesRatioQualifier.setQueryAsFilter(true); - } + setFilterFromSingleQualifier(stmt, minBinValuesRatioQualifier); } else { // No index with bin values ratio found, do not consider cardinality when setting a filter for (Qualifier innerQualifier : qualifier.getQualifiers()) { if (innerQualifier != null && isIndexedBin(stmt, innerQualifier)) { @@ -119,7 +115,6 @@ private void setFilterFromSingleQualifier(Statement stmt, Qualifier qualifier) { Filter filter = qualifier.setQueryAsFilter(); if (filter != null) { stmt.setFilter(filter); - // the filter from the first processed qualifier becomes statement's sIndex filter qualifier.setQueryAsFilter(true); } } diff --git a/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java b/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java index 6fee3cb5c..bac814baf 100644 --- a/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java +++ b/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java @@ -72,7 +72,6 @@ public String buildGetIndexesCommand() { public void enrichIndexesWithCardinality(IAerospikeClient client, Map indexes) { log.debug("Enriching secondary indexes with cardinality"); - // TODO: can improve by fetching index stats with 1 request instead of per index indexes.values().forEach( index -> index.setBinValuesRatio(getIndexBinValuesRatio(client, index.getNamespace(), index.getName())) ); From f97b47a1541606f478d38463e521fc8735860e67 Mon Sep 17 00:00:00 2001 From: roimenashe Date: Tue, 14 Nov 2023 16:58:57 +0200 Subject: [PATCH 3/4] Polish: use primitives where possible, @Slf4j rather than Logger, simplify conditions --- .../data/aerospike/query/StatementBuilder.java | 7 ++----- .../query/cache/InternalIndexOperations.java | 12 +++++------- .../data/aerospike/query/model/Index.java | 2 +- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java index 3e3628fde..9df3181bf 100644 --- a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java +++ b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java @@ -87,7 +87,7 @@ private void setFilterFromMultipleQualifiers(Statement stmt, Qualifier qualifier int currBinValuesRatio = getMinBinValuesRatioForQualifier(stmt, innerQualifier); // Compare the cardinality of each qualifier and select the qualifier that has the index with // the lowest bin values ratio - if (currBinValuesRatio < minBinValuesRatio && currBinValuesRatio != 0) { + if (currBinValuesRatio != 0 && currBinValuesRatio < minBinValuesRatio) { minBinValuesRatio = currBinValuesRatio; minBinValuesRatioQualifier = innerQualifier; } @@ -145,9 +145,6 @@ private int getMinBinValuesRatioForQualifier(Statement stmt, Qualifier qualifier .filter(index -> index.getBinValuesRatio() != 0) .min(Comparator.comparing(Index::getBinValuesRatio)); - if (minBinValuesRatio.isPresent()) { - return minBinValuesRatio.get().getBinValuesRatio(); - } - return 0; + return minBinValuesRatio.map(Index::getBinValuesRatio).orElse(0); } } diff --git a/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java b/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java index bac814baf..65ca2b1cb 100644 --- a/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java +++ b/src/main/java/org/springframework/data/aerospike/query/cache/InternalIndexOperations.java @@ -17,8 +17,7 @@ import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Info; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.springframework.data.aerospike.query.model.Index; import org.springframework.data.aerospike.query.model.IndexKey; import org.springframework.data.aerospike.query.model.IndexesInfo; @@ -37,6 +36,7 @@ * * @author Sergii Karpenko */ +@Slf4j public class InternalIndexOperations { // Base64 will return index context as a base64 response @@ -44,8 +44,6 @@ public class InternalIndexOperations { private final IndexInfoParser indexInfoParser; - private final Logger log = LoggerFactory.getLogger(InternalIndexOperations.class); - public InternalIndexOperations(IndexInfoParser indexInfoParser) { this.indexInfoParser = indexInfoParser; } @@ -77,14 +75,14 @@ public void enrichIndexesWithCardinality(IAerospikeClient client, Map t.get(0), t -> t.get(1))) .get("entries_per_bval")); } catch (Exception e) { - log.warn("Failed to fetch secondary index %s cardinality".formatted(indexName), e); + log.warn("Failed to fetch secondary index {} cardinality", indexName, e); } } return 0; diff --git a/src/main/java/org/springframework/data/aerospike/query/model/Index.java b/src/main/java/org/springframework/data/aerospike/query/model/Index.java index 4dc49de2b..7dc498743 100644 --- a/src/main/java/org/springframework/data/aerospike/query/model/Index.java +++ b/src/main/java/org/springframework/data/aerospike/query/model/Index.java @@ -47,7 +47,7 @@ public class Index { private final IndexCollectionType indexCollectionType; private final CTX[] ctx; @Setter - private Integer binValuesRatio; + private int binValuesRatio; public Index(String name, String namespace, String set, String bin, IndexType indexType, IndexCollectionType indexCollectionType) { From 7d8344015d9d5f165292a2705342ff89fa186188 Mon Sep 17 00:00:00 2001 From: roimenashe Date: Tue, 14 Nov 2023 17:10:14 +0200 Subject: [PATCH 4/4] Remove redundant initialization on a primitive field --- .../org/springframework/data/aerospike/query/model/Index.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/springframework/data/aerospike/query/model/Index.java b/src/main/java/org/springframework/data/aerospike/query/model/Index.java index 7dc498743..c4ee04d45 100644 --- a/src/main/java/org/springframework/data/aerospike/query/model/Index.java +++ b/src/main/java/org/springframework/data/aerospike/query/model/Index.java @@ -58,6 +58,5 @@ public Index(String name, String namespace, String set, String bin, IndexType in this.indexType = indexType; this.indexCollectionType = indexCollectionType; this.ctx = null; - this.binValuesRatio = 0; } }