Skip to content

Commit

Permalink
set default eventLoops in configuration, replace usages of Info.reque…
Browse files Browse the repository at this point in the history
…st() with client.info()
  • Loading branch information
agrgr committed Jul 18, 2024
1 parent b015df5 commit b98630e
Show file tree
Hide file tree
Showing 18 changed files with 260 additions and 111 deletions.
66 changes: 35 additions & 31 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
<spring-cloud-starter-bootstrap>4.1.2</spring-cloud-starter-bootstrap>
<maven.javadoc.plugin>3.3.0</maven.javadoc.plugin>
<maven.gpg.plugin>1.6</maven.gpg.plugin>
<aerospike-client>7.2.1</aerospike-client>
<aerospike-reactor-client>7.1.0</aerospike-reactor-client>
<aerospike-client-jdk8>8.1.2</aerospike-client-jdk8>
<aerospike-reactor-client>8.1.2</aerospike-reactor-client>
<aerospike-proxy-client>8.1.2</aerospike-proxy-client>
<reactor-test>3.6.1</reactor-test>
<embedded-aerospike>3.1.6</embedded-aerospike>
<jodatime>2.12.7</jodatime>
Expand Down Expand Up @@ -194,14 +195,19 @@
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>${aerospike-client}</version>
<artifactId>aerospike-client-jdk8</artifactId>
<version>${aerospike-client-jdk8}</version>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-reactor-client</artifactId>
<version>${aerospike-reactor-client}</version>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-proxy-client</artifactId>
<version>${aerospike-proxy-client}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down Expand Up @@ -248,12 +254,15 @@
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<artifactId>aerospike-client-jdk8</artifactId>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-reactor-client</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-proxy-client</artifactId>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
Expand All @@ -273,6 +282,26 @@
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
</dependency>
<!--TEST dependencies-->
<dependency>
<groupId>io.projectreactor</groupId>
Expand Down Expand Up @@ -302,7 +331,6 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
Expand All @@ -322,30 +350,6 @@
<version>${hibernate.validator}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public interface ReactivePersonRepository extends ReactiveAerospikeRepository<Pe

Right now this interface simply serves typing purposes but we will add additional methods to it later.

For Java configuration, use the `@EnableReactiveAerospikeRepositories` annotation. The annotation carries the base packages attribute. These base packages are to be scanned for interfaces extending `ReactiveAerospikeRepository` and create Spring beans for each of them found. If no base package is configured, the infrastructure scans the package of the annotated configuration class.
For Java configuration, use the `@EnableReactiveAerospikeRepositories` annotation. The annotation carries the base
packages attribute. These base packages are to be scanned for interfaces extending `ReactiveAerospikeRepository`
and create Spring beans for each of them found. If no base package is configured, the infrastructure scans the package
of the annotated configuration class.

The following listing shows how to use Java configuration for a repository:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.data.aerospike.config;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.reactor.AerospikeReactorClient;
import com.aerospike.client.reactor.IAerospikeReactorClient;
Expand Down Expand Up @@ -99,13 +98,10 @@ public IAerospikeReactorClient aerospikeReactorClient(IAerospikeClient aerospike
return new AerospikeReactorClient(aerospikeClient);
}

@Bean
protected abstract EventLoops eventLoops();

