From de0207982a9b5757c828b11efb3199338247300d Mon Sep 17 00:00:00 2001 From: Shams Imam Date: Wed, 10 Jul 2019 15:59:44 -0500 Subject: [PATCH] initializes grpc client result Status to UNKNOWN instead of OK asserts on individual fields on the ping response --- containers/test-apps/courier/pom.xml | 2 +- .../twosigma/waiter/courier/GrpcClient.java | 36 ++++++++++--------- waiter/integration/waiter/grpc_test.clj | 6 ++-- waiter/project.clj | 2 +- 4 files changed, 25 insertions(+), 21 deletions(-) diff --git a/containers/test-apps/courier/pom.xml b/containers/test-apps/courier/pom.xml index 9aa459dee..49afff57d 100644 --- a/containers/test-apps/courier/pom.xml +++ b/containers/test-apps/courier/pom.xml @@ -6,7 +6,7 @@ twosigma courier - 1.4.1 + 1.4.3 courier https://github.com/twosigma/waiter/tree/master/test-apps/courier diff --git a/containers/test-apps/courier/src/main/java/com/twosigma/waiter/courier/GrpcClient.java b/containers/test-apps/courier/src/main/java/com/twosigma/waiter/courier/GrpcClient.java index e7959a804..4b29d7db5 100644 --- a/containers/test-apps/courier/src/main/java/com/twosigma/waiter/courier/GrpcClient.java +++ b/containers/test-apps/courier/src/main/java/com/twosigma/waiter/courier/GrpcClient.java @@ -177,10 +177,12 @@ public static RpcResult sendPackage(final String host, .setVariant(retrieveVariant(id)) .build(); - final AtomicReference status = new AtomicReference<>(Status.OK); + final AtomicReference status = new AtomicReference<>(); final AtomicReference response = new AtomicReference<>(); try { - response.set(futureStub.sendPackage(request).get()); + final CourierReply reply = futureStub.sendPackage(request).get(); + status.set(Status.OK); + response.set(reply); } catch (final StatusRuntimeException ex) { final Status errorStatus = ex.getStatus(); logFunction.apply("RPC failed, status: " + errorStatus); @@ -233,7 +235,7 @@ public static RpcResult> collectPackages(final String host, logFunction.apply("will try to send package from " + from + " ..."); - final AtomicReference status = new AtomicReference<>(Status.OK); + final AtomicReference status = new AtomicReference<>(); final AtomicReference> response = new AtomicReference<>(); final CompletableFuture> responsePromise = new CompletableFuture<>(); @@ -275,6 +277,7 @@ public void onError(final Throwable th) { @Override public void onCompleted() { logFunction.apply("completed collecting summaries"); + status.set(Status.OK); resolveResponsePromise(); } @@ -317,12 +320,12 @@ private void resolveResponsePromise() { collector.onCompleted(); response.set(responsePromise.get()); - } catch (final StatusRuntimeException e) { - logFunction.apply("RPC failed, status: " + e.getStatus()); - status.set(e.getStatus()); - } catch (final Exception e) { - logFunction.apply("RPC failed, message: " + e.getMessage()); - status.set(Status.UNKNOWN); + } catch (final StatusRuntimeException ex) { + logFunction.apply("RPC failed, status: " + ex.getStatus()); + status.set(ex.getStatus()); + } catch (final Exception ex) { + logFunction.apply("RPC failed, message: " + ex.getMessage()); + status.set(Status.UNKNOWN.withDescription(ex.getMessage())); } return new RpcResult<>(response.get(), status.get()); @@ -358,7 +361,7 @@ public static RpcResult aggregatePackages(final String host, logFunction.apply("will try to agggreate package from " + from + " ..."); - final AtomicReference status = new AtomicReference<>(Status.OK); + final AtomicReference status = new AtomicReference<>(); final AtomicReference response = new AtomicReference<>(); final CompletableFuture responsePromise = new CompletableFuture<>(); @@ -390,6 +393,7 @@ public void onError(final Throwable th) { @Override public void onCompleted() { logFunction.apply("completed aggregating summaries"); + status.set(Status.OK); resolveResponsePromise(); } @@ -429,12 +433,12 @@ private void resolveResponsePromise() { collector.onCompleted(); responsePromise.get(); - } catch (final StatusRuntimeException e) { - logFunction.apply("RPC failed, status: " + e.getStatus()); - status.set(e.getStatus()); - } catch (final Exception e) { - logFunction.apply("RPC failed, message: " + e.getMessage()); - status.set(Status.UNKNOWN); + } catch (final StatusRuntimeException ex) { + logFunction.apply("RPC failed, status: " + ex.getStatus()); + status.set(ex.getStatus()); + } catch (final Exception ex) { + logFunction.apply("RPC failed, message: " + ex.getMessage()); + status.set(Status.UNKNOWN.withDescription(ex.getMessage())); } return new RpcResult<>(response.get(), status.get()); diff --git a/waiter/integration/waiter/grpc_test.clj b/waiter/integration/waiter/grpc_test.clj index 5db2936cd..7f6d2bd13 100644 --- a/waiter/integration/waiter/grpc_test.clj +++ b/waiter/integration/waiter/grpc_test.clj @@ -77,9 +77,9 @@ (is (= "OK" (some-> ping-response :body)) (str ping-response)) (is (str/starts-with? (str (some-> ping-response :headers :server)) "courier-health-check") (str ping-response)) (assert-response-status ping-response 200) - (is (or (= {:exists? true :healthy? true :service-id service-id :status "Running"} service-state) - (= {:exists? true :healthy? false :service-id service-id :status "Starting"} service-state)) - (str service-state))) + (is (true? (:exists? service-state)) (str service-state)) + (is (= service-id (:service-id service-state)) (str service-state)) + (is (contains? #{"Running" "Starting"} (:status service-state)) (str service-state))) (assert-service-on-all-routers waiter-url service-id cookies) {:h2c-port h2c-port diff --git a/waiter/project.clj b/waiter/project.clj index 7bd6cae3b..e5145fced 100644 --- a/waiter/project.clj +++ b/waiter/project.clj @@ -32,7 +32,7 @@ :dependencies [[bidi "2.1.5" :exclusions [prismatic/schema ring/ring-core]] - [twosigma/courier "1.4.1" + [twosigma/courier "1.4.3" :exclusions [com.google.guava/guava io.grpc/grpc-core] :scope "test"] ;; avoids the following: