-
Notifications
You must be signed in to change notification settings - Fork 17
adds support for propagating grpc server-side cancellations #844
Conversation
4907cf3
to
6dd5f97
Compare
24f8236
to
1162f2e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ending this review as I just saw more commits come in.
@@ -148,6 +150,15 @@ public static CourierReply sendPackage(final String host, | |||
} catch (final StatusRuntimeException e) { | |||
logFunction.apply("RPC failed, status: " + e.getStatus()); | |||
return null; | |||
} catch (ExecutionException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} catch (ExecutionException e) { | |
} catch (final ExecutionException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -148,6 +150,15 @@ public static CourierReply sendPackage(final String host, | |||
} catch (final StatusRuntimeException e) { | |||
logFunction.apply("RPC failed, status: " + e.getStatus()); | |||
return null; | |||
} catch (ExecutionException e) { | |||
final Status status = Status.fromThrowable(e.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is getCause()
necessary? Status.fromThrowable()
searches recursively through causes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The snipped was inspired from grpc-java example on error handling:
https://github.com/grpc/grpc-java/blob/d648e8f2df091351c373cb75ad9f47e94aa2fca6/examples/src/main/java/io/grpc/examples/errorhandling/ErrorHandlingClient.java#L103
} catch (ExecutionException e) { | ||
final Status status = Status.fromThrowable(e.getCause()); | ||
logFunction.apply("RPC execution failed: " + status); | ||
return CourierReply |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid packing into CourierReply
? CourierReply
is meant to represent a message from the server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -214,6 +227,16 @@ public void onError(final Throwable throwable) { | |||
logFunction.apply("releasing semaphore after receiving error"); | |||
lockStep.release(); | |||
} | |||
if (throwable instanceof StatusRuntimeException) { | |||
final StatusRuntimeException exception = (StatusRuntimeException) throwable; | |||
final CourierSummary response = CourierSummary |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid packing into CourierSummary
? CourierSummary
is meant to represent a message from the server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
if (i >= cancelThreshold) { | ||
logFunction.apply("cancelling sending messages"); | ||
awaitChannelTermination.set(false); | ||
channel.shutdownNow(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also test explicit cancels. I found three ways:
final Context.CancellableContext cancellableContext = Context.current().withCancellation();
cancellableContext.run(() -> {
futureStub.collectPackages(new StreamObserver<CourierSummary>() {
// ...
});
});
// later...
cancellableContext.cancel(new RuntimeException("cancelling grpc request"));
ListenableFuture<CourierReply> future = futureStub.sendPackage(request);
future.cancel(false);
collector.onError(Status.CANCELLED.asRuntimeException());
https://groups.google.com/forum/#!topic/grpc-io/quToVM4NhdQ
https://groups.google.com/forum/#!topic/grpc-io/MdDADccRpAQ
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the client-side streaming, but server-side cancellation case.
The above examples are for client-side cancellation, they will be added in the client-side cancellation PR.
(let [trailers-map (async/<! trailers) | ||
modified-trailers (merge grpc-headers trailers-map)] | ||
(log/info "response trailers:" trailers-map) | ||
(when (seq grpc-headers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is (seq grpc-headers)
always true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I added the if (seq grpc-headers)
later which makes these seq
checks unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
(log/info "response trailers:" trailers-map) | ||
(when (seq grpc-headers) | ||
(log/info "attaching grpc headers into trailer:" grpc-headers)) | ||
(when (seq modified-trailers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is (seq modified-trailers)
always true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I added the if (seq grpc-headers)
later which makes these seq
checks unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
;; mark the request as successful, grpc failures are reported in the headers | ||
(deliver reservation-status-promise :success) | ||
;; stop writing any content in the body | ||
(async/close! body) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't body
being read by stream-http-response
? Can we make this comment more clear?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comments, we want to avoid writing data to body in stream-http-response
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stream-http-response
writes to resp-chan
. The body
here is the body coming back from the backend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stream-http-response
reads from body
, closing body
triggers request completion of the request by closing resp-chan
.
(log/info "eagerly closing response body as grpc status is" grpc-status) | ||
(when abort-ch | ||
;; disallow aborting the request | ||
(async/close! abort-ch)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we say why we are disallowing aborting the request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comment, we have deemed the request as success and will trigger normal completion by closing the body channel.
(deliver reservation-status-promise :success) | ||
;; stop writing any content in the body | ||
(async/close! body) | ||
(async/close! error-chan)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we close the error-chan
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comments, we want to avoid blocking in stream-http-response
f4448b9
to
c430a6b
Compare
introduces separate variant field to determine cancellation operation adds unary server exit case cleans up return values from the grpc client adds assertion on grpc health check response body
…th unary responses
c430a6b
to
542ba84
Compare
asserts on individual fields on the ping response
542ba84
to
de02079
Compare
public static RpcResult<CourierReply> sendPackage(final String host, | ||
final int port, | ||
final Map<String, Object> headers, | ||
final String id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid overloading id
with the actions to take? Can we avoid updating the signatures of these functions when we want to make a change on the Clojure side? This request is aspirational and not required for this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do in a later PR. Created #854.
} | ||
|
||
@Override | ||
public void onCompleted() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we confirm onCompleted
and onError
are mutually exclusive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, here are logs from two runs where onCompleted ()
is not called when there is an error:
When there is an error:
INFO: completed sending packages
INFO: status received from server:Status{code=CANCELLED, description=Cancelled by server, cause=null}
INFO: trailers received from server:Metadata(content-type=application/grpc)
INFO: error in aggregating summaries io.grpc.StatusRuntimeException: CANCELLED: Cancelled by server
INFO: client result: null
INFO: shutting down channel
Successful completion:
INFO: completed sending packages
INFO: headers received from server:Metadata(content-type=application/grpc,x-cid=cid-aggregate-packages-success.1562836825027,grpc-encoding=identity,grpc-accept-encoding=gzip)
INFO: received response CourierSummary{count=10, length=90}
INFO: status received from server:Status{code=OK, description=null, cause=null}
INFO: trailers received from server:Metadata()
INFO: completed aggregating summaries
INFO: client result: num_messages: 10, total_length: 90
INFO: shutting down channel
;; mark the request as successful, grpc failures are reported in the headers | ||
(deliver reservation-status-promise :success) | ||
;; stop writing any content in the body | ||
(async/close! body) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stream-http-response
writes to resp-chan
. The body
here is the body coming back from the backend.
avoids warnings due to presence of overloaded methods in grpc client
avoids warnings due to presence of overloaded methods in grpc client adds correlation-id to logs from grpc client
bc96170
to
1809d61
Compare
I have updated the courier client code to avoid race in reporting the error status. |
Fyi, consistently green runs internally since the client race fix. |
Jet PR: twosigma/jet#32
Changes proposed in this PR
Why are we making these changes?
gRPC headers/trailers including errors codes should be propagated correctly to the client.