From 79fa7f8cb5771791f2bf19ff7c968cc198a5ff81 Mon Sep 17 00:00:00 2001 From: "sachin.vm" Date: Wed, 29 May 2024 10:13:26 -0700 Subject: [PATCH] [CLIENT-2776] Support info commands for proxy client --- proxy/pom.xml | 2 +- .../client/proxy/AerospikeClientProxy.java | 6 +- .../client/proxy/InfoCommandProxy.java | 86 +++++++++++++++++++ .../com/aerospike/client/proxy/Parser.java | 17 ++-- .../client/proxy/grpc/GrpcConversions.java | 12 +-- 5 files changed, 111 insertions(+), 12 deletions(-) create mode 100644 proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java diff --git a/proxy/pom.xml b/proxy/pom.xml index 84c64ea2d..319cb25cc 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -22,7 +22,7 @@ com.aerospike aerospike-proxy-stub - 1.0.1 + 1.0.2-SNAPSHOT diff --git a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java index 889e1c502..eaef8d072 100644 --- a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java +++ b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java @@ -2536,7 +2536,11 @@ public void dropIndex( */ @Override public void info(EventLoop eventLoop, InfoListener listener, InfoPolicy policy, Node node, String... commands) { - throw new AerospikeException(NotSupported + "info"); + if (policy == null) { + policy = infoPolicyDefault; + } + InfoCommandProxy infoCommand = new InfoCommandProxy(executor, listener, policy, commands); + infoCommand.execute(); } //----------------------------------------------------------------- diff --git a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java new file mode 100644 index 000000000..a5e352d4f --- /dev/null +++ b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java @@ -0,0 +1,86 @@ +package com.aerospike.client.proxy; + +import com.aerospike.client.AerospikeException; +import com.aerospike.client.Info; +import com.aerospike.client.ResultCode; +import com.aerospike.client.cluster.Node; +import com.aerospike.client.command.Command; +import com.aerospike.client.listener.InfoListener; +import com.aerospike.client.policy.InfoPolicy; +import com.aerospike.client.policy.Policy; +import com.aerospike.client.policy.ScanPolicy; +import com.aerospike.client.proxy.grpc.GrpcCallExecutor; +import com.aerospike.client.proxy.grpc.GrpcConversions; +import com.aerospike.proxy.client.Kvs; +import com.aerospike.proxy.client.InfoGrpc; + +import java.util.HashMap; +import java.util.Map; + +public class InfoCommandProxy extends SingleCommandProxy { + + private final InfoListener listener; + private final String[] commands; + private Map map; + + private final InfoPolicy infoPolicy; + + public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPolicy policy, String... commands) { + super(InfoGrpc.getInfoMethod(), executor, createPolicy(policy)); + this.infoPolicy = policy; + this.listener = listener; + this.commands = commands; + } + + private static Policy createPolicy(InfoPolicy policy) { + Policy p = new Policy(); + + if (policy == null) { + p.setTimeout(1000); + } + else { + p.setTimeout(policy.timeout); + } + return p; + } + + @Override + Kvs.AerospikeRequestPayload.Builder getRequestBuilder() { + Kvs.AerospikeRequestPayload.Builder builder = Kvs.AerospikeRequestPayload.newBuilder(); + Kvs.InfoRequest.Builder infoRequestBuilder = Kvs.InfoRequest.newBuilder(); + + if(commands != null){ + for(String command: commands){ + infoRequestBuilder.addCommands(command); + } + } + infoRequestBuilder.setInfoPolicy(GrpcConversions.toGrpc(infoPolicy)); + builder.setInfoRequest(infoRequestBuilder.build()); + return builder; + } + + @Override + void writeCommand(Command command) { + // Nothing to do since there is no Aerospike payload. + } + + @Override + void onFailure(AerospikeException ae) { + listener.onFailure(ae); + } + + @Override + void parseResult(Parser parser) { + int resultCode = parser.parseResultCode(); + if (resultCode != ResultCode.OK) { + throw new AerospikeException(resultCode); + } + Map infoCommandResponse = parser.parseInfoResult(); + try { + listener.onSuccess(infoCommandResponse); + } + catch (Throwable t) { + logOnSuccessError(t); + } + } +} diff --git a/proxy/src/com/aerospike/client/proxy/Parser.java b/proxy/src/com/aerospike/client/proxy/Parser.java index 940c6f82d..dd3a47c59 100644 --- a/proxy/src/com/aerospike/client/proxy/Parser.java +++ b/proxy/src/com/aerospike/client/proxy/Parser.java @@ -16,18 +16,16 @@ */ package com.aerospike.client.proxy; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.zip.DataFormatException; import java.util.zip.Inflater; +import com.aerospike.client.*; +import com.aerospike.client.Record; import org.luaj.vm2.LuaValue; -import com.aerospike.client.AerospikeException; -import com.aerospike.client.Key; -import com.aerospike.client.Record; -import com.aerospike.client.ResultCode; -import com.aerospike.client.Value; import com.aerospike.client.command.Buffer; import com.aerospike.client.command.Command; import com.aerospike.client.command.Command.OpResults; @@ -172,6 +170,15 @@ public Key parseKey(BVal bVal) { return new Key(namespace, digest, setName, userKey); } + public Map parseInfoResult(){ + HashMap responses; + Info info = new Info(buffer, receiveSize); + responses = info.parseMultiResponse(); + return responses; + } + + + public Record parseRecord(boolean isOperation) { Map bins = new LinkedHashMap<>(); diff --git a/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java b/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java index 33c7fa1c3..6f1c6e140 100644 --- a/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java +++ b/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java @@ -20,11 +20,7 @@ import com.aerospike.client.Operation; import com.aerospike.client.ResultCode; import com.aerospike.client.Value; -import com.aerospike.client.policy.Policy; -import com.aerospike.client.policy.QueryDuration; -import com.aerospike.client.policy.QueryPolicy; -import com.aerospike.client.policy.ScanPolicy; -import com.aerospike.client.policy.WritePolicy; +import com.aerospike.client.policy.*; import com.aerospike.client.query.Filter; import com.aerospike.client.query.PartitionFilter; import com.aerospike.client.query.PartitionStatus; @@ -139,6 +135,12 @@ public static Kvs.QueryPolicy toGrpc(QueryPolicy queryPolicy) { return queryPolicyBuilder.build(); } + public static Kvs.InfoPolicy toGrpc(InfoPolicy infoPolicy){ + Kvs.InfoPolicy.Builder infoPolicyBuilder = Kvs.InfoPolicy.newBuilder(); + infoPolicyBuilder.setTimeout(infoPolicy.timeout); + return infoPolicyBuilder.build(); + } + /** * Convert a value to packed bytes. *