diff --git a/proxy/pom.xml b/proxy/pom.xml
index 319cb25cc..be91cffc8 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -22,7 +22,7 @@
com.aerospike
aerospike-proxy-stub
- 1.0.2-SNAPSHOT
+ 1.1.0
diff --git a/proxy/src/com/aerospike/client/proxy/CommandProxy.java b/proxy/src/com/aerospike/client/proxy/CommandProxy.java
index a722dca80..684b4802f 100644
--- a/proxy/src/com/aerospike/client/proxy/CommandProxy.java
+++ b/proxy/src/com/aerospike/client/proxy/CommandProxy.java
@@ -27,6 +27,7 @@
import com.aerospike.client.proxy.grpc.GrpcConversions;
import com.aerospike.client.proxy.grpc.GrpcStreamingCall;
import com.aerospike.client.util.Util;
+import com.aerospike.proxy.client.InfoGrpc;
import com.aerospike.proxy.client.Kvs;
import com.google.protobuf.ByteString;
@@ -57,7 +58,7 @@ public CommandProxy(
this.numExpectedResponses = numExpectedResponses;
}
- final void execute() {
+ void execute() {
if (policy.totalTimeout > 0) {
deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(policy.totalTimeout);
sendTimeoutMillis = (policy.socketTimeout > 0 && policy.socketTimeout < policy.totalTimeout)?
diff --git a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java
index a5e352d4f..9daad38fe 100644
--- a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java
+++ b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java
@@ -1,35 +1,41 @@
package com.aerospike.client.proxy;
import com.aerospike.client.AerospikeException;
-import com.aerospike.client.Info;
+import com.aerospike.client.Log;
import com.aerospike.client.ResultCode;
-import com.aerospike.client.cluster.Node;
import com.aerospike.client.command.Command;
import com.aerospike.client.listener.InfoListener;
import com.aerospike.client.policy.InfoPolicy;
import com.aerospike.client.policy.Policy;
-import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.proxy.grpc.GrpcCallExecutor;
import com.aerospike.client.proxy.grpc.GrpcConversions;
+import com.aerospike.client.proxy.grpc.GrpcStreamingCall;
+import com.aerospike.proxy.client.AboutGrpc;
import com.aerospike.proxy.client.Kvs;
import com.aerospike.proxy.client.InfoGrpc;
+import io.grpc.*;
+import io.grpc.stub.StreamObserver;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
public class InfoCommandProxy extends SingleCommandProxy {
private final InfoListener listener;
private final String[] commands;
- private Map map;
-
private final InfoPolicy infoPolicy;
+ private final GrpcCallExecutor executor;
+ private final MethodDescriptor methodDescriptor;
+ final Policy policy;
public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPolicy policy, String... commands) {
super(InfoGrpc.getInfoMethod(), executor, createPolicy(policy));
+ this.executor = executor;
this.infoPolicy = policy;
this.listener = listener;
this.commands = commands;
+ this.methodDescriptor = InfoGrpc.getInfoMethod();
+ this.policy = createPolicy(policy);
}
private static Policy createPolicy(InfoPolicy policy) {
@@ -64,9 +70,68 @@ void writeCommand(Command command) {
// Nothing to do since there is no Aerospike payload.
}
+ @Override
+ void execute(){
+ executeCommand();
+ }
+
+ private void executeCommand() {
+ Kvs.AerospikeRequestPayload.Builder builder = getRequestBuilder();
+
+ ManagedChannel channel = executor.getChannel();
+ InfoGrpc.InfoBlockingStub stub = InfoGrpc.newBlockingStub(channel);
+ try{
+ Kvs.AerospikeRequestPayload request = builder.build();
+ Kvs.AerospikeResponsePayload response = stub.info(request);
+ inDoubt |= response.getInDoubt();
+ onResponse(response);
+ }catch (Throwable t) {
+ inDoubt = true;
+ onFailure(t);
+ } finally {
+ // Shut down the channel
+ channel.shutdown();
+ }
+ }
+
+
@Override
void onFailure(AerospikeException ae) {
- listener.onFailure(ae);
+
+ }
+
+ @Override
+ void onFailure(Throwable t) {
+ AerospikeException ae;
+
+ try {
+ if (t instanceof AerospikeException) {
+ ae = (AerospikeException)t;
+ ae.setPolicy(policy);
+ }
+ else if (t instanceof StatusRuntimeException) {
+ StatusRuntimeException sre = (StatusRuntimeException)t;
+ Status.Code code = sre.getStatus().getCode();
+
+ if (code == Status.Code.UNAVAILABLE) {
+ if (retry()) {
+ return;
+ }
+ }
+ ae = GrpcConversions.toAerospike(sre, policy, 1);
+ }
+ else {
+ ae = new AerospikeException(ResultCode.CLIENT_ERROR, t);
+ }
+ }
+ catch (AerospikeException ae2) {
+ ae = ae2;
+ }
+ catch (Throwable t2) {
+ ae = new AerospikeException(ResultCode.CLIENT_ERROR, t2);
+ }
+
+ notifyFailure(ae);
}
@Override