diff --git a/containers/test-apps/courier/pom.xml b/containers/test-apps/courier/pom.xml
index 49afff57d..2fd71e94a 100644
--- a/containers/test-apps/courier/pom.xml
+++ b/containers/test-apps/courier/pom.xml
@@ -6,7 +6,7 @@
twosigma
courier
- 1.4.3
+ 1.4.4
courier
https://github.com/twosigma/waiter/tree/master/test-apps/courier
diff --git a/containers/test-apps/courier/src/main/java/com/twosigma/waiter/courier/GrpcClient.java b/containers/test-apps/courier/src/main/java/com/twosigma/waiter/courier/GrpcClient.java
index 4b29d7db5..db95a970b 100644
--- a/containers/test-apps/courier/src/main/java/com/twosigma/waiter/courier/GrpcClient.java
+++ b/containers/test-apps/courier/src/main/java/com/twosigma/waiter/courier/GrpcClient.java
@@ -452,24 +452,6 @@ private void resolveResponsePromise() {
}
}
- public static RpcResult> collectPackages(final String host,
- final int port,
- final Map headers,
- final String idPrefix,
- final String from,
- final List messages,
- final int interMessageSleepMs,
- final boolean lockStepMode,
- final int cancelThreshold) throws InterruptedException {
-
- final List ids = new ArrayList<>(messages.size());
- for (int i = 0; i < messages.size(); i++) {
- ids.add(idPrefix + i);
- }
-
- return collectPackages(host, port, headers, ids, from, messages, interMessageSleepMs, lockStepMode, cancelThreshold);
- }
-
/**
* Greet server. If provided, the first element of {@code args} is the name to use in the
* greeting.
@@ -533,8 +515,9 @@ private static void runSendPackageSendError(final String host, final int port) t
private static void runCollectPackagesSuccess(final String host, final int port) throws InterruptedException {
final HashMap headers = new HashMap<>();
headers.put("x-cid", "cid-collect-packages-success." + System.currentTimeMillis());
+ final List ids = IntStream.range(0, 10).mapToObj(i -> "id-" + i).collect(Collectors.toList());
final List messages = IntStream.range(0, 10).mapToObj(i -> "message-" + i).collect(Collectors.toList());
- final RpcResult> rpcResult = collectPackages(host, port, headers, "id-", "User", messages, 100, true, messages.size() + 1);
+ final RpcResult> rpcResult = collectPackages(host, port, headers, ids, "User", messages, 100, true, messages.size() + 1);
final List courierSummaries = rpcResult.result();
logFunction.apply("collectPackages[success] summary = " + courierSummaries);
final Status status = rpcResult.status();
diff --git a/waiter/integration/waiter/grpc_test.clj b/waiter/integration/waiter/grpc_test.clj
index 7f6d2bd13..0e9c64ce7 100644
--- a/waiter/integration/waiter/grpc_test.clj
+++ b/waiter/integration/waiter/grpc_test.clj
@@ -19,7 +19,8 @@
[clojure.tools.logging :as log]
[clojure.walk :as walk]
[waiter.util.client-tools :refer :all])
- (:import (com.twosigma.waiter.courier GrpcClient)
+ (:import (com.twosigma.waiter.courier CourierReply CourierSummary GrpcClient)
+ (io.grpc Status)
(java.util.function Function)))
;; initialize logging on the grpc client
@@ -97,19 +98,26 @@
(let [id (rand-name "m")
from (rand-name "f")
content (rand-str 1000)
- request-headers (assoc request-headers "x-cid" (rand-name))
+ correlation-id (rand-name)
+ request-headers (assoc request-headers "x-cid" correlation-id)
rpc-result (GrpcClient/sendPackage host h2c-port request-headers id from content)
- reply (.result rpc-result)
- status (.status rpc-result)]
- (is reply)
+ ^CourierReply reply (.result rpc-result)
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
+ :service-id service-id}
+ reply (assoc :reply {:id (.getId reply)
+ :response (.getResponse reply)})
+ status (assoc :status {:code (-> status .getCode str)
+ :description (.getDescription status)})))]
+ (is reply assertion-message)
(when reply
- (is (= id (.getId reply)))
- (is (= content (.getMessage reply)))
- (is (= "received" (.getResponse reply))))
- (is status)
+ (is (= id (.getId reply)) assertion-message)
+ (is (= content (.getMessage reply)) assertion-message)
+ (is (= "received" (.getResponse reply)) assertion-message))
+ (is status assertion-message)
(when status
- (is (= "OK" (-> status .getCode str)) (str status))
- (is (str/blank? (.getDescription status)) (str status)))))))))
+ (is (= "OK" (-> status .getCode str)) assertion-message)
+ (is (str/blank? (.getDescription status)) assertion-message))))))))
(deftest ^:parallel ^:integration-fast test-grpc-unary-call-server-cancellation
(testing-using-waiter-url
@@ -121,17 +129,23 @@
(let [id (str (rand-name "m") ".SEND_ERROR")
from (rand-name "f")
content (rand-str 1000)
- request-cid (rand-name)
- _ (log/info "cid:" request-cid)
- request-headers (assoc request-headers "x-cid" request-cid)
+ correlation-id (rand-name)
+ _ (log/info "cid:" correlation-id)
+ request-headers (assoc request-headers "x-cid" correlation-id)
rpc-result (GrpcClient/sendPackage host h2c-port request-headers id from content)
- reply (.result rpc-result)
- status (.status rpc-result)]
- (is (nil? reply))
- (is status)
+ ^CourierReply reply (.result rpc-result)
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
+ :service-id service-id}
+ reply (assoc :reply {:id (.getId reply)
+ :response (.getResponse reply)})
+ status (assoc :status {:code (-> status .getCode str)
+ :description (.getDescription status)})))]
+ (is (nil? reply) assertion-message)
+ (is status assertion-message)
(when status
- (is (= "CANCELLED" (-> status .getCode str)) (str status))
- (is (= "Cancelled by server" (.getDescription status)) (str status)))))))))
+ (is (= "CANCELLED" (-> status .getCode str)) assertion-message)
+ (is (= "Cancelled by server" (.getDescription status)) assertion-message))))))))
(deftest ^:parallel ^:integration-fast test-grpc-unary-call-server-exit
(testing-using-waiter-url
@@ -143,16 +157,22 @@
(let [id (str (rand-name "m") ".EXIT_PRE_RESPONSE")
from (rand-name "f")
content (rand-str 1000)
- request-cid (rand-name)
- _ (log/info "cid:" request-cid)
- request-headers (assoc request-headers "x-cid" request-cid)
+ correlation-id (rand-name)
+ _ (log/info "cid:" correlation-id)
+ request-headers (assoc request-headers "x-cid" correlation-id)
rpc-result (GrpcClient/sendPackage host h2c-port request-headers id from content)
- reply (.result rpc-result)
- status (.status rpc-result)]
- (is (nil? reply))
- (is status)
+ ^CourierReply reply (.result rpc-result)
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
+ :service-id service-id}
+ reply (assoc :reply {:id (.getId reply)
+ :response (.getResponse reply)})
+ status (assoc :status {:code (-> status .getCode str)
+ :description (.getDescription status)})))]
+ (is (nil? reply) assertion-message)
+ (is status assertion-message)
(when status
- (is (contains? #{"UNAVAILABLE" "INTERNAL"} (-> status .getCode str)) (str status)))))))))
+ (is (contains? #{"UNAVAILABLE" "INTERNAL"} (-> status .getCode str)) assertion-message))))))))
(deftest ^:parallel ^:integration-fast test-grpc-bidi-streaming-successful
(testing-using-waiter-url
@@ -167,41 +187,63 @@
(log/info "starting streaming to and from server - independent mode test")
(let [cancel-threshold (inc num-messages)
from (rand-name "f")
- collect-cid (str (rand-name) "-independent-complete")
- request-headers (assoc request-headers "x-cid" collect-cid)
+ correlation-id (str (rand-name) "-independent-complete")
+ request-headers (assoc request-headers "x-cid" correlation-id)
+ ids (map #(str "id-inde-" %) (range num-messages))
rpc-result (GrpcClient/collectPackages
- host h2c-port request-headers "m-" from messages 10 false cancel-threshold)
+ host h2c-port request-headers ids from messages 10 false cancel-threshold)
summaries (.result rpc-result)
- status (.status rpc-result)]
- (log/info collect-cid "collecting independent packages...")
- (is (= (count messages) (count summaries)))
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
+ :service-id service-id
+ :summaries (map (fn [^CourierSummary s]
+ {:num-messages (.getNumMessages s)
+ :total-length (.getTotalLength s)})
+ summaries)}
+ status (assoc :status {:code (-> status .getCode str)
+ :description (.getDescription status)})))]
+ (log/info correlation-id "collecting independent packages...")
+ (is (= (count messages) (count summaries)) assertion-message)
(when (seq summaries)
- (is (= (range 1 (inc (count messages))) (map #(.getNumMessages %) summaries)))
- (is (= (reductions + (map count messages)) (map #(.getTotalLength %) summaries))))
- (is status)
+ (is (= (range 1 (inc (count messages))) (map #(.getNumMessages ^CourierSummary %) summaries))
+ assertion-message)
+ (is (= (reductions + (map count messages)) (map #(.getTotalLength ^CourierSummary %) summaries))
+ assertion-message))
+ (is status assertion-message)
(when status
- (is (= "OK" (-> status .getCode str)) (str status))
- (is (str/blank? (.getDescription status)) (str status)))))
+ (is (= "OK" (-> status .getCode str)) assertion-message)
+ (is (str/blank? (.getDescription status)) assertion-message))))
(testing (str "lock-step mode " max-message-length " messages completion")
(log/info "starting streaming to and from server - lock-step mode test")
(let [cancel-threshold (inc num-messages)
from (rand-name "f")
- collect-cid (str (rand-name) "-lock-step-complete")
- request-headers (assoc request-headers "x-cid" collect-cid)
+ correlation-id (str (rand-name) "-lock-step-complete")
+ request-headers (assoc request-headers "x-cid" correlation-id)
+ ids (map #(str "id-lock-" %) (range num-messages))
rpc-result (GrpcClient/collectPackages
- host h2c-port request-headers "m-" from messages 1 true cancel-threshold)
+ host h2c-port request-headers ids from messages 1 true cancel-threshold)
summaries (.result rpc-result)
- status (.status rpc-result)]
- (log/info collect-cid "collecting lock-step packages...")
- (is (= (count messages) (count summaries)))
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
+ :service-id service-id
+ :summaries (map (fn [^CourierSummary s]
+ {:num-messages (.getNumMessages s)
+ :total-length (.getTotalLength s)})
+ summaries)}
+ status (assoc :status {:code (-> status .getCode str)
+ :description (.getDescription status)})))]
+ (log/info correlation-id "collecting lock-step packages...")
+ (is (= (count messages) (count summaries)) assertion-message)
(when (seq summaries)
- (is (= (range 1 (inc (count messages))) (map #(.getNumMessages %) summaries)))
- (is (= (reductions + (map count messages)) (map #(.getTotalLength %) summaries))))
- (is status)
+ (is (= (range 1 (inc (count messages))) (map #(.getNumMessages ^CourierSummary %) summaries))
+ assertion-message)
+ (is (= (reductions + (map count messages)) (map #(.getTotalLength ^CourierSummary %) summaries))
+ assertion-message))
+ (is status assertion-message)
(when status
- (is (= "OK" (-> status .getCode str)) (str status))
- (is (str/blank? (.getDescription status)) (str status)))))))))))
+ (is (= "OK" (-> status .getCode str)) assertion-message)
+ (is (str/blank? (.getDescription status)) assertion-message))))))))))
(deftest ^:parallel ^:integration-slow test-grpc-bidi-streaming-server-exit
(testing-using-waiter-url
@@ -211,26 +253,26 @@
(let [messages (doall (repeatedly num-messages #(rand-str (inc (rand-int max-message-length)))))]
(dotimes [iteration num-iterations]
(doseq [mode ["EXIT_PRE_RESPONSE" "EXIT_POST_RESPONSE"]]
- (testing (str "lock-step mode " max-message-length " messages exits pre-response")
+ (testing (str "lock-step mode " max-message-length " messages " mode)
(let [{:keys [h2c-port host request-headers service-id]} (start-courier-instance waiter-url)]
(with-service-cleanup
service-id
(let [exit-index (* iteration (/ num-messages num-iterations))
- collect-cid (str (rand-name) "." mode "." exit-index "-" num-messages "." max-message-length)
- _ (log/info "collect packages cid" collect-cid "for"
+ correlation-id (str (rand-name) "." mode "." exit-index "-" num-messages "." max-message-length)
+ _ (log/info "collect packages cid" correlation-id "for"
{:iteration iteration :max-message-length max-message-length})
from (rand-name "f")
ids (map #(str "id-" (cond-> % (= % exit-index) (str "." mode))) (range num-messages))
- request-headers (assoc request-headers "x-cid" collect-cid)
+ request-headers (assoc request-headers "x-cid" correlation-id)
rpc-result (GrpcClient/collectPackages
host h2c-port request-headers ids from messages 1 true (inc num-messages))
message-summaries (.result rpc-result)
- status (.status rpc-result)
- assertion-message (str (cond-> {:collect-cid collect-cid
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
:exit-index exit-index
:iteration iteration
:service-id service-id
- :summaries (map (fn [s]
+ :summaries (map (fn [^CourierSummary s]
{:num-messages (.getNumMessages s)
:total-length (.getTotalLength s)})
message-summaries)}
@@ -247,7 +289,7 @@
(is (= (reductions + (map count (take expected-summary-count messages)))
(map #(.getTotalLength %) message-summaries))
assertion-message))
- (is status)
+ (is status assertion-message)
(when status
(is (contains? #{"UNAVAILABLE" "INTERNAL"} (-> status .getCode str)) assertion-message)))))))))))))
@@ -264,17 +306,17 @@
(with-service-cleanup
service-id
(let [error-index (* iteration (/ num-messages num-iterations))
- collect-cid (str (rand-name) "." mode "." error-index "-" num-messages "." max-message-length)
+ correlation-id (str (rand-name) "." mode "." error-index "-" num-messages "." max-message-length)
from (rand-name "f")
ids (map #(str "id-" (cond-> % (= % error-index) (str "." mode))) (range num-messages))
- request-headers (assoc request-headers "x-cid" collect-cid)
- _ (log/info "collect packages cid" collect-cid "for"
+ request-headers (assoc request-headers "x-cid" correlation-id)
+ _ (log/info "collect packages cid" correlation-id "for"
{:iteration iteration :max-message-length max-message-length})
rpc-result (GrpcClient/collectPackages
host h2c-port request-headers ids from messages 1 true (inc num-messages))
message-summaries (.result rpc-result)
- status (.status rpc-result)
- assertion-message (str (cond-> {:collect-cid collect-cid
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
:error-index error-index
:iteration iteration
:service-id service-id
@@ -294,7 +336,7 @@
(is (= (reductions + (map count (take expected-summary-count messages)))
(map #(.getTotalLength %) message-summaries))
assertion-message))
- (is status)
+ (is status assertion-message)
(when status
(is (= "CANCELLED" (-> status .getCode str)) assertion-message)
(is (= "Cancelled by server" (.getDescription status)) assertion-message))))))))))))
@@ -312,22 +354,28 @@
(log/info "starting streaming to and from server - independent mode test")
(let [cancel-threshold (inc num-messages)
from (rand-name "f")
- aggregate-cid (rand-name)
- request-headers (assoc request-headers "x-cid" aggregate-cid)
+ correlation-id (rand-name)
+ request-headers (assoc request-headers "x-cid" correlation-id)
ids (map #(str "id-" %) (range num-messages))
rpc-result (GrpcClient/aggregatePackages
host h2c-port request-headers ids from messages 10 cancel-threshold)
- summary (.result rpc-result)
- status (.status rpc-result)]
- (log/info aggregate-cid "aggregated packages...")
+ ^CourierSummary summary (.result rpc-result)
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
+ :service-id service-id}
+ summary (assoc :summary {:num-messages (.getNumMessages summary)
+ :total-length (.getTotalLength summary)})
+ status (assoc :status {:code (-> status .getCode str)
+ :description (.getDescription status)})))]
+ (log/info correlation-id "aggregated packages...")
(is summary)
(when summary
- (is (= (count messages) (.getNumMessages summary)))
- (is (= (reduce + (map count messages)) (.getTotalLength summary))))
+ (is (= (count messages) (.getNumMessages summary)) assertion-message)
+ (is (= (reduce + (map count messages)) (.getTotalLength summary)) assertion-message))
(is status)
(when status
- (is (= "OK" (-> status .getCode str)) (str status))
- (is (str/blank? (.getDescription status)) (str status)))))))))))
+ (is (= "OK" (-> status .getCode str)) assertion-message)
+ (is (str/blank? (.getDescription status)) assertion-message))))))))))
(deftest ^:parallel ^:integration-slow test-grpc-client-streaming-server-exit
(testing-using-waiter-url
@@ -337,22 +385,22 @@
(let [messages (doall (repeatedly num-messages #(rand-str (inc (rand-int max-message-length)))))]
(dotimes [iteration num-iterations]
(doseq [mode ["EXIT_PRE_RESPONSE" "EXIT_POST_RESPONSE"]]
- (testing (str "lock-step mode " max-message-length " messages exits pre-response")
+ (testing (str "lock-step mode " max-message-length " messages " mode)
(let [{:keys [h2c-port host request-headers service-id]} (start-courier-instance waiter-url)]
(with-service-cleanup
service-id
(let [exit-index (* iteration (/ num-messages num-iterations))
- aggregate-cid (str (rand-name) "." mode "." exit-index "-" num-messages "." max-message-length)
- _ (log/info "aggregate packages cid" aggregate-cid "for"
+ correlation-id (str (rand-name) "." mode "." exit-index "-" num-messages "." max-message-length)
+ _ (log/info "aggregate packages cid" correlation-id "for"
{:iteration iteration :max-message-length max-message-length})
from (rand-name "f")
ids (map #(str "id-" (cond-> % (= % exit-index) (str "." mode))) (range num-messages))
- request-headers (assoc request-headers "x-cid" aggregate-cid)
+ request-headers (assoc request-headers "x-cid" correlation-id)
rpc-result (GrpcClient/aggregatePackages
host h2c-port request-headers ids from messages 1 (inc num-messages))
- message-summary (.result rpc-result)
- status (.status rpc-result)
- assertion-message (str (cond-> {:aggregate-cid aggregate-cid
+ ^CourierSummary message-summary (.result rpc-result)
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
:exit-index exit-index
:iteration iteration
:service-id service-id}
@@ -361,8 +409,8 @@
status (assoc :status {:code (-> status .getCode str)
:description (.getDescription status)})))]
(log/info "result" assertion-message)
- (is (nil? message-summary))
- (is status)
+ (is (nil? message-summary) assertion-message)
+ (is status assertion-message)
(when status
(is (contains? #{"UNAVAILABLE" "INTERNAL"} (-> status .getCode str)) assertion-message)))))))))))))
@@ -379,17 +427,17 @@
(with-service-cleanup
service-id
(let [error-index (* iteration (/ num-messages num-iterations))
- aggregate-cid (str (rand-name) "." mode "." error-index "-" num-messages "." max-message-length)
+ correlation-id (str (rand-name) "." mode "." error-index "-" num-messages "." max-message-length)
from (rand-name "f")
ids (map #(str "id-" (cond-> % (= % error-index) (str "." mode))) (range num-messages))
- request-headers (assoc request-headers "x-cid" aggregate-cid)
- _ (log/info "aggregate packages cid" aggregate-cid "for"
+ request-headers (assoc request-headers "x-cid" correlation-id)
+ _ (log/info "aggregate packages cid" correlation-id "for"
{:iteration iteration :max-message-length max-message-length})
rpc-result (GrpcClient/aggregatePackages
host h2c-port request-headers ids from messages 1 (inc num-messages))
- message-summary (.result rpc-result)
- status (.status rpc-result)
- assertion-message (str (cond-> {:aggregate-cid aggregate-cid
+ ^CourierSummary message-summary (.result rpc-result)
+ ^Status status (.status rpc-result)
+ assertion-message (str (cond-> {:correlation-id correlation-id
:error-index error-index
:iteration iteration
:service-id service-id}
@@ -398,9 +446,8 @@
status (assoc :status {:code (-> status .getCode str)
:description (.getDescription status)})))]
(log/info "result" assertion-message)
- (is (nil? message-summary))
- (is status)
+ (is (nil? message-summary) assertion-message)
+ (is status assertion-message)
(when status
(is (= "CANCELLED" (-> status .getCode str)) assertion-message)
(is (= "Cancelled by server" (.getDescription status)) assertion-message))))))))))))
-
diff --git a/waiter/project.clj b/waiter/project.clj
index e5145fced..9789fc3a7 100644
--- a/waiter/project.clj
+++ b/waiter/project.clj
@@ -32,7 +32,7 @@
:dependencies [[bidi "2.1.5"
:exclusions [prismatic/schema ring/ring-core]]
- [twosigma/courier "1.4.3"
+ [twosigma/courier "1.4.4"
:exclusions [com.google.guava/guava io.grpc/grpc-core]
:scope "test"]
;; avoids the following: