Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-497 Replace usages of Info.request() with client.info() #760

Merged
merged 25 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b98630e
set default eventLoops in configuration, replace usages of Info.reque…
agrgr Jul 18, 2024
73b8992
update embedded-aerospike
agrgr Jul 18, 2024
ae84624
use CompletableFuture
agrgr Jul 18, 2024
c332be3
use CompletableFuture
agrgr Jul 18, 2024
2a91446
catch exception
agrgr Jul 18, 2024
27ab48f
rename method for sending info commands
agrgr Jul 21, 2024
8e944ad
rename method for sending info commands
agrgr Jul 21, 2024
ee5edf6
add timeout
agrgr Jul 21, 2024
495a2ef
update Maven version
agrgr Jul 21, 2024
541cd74
temporarily comment tests
agrgr Jul 21, 2024
e8a0599
handle potentially unfulfilled CompletableFuture
reugn Jul 21, 2024
7327b60
update default policies in tests
reugn Jul 21, 2024
ce79f1c
comment out test assertions
reugn Jul 21, 2024
3af1c54
disable test
reugn Jul 21, 2024
20839ea
uncomment and enable tests, roll back playtika version upgrade
agrgr Jul 22, 2024
8556dd2
temporarily roll back Java clients versions
agrgr Jul 22, 2024
e21103e
temporarily roll back sending async info commands
agrgr Jul 22, 2024
3b7dd23
temporarily run only tests from "core" folder
agrgr Jul 22, 2024
542c714
temporarily reshuffle arguments in a test
agrgr Jul 22, 2024
8a4bf88
remove code added for debugging, upgrade reactor-test dependency
agrgr Jul 22, 2024
0516231
remove code added for debugging
agrgr Jul 22, 2024
a79a1f4
bring back configuration parameter
agrgr Jul 22, 2024
e83c77d
disable test
agrgr Jul 23, 2024
5b0b6f9
remove unused annotation
agrgr Jul 23, 2024
4f65f13
make EventLoops bean public
agrgr Jul 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 36 additions & 32 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
<spring-cloud-starter-bootstrap>4.1.2</spring-cloud-starter-bootstrap>
<maven.javadoc.plugin>3.3.0</maven.javadoc.plugin>
<maven.gpg.plugin>1.6</maven.gpg.plugin>
<aerospike-client>7.2.1</aerospike-client>
<aerospike-reactor-client>7.1.0</aerospike-reactor-client>
<aerospike-client-jdk8>8.1.2</aerospike-client-jdk8>
<aerospike-reactor-client>8.1.2</aerospike-reactor-client>
<aerospike-proxy-client>8.1.2</aerospike-proxy-client>
<reactor-test>3.6.1</reactor-test>
<embedded-aerospike>3.1.6</embedded-aerospike>
<embedded-aerospike>3.1.7</embedded-aerospike>
<jodatime>2.12.7</jodatime>
<lombok>1.18.32</lombok>
<awaitility>4.2.1</awaitility>
Expand Down Expand Up @@ -194,14 +195,19 @@
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>${aerospike-client}</version>
<artifactId>aerospike-client-jdk8</artifactId>
<version>${aerospike-client-jdk8}</version>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-reactor-client</artifactId>
<version>${aerospike-reactor-client}</version>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-proxy-client</artifactId>
<version>${aerospike-proxy-client}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down Expand Up @@ -248,12 +254,15 @@
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<artifactId>aerospike-client-jdk8</artifactId>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-reactor-client</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-proxy-client</artifactId>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
Expand All @@ -273,6 +282,26 @@
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
</dependency>
<!--TEST dependencies-->
<dependency>
<groupId>io.projectreactor</groupId>
Expand Down Expand Up @@ -302,7 +331,6 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
Expand All @@ -322,30 +350,6 @@
<version>${hibernate.validator}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public interface ReactivePersonRepository extends ReactiveAerospikeRepository<Pe

Right now this interface simply serves typing purposes but we will add additional methods to it later.

For Java configuration, use the `@EnableReactiveAerospikeRepositories` annotation. The annotation carries the base packages attribute. These base packages are to be scanned for interfaces extending `ReactiveAerospikeRepository` and create Spring beans for each of them found. If no base package is configured, the infrastructure scans the package of the annotated configuration class.
For Java configuration, use the `@EnableReactiveAerospikeRepositories` annotation. The annotation carries the base
packages attribute. These base packages are to be scanned for interfaces extending `ReactiveAerospikeRepository`
and create Spring beans for each of them found. If no base package is configured, the infrastructure scans the package
of the annotated configuration class.

The following listing shows how to use Java configuration for a repository:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.data.aerospike.config;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.reactor.AerospikeReactorClient;
import com.aerospike.client.reactor.IAerospikeReactorClient;
Expand Down Expand Up @@ -99,13 +98,10 @@ public IAerospikeReactorClient aerospikeReactorClient(IAerospikeClient aerospike
return new AerospikeReactorClient(aerospikeClient);
}

@Bean
protected abstract EventLoops eventLoops();

@Override
protected ClientPolicy getClientPolicy() {
ClientPolicy clientPolicy = super.getClientPolicy(); // applying default values first
clientPolicy.eventLoops = eventLoops();
// set particular clientPolicy fields if needed
return clientPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.NettyEventLoops;
import com.aerospike.client.policy.ClientPolicy;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand Down Expand Up @@ -52,6 +61,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/**
Expand Down Expand Up @@ -132,6 +142,25 @@ public FilterExpressionsBuilder filterExpressionsBuilder() {
return new FilterExpressionsBuilder();
}

@Bean
protected EventLoops eventLoops() {
int nThreads = Math.max(2, Runtime.getRuntime().availableProcessors() * 2);
String os = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);

EventLoopGroup eventLoopGroup;
if (os.contains("nux") && Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup(nThreads);
} else if (os.contains("mac") && KQueue.isAvailable()) {
eventLoopGroup = new KQueueEventLoopGroup(nThreads);
} else {
eventLoopGroup = new NioEventLoopGroup(nThreads);
}

EventPolicy eventPolicy = new EventPolicy();
eventPolicy.maxCommandsInProcess = 40;
eventPolicy.maxCommandsInQueue = 1024;
return new NettyEventLoops(eventPolicy, eventLoopGroup);
}

@Bean(name = "aerospikeIndexResolver")
public AerospikeIndexResolver aerospikeIndexResolver() {
Expand Down Expand Up @@ -233,6 +262,7 @@ protected ClientPolicy getClientPolicy() {
clientPolicy.batchWritePolicyDefault.sendKey = sendKey;
clientPolicy.queryPolicyDefault.sendKey = sendKey;
clientPolicy.scanPolicyDefault.sendKey = sendKey;
clientPolicy.eventLoops = eventLoops();
return clientPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
import static org.springframework.data.aerospike.query.QualifierUtils.getIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;
import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;

/**
* Primary implementation of {@link AerospikeOperations}.
Expand Down Expand Up @@ -1148,11 +1149,10 @@ public long count(String setName) {

try {
Node[] nodes = client.getNodes();

int replicationFactor = Utils.getReplicationFactor(nodes, namespace);
int replicationFactor = Utils.getReplicationFactor(client, nodes, namespace);

long totalObjects = Arrays.stream(nodes)
.mapToLong(node -> Utils.getObjectsCount(node, namespace, setName))
.mapToLong(node -> Utils.getObjectsCount(client, node, namespace, setName))
.sum();

return (nodes.length > 1) ? (totalObjects / replicationFactor) : totalObjects;
Expand Down Expand Up @@ -1308,7 +1308,8 @@ public boolean indexExists(String indexName) {
try {
Node[] nodes = client.getNodes();
for (Node node : nodes) {
String response = Info.request(node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName);
String response = sendInfoCommand(client, node,
"sindex-exists:ns=" + namespace + ";indexname=" + indexName);
if (response == null) throw new AerospikeException("Null node response");

if (response.equalsIgnoreCase("true")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
*/
package org.springframework.data.aerospike.core;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.BatchRecord;
import com.aerospike.client.BatchResults;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.*;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.BatchPolicy;
Expand Down Expand Up @@ -77,6 +84,7 @@
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
import static org.springframework.data.aerospike.query.QualifierUtils.getIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;
import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;

/**
* Primary implementation of {@link ReactiveAerospikeOperations}.
Expand Down Expand Up @@ -1082,10 +1090,10 @@ public Mono<Long> count(String setName) {
private long countSet(String setName) {
Node[] nodes = reactorClient.getAerospikeClient().getNodes();

int replicationFactor = Utils.getReplicationFactor(nodes, namespace);
int replicationFactor = Utils.getReplicationFactor(reactorClient.getAerospikeClient(), nodes, namespace);

long totalObjects = Arrays.stream(nodes)
.mapToLong(node -> Utils.getObjectsCount(node, namespace, setName))
.mapToLong(node -> Utils.getObjectsCount(reactorClient.getAerospikeClient(), node, namespace, setName))
.sum();

return (nodes.length > 1) ? (totalObjects / replicationFactor) : totalObjects;
Expand Down Expand Up @@ -1191,8 +1199,8 @@ public Mono<Boolean> indexExists(String indexName) {
try {
Node[] nodes = reactorClient.getAerospikeClient().getNodes();
for (Node node : nodes) {
String response = Info.request(reactorClient.getAerospikeClient().getInfoPolicyDefault(),
node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName);
String response = sendInfoCommand(reactorClient.getAerospikeClient(), node,
"sindex-exists:ns=" + namespace + ";indexname=" + indexName);
if (response == null) throw new AerospikeException("Null node response");

if (response.equalsIgnoreCase("true")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.data.aerospike.query.cache;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.InfoPolicy;
import org.slf4j.Logger;
Expand All @@ -29,6 +28,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;
reugn marked this conversation as resolved.
Show resolved Hide resolved

/**
* @author Anastasiia Smirnova
*/
Expand Down Expand Up @@ -65,7 +66,7 @@ public void refreshIndexes() {
.filter(Node::isActive)
.findAny() // we do want to send info request to the random node (sending request to the first node may
// lead to uneven request distribution)
.map(node -> Info.request(infoPolicy, node, indexOperations.buildGetIndexesCommand()))
.map(node -> sendInfoCommand(client, infoPolicy, node, indexOperations.buildGetIndexesCommand()))
.map(response -> {
IndexesInfo indexesInfo = indexOperations.parseIndexesInfo(response);
indexOperations.enrichIndexesWithCardinality(client, indexesInfo.indexes, serverVersionSupport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.data.aerospike.query.cache;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.aerospike.query.model.Index;
import org.springframework.data.aerospike.query.model.IndexKey;
Expand All @@ -30,6 +29,7 @@

import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toMap;
import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;

/**
* Internal index related operations used by ReactorIndexRefresher and IndexRefresher.
Expand All @@ -41,7 +41,6 @@ public class InternalIndexOperations {

// Base64 will return index context as a base64 response
private static final String SINDEX_WITH_BASE64 = "sindex-list:;b64=true";

private final IndexInfoParser indexInfoParser;

public InternalIndexOperations(IndexInfoParser indexInfoParser) {
Expand Down Expand Up @@ -81,7 +80,7 @@ public int getIndexBinValuesRatio(IAerospikeClient client, ServerVersionSupport
String namespace, String indexName) {
if (serverVersionSupport.isSIndexCardinalitySupported()) {
try {
String indexStatData = Info.request(client.getInfoPolicyDefault(), client.getCluster().getRandomNode(),
String indexStatData = sendInfoCommand(client, client.getCluster().getRandomNode(),
String.format("sindex-stat:ns=%s;indexname=%s", namespace, indexName));

return Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.springframework.data.aerospike.server.version;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -10,6 +9,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.springframework.data.aerospike.util.InfoCommandUtils.sendInfoCommand;

@Slf4j
public class ServerVersionSupport {

Expand Down Expand Up @@ -40,9 +41,10 @@ public void scheduleServerVersionRefresh(long intervalSeconds) {
}

private String findServerVersion() {
String versionString = Info.request(client.getInfoPolicyDefault(),
client.getCluster().getRandomNode(), "version");
versionString = versionString.substring(versionString.lastIndexOf(' ') + 1);
String fullVersionString = sendInfoCommand(client, client.getCluster().getRandomNode(),
"version");

String versionString = fullVersionString.substring(fullVersionString.lastIndexOf(' ') + 1);
log.debug("Found server version {}", versionString);
return versionString;
}
Expand Down
Loading
Loading