From cda0148c5605787658aa6c33e0e94133b9a21c34 Mon Sep 17 00:00:00 2001 From: "sachin.vm" Date: Wed, 3 Jul 2024 10:11:16 -0700 Subject: [PATCH] changes from streaming to single call --- proxy/pom.xml | 2 +- .../aerospike/client/proxy/CommandProxy.java | 3 +- .../client/proxy/InfoCommandProxy.java | 79 +++++++++++++++++-- 3 files changed, 75 insertions(+), 9 deletions(-) diff --git a/proxy/pom.xml b/proxy/pom.xml index 319cb25cc..be91cffc8 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -22,7 +22,7 @@ com.aerospike aerospike-proxy-stub - 1.0.2-SNAPSHOT + 1.1.0 diff --git a/proxy/src/com/aerospike/client/proxy/CommandProxy.java b/proxy/src/com/aerospike/client/proxy/CommandProxy.java index a722dca80..684b4802f 100644 --- a/proxy/src/com/aerospike/client/proxy/CommandProxy.java +++ b/proxy/src/com/aerospike/client/proxy/CommandProxy.java @@ -27,6 +27,7 @@ import com.aerospike.client.proxy.grpc.GrpcConversions; import com.aerospike.client.proxy.grpc.GrpcStreamingCall; import com.aerospike.client.util.Util; +import com.aerospike.proxy.client.InfoGrpc; import com.aerospike.proxy.client.Kvs; import com.google.protobuf.ByteString; @@ -57,7 +58,7 @@ public CommandProxy( this.numExpectedResponses = numExpectedResponses; } - final void execute() { + void execute() { if (policy.totalTimeout > 0) { deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(policy.totalTimeout); sendTimeoutMillis = (policy.socketTimeout > 0 && policy.socketTimeout < policy.totalTimeout)? diff --git a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java index a5e352d4f..9daad38fe 100644 --- a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java +++ b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java @@ -1,35 +1,41 @@ package com.aerospike.client.proxy; import com.aerospike.client.AerospikeException; -import com.aerospike.client.Info; +import com.aerospike.client.Log; import com.aerospike.client.ResultCode; -import com.aerospike.client.cluster.Node; import com.aerospike.client.command.Command; import com.aerospike.client.listener.InfoListener; import com.aerospike.client.policy.InfoPolicy; import com.aerospike.client.policy.Policy; -import com.aerospike.client.policy.ScanPolicy; import com.aerospike.client.proxy.grpc.GrpcCallExecutor; import com.aerospike.client.proxy.grpc.GrpcConversions; +import com.aerospike.client.proxy.grpc.GrpcStreamingCall; +import com.aerospike.proxy.client.AboutGrpc; import com.aerospike.proxy.client.Kvs; import com.aerospike.proxy.client.InfoGrpc; +import io.grpc.*; +import io.grpc.stub.StreamObserver; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; public class InfoCommandProxy extends SingleCommandProxy { private final InfoListener listener; private final String[] commands; - private Map map; - private final InfoPolicy infoPolicy; + private final GrpcCallExecutor executor; + private final MethodDescriptor methodDescriptor; + final Policy policy; public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPolicy policy, String... commands) { super(InfoGrpc.getInfoMethod(), executor, createPolicy(policy)); + this.executor = executor; this.infoPolicy = policy; this.listener = listener; this.commands = commands; + this.methodDescriptor = InfoGrpc.getInfoMethod(); + this.policy = createPolicy(policy); } private static Policy createPolicy(InfoPolicy policy) { @@ -64,9 +70,68 @@ void writeCommand(Command command) { // Nothing to do since there is no Aerospike payload. } + @Override + void execute(){ + executeCommand(); + } + + private void executeCommand() { + Kvs.AerospikeRequestPayload.Builder builder = getRequestBuilder(); + + ManagedChannel channel = executor.getChannel(); + InfoGrpc.InfoBlockingStub stub = InfoGrpc.newBlockingStub(channel); + try{ + Kvs.AerospikeRequestPayload request = builder.build(); + Kvs.AerospikeResponsePayload response = stub.info(request); + inDoubt |= response.getInDoubt(); + onResponse(response); + }catch (Throwable t) { + inDoubt = true; + onFailure(t); + } finally { + // Shut down the channel + channel.shutdown(); + } + } + + @Override void onFailure(AerospikeException ae) { - listener.onFailure(ae); + + } + + @Override + void onFailure(Throwable t) { + AerospikeException ae; + + try { + if (t instanceof AerospikeException) { + ae = (AerospikeException)t; + ae.setPolicy(policy); + } + else if (t instanceof StatusRuntimeException) { + StatusRuntimeException sre = (StatusRuntimeException)t; + Status.Code code = sre.getStatus().getCode(); + + if (code == Status.Code.UNAVAILABLE) { + if (retry()) { + return; + } + } + ae = GrpcConversions.toAerospike(sre, policy, 1); + } + else { + ae = new AerospikeException(ResultCode.CLIENT_ERROR, t); + } + } + catch (AerospikeException ae2) { + ae = ae2; + } + catch (Throwable t2) { + ae = new AerospikeException(ResultCode.CLIENT_ERROR, t2); + } + + notifyFailure(ae); } @Override