@Override
protected ClientPolicy getClientPolicy() {
ClientPolicy clientPolicy = super.getClientPolicy(); // applying default values first
clientPolicy.eventLoops = eventLoops();
// set particular clientPolicy fields if needed
return clientPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.NettyEventLoops;
import com.aerospike.client.policy.ClientPolicy;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand Down Expand Up @@ -52,6 +61,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/**
Expand Down Expand Up @@ -132,6 +142,25 @@ public FilterExpressionsBuilder filterExpressionsBuilder() {
return new FilterExpressionsBuilder();
}

@Bean
protected EventLoops eventLoops() {
int nThreads = Math.max(2, Runtime.getRuntime().availableProcessors() * 2);
String os = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);

EventLoopGroup eventLoopGroup;
if (os.contains("nux") && Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup(nThreads);
} else if (os.contains("mac") && KQueue.isAvailable()) {
eventLoopGroup = new KQueueEventLoopGroup(nThreads);
} else {
eventLoopGroup = new NioEventLoopGroup(nThreads);
}

EventPolicy eventPolicy = new EventPolicy();
eventPolicy.maxCommandsInProcess = 40;
eventPolicy.maxCommandsInQueue = 1024;
return new NettyEventLoops(eventPolicy, eventLoopGroup);
}

@Bean(name = "aerospikeIndexResolver")
public AerospikeIndexResolver aerospikeIndexResolver() {
Expand Down Expand Up @@ -233,6 +262,7 @@ protected ClientPolicy getClientPolicy() {
clientPolicy.batchWritePolicyDefault.sendKey = sendKey;
clientPolicy.queryPolicyDefault.sendKey = sendKey;
clientPolicy.scanPolicyDefault.sendKey = sendKey;
clientPolicy.eventLoops = eventLoops();
return clientPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
import static org.springframework.data.aerospike.query.QualifierUtils.getIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;
import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;

/**
* Primary implementation of {@link AerospikeOperations}.
Expand Down Expand Up @@ -1148,11 +1149,10 @@ public long count(String setName) {

try {
Node[] nodes = client.getNodes();

int replicationFactor = Utils.getReplicationFactor(nodes, namespace);
int replicationFactor = Utils.getReplicationFactor(client, nodes, namespace);

long totalObjects = Arrays.stream(nodes)
.mapToLong(node -> Utils.getObjectsCount(node, namespace, setName))
.mapToLong(node -> Utils.getObjectsCount(client, node, namespace, setName))
.sum();

return (nodes.length > 1) ? (totalObjects / replicationFactor) : totalObjects;
Expand Down Expand Up @@ -1308,7 +1308,8 @@ public boolean indexExists(String indexName) {
try {
Node[] nodes = client.getNodes();
for (Node node : nodes) {
String response = Info.request(node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName);
String response = sendInfoCommand(client, node,
"sindex-exists:ns=" + namespace + ";indexname=" + indexName);
if (response == null) throw new AerospikeException("Null node response");

if (response.equalsIgnoreCase("true")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
*/
package org.springframework.data.aerospike.core;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.BatchRecord;
import com.aerospike.client.BatchResults;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.*;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.BatchPolicy;
Expand Down Expand Up @@ -77,6 +84,7 @@
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
import static org.springframework.data.aerospike.query.QualifierUtils.getIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;
import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;

/**
* Primary implementation of {@link ReactiveAerospikeOperations}.
Expand Down Expand Up @@ -1082,10 +1090,10 @@ public Mono<Long> count(String setName) {
private long countSet(String setName) {
Node[] nodes = reactorClient.getAerospikeClient().getNodes();

int replicationFactor = Utils.getReplicationFactor(nodes, namespace);
int replicationFactor = Utils.getReplicationFactor(reactorClient.getAerospikeClient(), nodes, namespace);

long totalObjects = Arrays.stream(nodes)
.mapToLong(node -> Utils.getObjectsCount(node, namespace, setName))
.mapToLong(node -> Utils.getObjectsCount(reactorClient.getAerospikeClient(), node, namespace, setName))
.sum();

return (nodes.length > 1) ? (totalObjects / replicationFactor) : totalObjects;
Expand Down Expand Up @@ -1191,8 +1199,8 @@ public Mono<Boolean> indexExists(String indexName) {
try {
Node[] nodes = reactorClient.getAerospikeClient().getNodes();
for (Node node : nodes) {
String response = Info.request(reactorClient.getAerospikeClient().getInfoPolicyDefault(),
node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName);
String response = sendInfoCommand(reactorClient.getAerospikeClient(), node,
"sindex-exists:ns=" + namespace + ";indexname=" + indexName);
if (response == null) throw new AerospikeException("Null node response");

if (response.equalsIgnoreCase("true")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.data.aerospike.query.cache;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.InfoPolicy;
import org.slf4j.Logger;
Expand All @@ -29,6 +28,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;

/**
* @author Anastasiia Smirnova
*/
Expand Down Expand Up @@ -65,7 +66,7 @@ public void refreshIndexes() {
.filter(Node::isActive)
.findAny() // we do want to send info request to the random node (sending request to the first node may
// lead to uneven request distribution)
.map(node -> Info.request(infoPolicy, node, indexOperations.buildGetIndexesCommand()))
.map(node -> sendInfoCommand(client, infoPolicy, node, indexOperations.buildGetIndexesCommand()))
.map(response -> {
IndexesInfo indexesInfo = indexOperations.parseIndexesInfo(response);
indexOperations.enrichIndexesWithCardinality(client, indexesInfo.indexes, serverVersionSupport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.data.aerospike.query.cache;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.aerospike.query.model.Index;
import org.springframework.data.aerospike.query.model.IndexKey;
Expand All @@ -30,6 +29,7 @@

import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toMap;
import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;

/**
* Internal index related operations used by ReactorIndexRefresher and IndexRefresher.
Expand All @@ -41,7 +41,6 @@ public class InternalIndexOperations {

// Base64 will return index context as a base64 response
private static final String SINDEX_WITH_BASE64 = "sindex-list:;b64=true";

private final IndexInfoParser indexInfoParser;

public InternalIndexOperations(IndexInfoParser indexInfoParser) {
Expand Down Expand Up @@ -81,7 +80,7 @@ public int getIndexBinValuesRatio(IAerospikeClient client, ServerVersionSupport
String namespace, String indexName) {
if (serverVersionSupport.isSIndexCardinalitySupported()) {
try {
String indexStatData = Info.request(client.getInfoPolicyDefault(), client.getCluster().getRandomNode(),
String indexStatData = sendInfoCommand(client, client.getCluster().getRandomNode(),
String.format("sindex-stat:ns=%s;indexname=%s", namespace, indexName));

return Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.springframework.data.aerospike.server.version;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -10,6 +9,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;

@Slf4j
public class ServerVersionSupport {

Expand Down Expand Up @@ -40,9 +41,10 @@ public void scheduleServerVersionRefresh(long intervalSeconds) {
}

private String findServerVersion() {
String versionString = Info.request(client.getInfoPolicyDefault(),
client.getCluster().getRandomNode(), "version");
versionString = versionString.substring(versionString.lastIndexOf(' ') + 1);
String fullVersionString = sendInfoCommand(client, client.getCluster().getRandomNode(),
"version");

String versionString = fullVersionString.substring(fullVersionString.lastIndexOf(' ') + 1);
log.debug("Found server version {}", versionString);
return versionString;
}
Expand Down
Loading

0 comments on commit b98630e

Please sign in to comment.