Skip to content

Commit

Permalink
FMWK-497 Replace usages of Info.request() with client.info() (#760)
Browse files Browse the repository at this point in the history
Co-authored-by: yrizhkov <[email protected]>
  • Loading branch information
agrgr and reugn authored Jul 23, 2024
1 parent b015df5 commit eb05ff4
Show file tree
Hide file tree
Showing 21 changed files with 227 additions and 125 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- name: Set up Maven
uses: stCarolas/setup-maven@v5
with:
maven-version: 3.9.6
maven-version: 3.9.8

# See: https://github.com/actions/cache/blob/main/examples.md#java---maven
- name: Maven cache and restore deps
Expand Down
68 changes: 36 additions & 32 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
<spring-cloud-starter-bootstrap>4.1.2</spring-cloud-starter-bootstrap>
<maven.javadoc.plugin>3.3.0</maven.javadoc.plugin>
<maven.gpg.plugin>1.6</maven.gpg.plugin>
<aerospike-client>7.2.1</aerospike-client>
<aerospike-reactor-client>7.1.0</aerospike-reactor-client>
<reactor-test>3.6.1</reactor-test>
<aerospike-client-jdk8>8.1.2</aerospike-client-jdk8>
<aerospike-reactor-client>8.1.2</aerospike-reactor-client>
<aerospike-proxy-client>8.1.2</aerospike-proxy-client>
<reactor-test>3.6.8</reactor-test>
<embedded-aerospike>3.1.6</embedded-aerospike>
<jodatime>2.12.7</jodatime>
<lombok>1.18.32</lombok>
Expand Down Expand Up @@ -194,14 +195,19 @@
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>${aerospike-client}</version>
<artifactId>aerospike-client-jdk8</artifactId>
<version>${aerospike-client-jdk8}</version>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-reactor-client</artifactId>
<version>${aerospike-reactor-client}</version>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-proxy-client</artifactId>
<version>${aerospike-proxy-client}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down Expand Up @@ -248,12 +254,15 @@
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<artifactId>aerospike-client-jdk8</artifactId>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-reactor-client</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-proxy-client</artifactId>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
Expand All @@ -273,6 +282,26 @@
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
</dependency>
<!--TEST dependencies-->
<dependency>
<groupId>io.projectreactor</groupId>
Expand Down Expand Up @@ -302,7 +331,6 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
Expand All @@ -322,30 +350,6 @@
<version>${hibernate.validator}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public interface ReactivePersonRepository extends ReactiveAerospikeRepository<Pe

Right now this interface simply serves typing purposes but we will add additional methods to it later.

For Java configuration, use the `@EnableReactiveAerospikeRepositories` annotation. The annotation carries the base packages attribute. These base packages are to be scanned for interfaces extending `ReactiveAerospikeRepository` and create Spring beans for each of them found. If no base package is configured, the infrastructure scans the package of the annotated configuration class.
For Java configuration, use the `@EnableReactiveAerospikeRepositories` annotation. The annotation carries the base
packages attribute. These base packages are to be scanned for interfaces extending `ReactiveAerospikeRepository`
and create Spring beans for each of them found. If no base package is configured, the infrastructure scans the package
of the annotated configuration class.

The following listing shows how to use Java configuration for a repository:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.data.aerospike.config;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.reactor.AerospikeReactorClient;
import com.aerospike.client.reactor.IAerospikeReactorClient;
Expand Down Expand Up @@ -99,13 +98,10 @@ public IAerospikeReactorClient aerospikeReactorClient(IAerospikeClient aerospike
return new AerospikeReactorClient(aerospikeClient);
}

@Bean
protected abstract EventLoops eventLoops();

@Override
protected ClientPolicy getClientPolicy() {
ClientPolicy clientPolicy = super.getClientPolicy(); // applying default values first
clientPolicy.eventLoops = eventLoops();
// set particular clientPolicy fields if needed
return clientPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.NettyEventLoops;
import com.aerospike.client.policy.ClientPolicy;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand Down Expand Up @@ -52,6 +61,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/**
Expand Down Expand Up @@ -132,6 +142,25 @@ public FilterExpressionsBuilder filterExpressionsBuilder() {
return new FilterExpressionsBuilder();
}

@Bean
public EventLoops eventLoops() {
int nThreads = Math.max(2, Runtime.getRuntime().availableProcessors() * 2);
String os = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);

EventLoopGroup eventLoopGroup;
if (os.contains("nux") && Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup(nThreads);
} else if (os.contains("mac") && KQueue.isAvailable()) {
eventLoopGroup = new KQueueEventLoopGroup(nThreads);
} else {
eventLoopGroup = new NioEventLoopGroup(nThreads);
}

EventPolicy eventPolicy = new EventPolicy();
eventPolicy.maxCommandsInProcess = 40;
eventPolicy.maxCommandsInQueue = 1024;
return new NettyEventLoops(eventPolicy, eventLoopGroup);
}

@Bean(name = "aerospikeIndexResolver")
public AerospikeIndexResolver aerospikeIndexResolver() {
Expand Down Expand Up @@ -233,6 +262,7 @@ protected ClientPolicy getClientPolicy() {
clientPolicy.batchWritePolicyDefault.sendKey = sendKey;
clientPolicy.queryPolicyDefault.sendKey = sendKey;
clientPolicy.scanPolicyDefault.sendKey = sendKey;
clientPolicy.eventLoops = eventLoops();
return clientPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.data.aerospike.query.qualifier.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import org.springframework.data.aerospike.util.InfoCommandUtils;
import org.springframework.data.aerospike.util.Utils;
import org.springframework.data.domain.Sort;
import org.springframework.data.util.StreamUtils;
Expand Down Expand Up @@ -1148,11 +1149,10 @@ public long count(String setName) {

try {
Node[] nodes = client.getNodes();

int replicationFactor = Utils.getReplicationFactor(nodes, namespace);
int replicationFactor = Utils.getReplicationFactor(client, nodes, namespace);

long totalObjects = Arrays.stream(nodes)
.mapToLong(node -> Utils.getObjectsCount(node, namespace, setName))
.mapToLong(node -> Utils.getObjectsCount(client, node, namespace, setName))
.sum();

return (nodes.length > 1) ? (totalObjects / replicationFactor) : totalObjects;
Expand Down Expand Up @@ -1308,7 +1308,8 @@ public boolean indexExists(String indexName) {
try {
Node[] nodes = client.getNodes();
for (Node node : nodes) {
String response = Info.request(node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName);
String response = InfoCommandUtils.request(client, node, "sindex-exists:ns=" + namespace +
";indexname=" + indexName);
if (response == null) throw new AerospikeException("Null node response");

if (response.equalsIgnoreCase("true")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
*/
package org.springframework.data.aerospike.core;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.BatchRecord;
import com.aerospike.client.BatchResults;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.*;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.BatchPolicy;
Expand All @@ -41,6 +48,7 @@
import org.springframework.data.aerospike.query.qualifier.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import org.springframework.data.aerospike.util.InfoCommandUtils;
import org.springframework.data.aerospike.util.Utils;
import org.springframework.data.domain.Sort;
import org.springframework.data.keyvalue.core.IterableConverter;
Expand Down Expand Up @@ -1082,10 +1090,10 @@ public Mono<Long> count(String setName) {
private long countSet(String setName) {
Node[] nodes = reactorClient.getAerospikeClient().getNodes();

int replicationFactor = Utils.getReplicationFactor(nodes, namespace);
int replicationFactor = Utils.getReplicationFactor(reactorClient.getAerospikeClient(), nodes, namespace);

long totalObjects = Arrays.stream(nodes)
.mapToLong(node -> Utils.getObjectsCount(node, namespace, setName))
.mapToLong(node -> Utils.getObjectsCount(reactorClient.getAerospikeClient(), node, namespace, setName))
.sum();

return (nodes.length > 1) ? (totalObjects / replicationFactor) : totalObjects;
Expand Down Expand Up @@ -1191,8 +1199,8 @@ public Mono<Boolean> indexExists(String indexName) {
try {
Node[] nodes = reactorClient.getAerospikeClient().getNodes();
for (Node node : nodes) {
String response = Info.request(reactorClient.getAerospikeClient().getInfoPolicyDefault(),
node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName);
String response = InfoCommandUtils.request(reactorClient.getAerospikeClient(), node,
"sindex-exists:ns=" + namespace + ";indexname=" + indexName);
if (response == null) throw new AerospikeException("Null node response");

if (response.equalsIgnoreCase("true")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
package org.springframework.data.aerospike.query.cache;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.InfoPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.aerospike.query.model.IndexesInfo;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import org.springframework.data.aerospike.util.InfoCommandUtils;

import java.util.Arrays;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void refreshIndexes() {
.filter(Node::isActive)
.findAny() // we do want to send info request to the random node (sending request to the first node may
// lead to uneven request distribution)
.map(node -> Info.request(infoPolicy, node, indexOperations.buildGetIndexesCommand()))
.map(node -> InfoCommandUtils.request(client, infoPolicy, node, indexOperations.buildGetIndexesCommand()))
.map(response -> {
IndexesInfo indexesInfo = indexOperations.parseIndexesInfo(response);
indexOperations.enrichIndexesWithCardinality(client, indexesInfo.indexes, serverVersionSupport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
package org.springframework.data.aerospike.query.cache;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.aerospike.query.model.Index;
import org.springframework.data.aerospike.query.model.IndexKey;
import org.springframework.data.aerospike.query.model.IndexesInfo;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import org.springframework.data.aerospike.util.InfoCommandUtils;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -41,7 +41,6 @@ public class InternalIndexOperations {

// Base64 will return index context as a base64 response
private static final String SINDEX_WITH_BASE64 = "sindex-list:;b64=true";

private final IndexInfoParser indexInfoParser;

public InternalIndexOperations(IndexInfoParser indexInfoParser) {
Expand Down Expand Up @@ -81,7 +80,7 @@ public int getIndexBinValuesRatio(IAerospikeClient client, ServerVersionSupport
String namespace, String indexName) {
if (serverVersionSupport.isSIndexCardinalitySupported()) {
try {
String indexStatData = Info.request(client.getInfoPolicyDefault(), client.getCluster().getRandomNode(),
String indexStatData = InfoCommandUtils.request(client, client.getCluster().getRandomNode(),
String.format("sindex-stat:ns=%s;indexname=%s", namespace, indexName));

return Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.springframework.data.aerospike.server.version;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.aerospike.util.InfoCommandUtils;

import java.lang.module.ModuleDescriptor;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -40,9 +40,10 @@ public void scheduleServerVersionRefresh(long intervalSeconds) {
}

private String findServerVersion() {
String versionString = Info.request(client.getInfoPolicyDefault(),
client.getCluster().getRandomNode(), "version");
versionString = versionString.substring(versionString.lastIndexOf(' ') + 1);
String fullVersionString = InfoCommandUtils.request(client, client.getCluster().getRandomNode(),
"version");

String versionString = fullVersionString.substring(fullVersionString.lastIndexOf(' ') + 1);
log.debug("Found server version {}", versionString);
return versionString;
}
Expand Down
Loading

0 comments on commit eb05ff4

Please sign in to comment.