Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

adds support for propagating grpc server-side cancellations #844

Merged
merged 11 commits into from
Jul 11, 2019

Conversation

shamsimam
Copy link
Contributor

Jet PR: twosigma/jet#32

Changes proposed in this PR

  • adds support for propagating grpc server-side cancellations

Why are we making these changes?

gRPC headers/trailers including errors codes should be propagated correctly to the client.

@shamsimam shamsimam self-assigned this Jul 9, 2019
@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch 2 times, most recently from 4907cf3 to 6dd5f97 Compare July 9, 2019 22:26
@shamsimam shamsimam requested a review from sradack July 9, 2019 23:03
@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch from 24f8236 to 1162f2e Compare July 10, 2019 06:49
Copy link
Contributor

@sradack sradack left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ending this review as I just saw more commits come in.

@@ -148,6 +150,15 @@ public static CourierReply sendPackage(final String host,
} catch (final StatusRuntimeException e) {
logFunction.apply("RPC failed, status: " + e.getStatus());
return null;
} catch (ExecutionException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} catch (ExecutionException e) {
} catch (final ExecutionException e) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -148,6 +150,15 @@ public static CourierReply sendPackage(final String host,
} catch (final StatusRuntimeException e) {
logFunction.apply("RPC failed, status: " + e.getStatus());
return null;
} catch (ExecutionException e) {
final Status status = Status.fromThrowable(e.getCause());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is getCause() necessary? Status.fromThrowable() searches recursively through causes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} catch (ExecutionException e) {
final Status status = Status.fromThrowable(e.getCause());
logFunction.apply("RPC execution failed: " + status);
return CourierReply
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid packing into CourierReply? CourierReply is meant to represent a message from the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -214,6 +227,16 @@ public void onError(final Throwable throwable) {
logFunction.apply("releasing semaphore after receiving error");
lockStep.release();
}
if (throwable instanceof StatusRuntimeException) {
final StatusRuntimeException exception = (StatusRuntimeException) throwable;
final CourierSummary response = CourierSummary
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid packing into CourierSummary? CourierSummary is meant to represent a message from the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if (i >= cancelThreshold) {
logFunction.apply("cancelling sending messages");
awaitChannelTermination.set(false);
channel.shutdownNow();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also test explicit cancels. I found three ways:

final Context.CancellableContext cancellableContext = Context.current().withCancellation();
cancellableContext.run(() -> {
    futureStub.collectPackages(new StreamObserver<CourierSummary>() {
        // ...
    });
});
// later...
cancellableContext.cancel(new RuntimeException("cancelling grpc request"));
ListenableFuture<CourierReply> future = futureStub.sendPackage(request);
future.cancel(false);
collector.onError(Status.CANCELLED.asRuntimeException());

https://groups.google.com/forum/#!topic/grpc-io/quToVM4NhdQ
https://groups.google.com/forum/#!topic/grpc-io/MdDADccRpAQ

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the client-side streaming, but server-side cancellation case.
The above examples are for client-side cancellation, they will be added in the client-side cancellation PR.

(let [trailers-map (async/<! trailers)
modified-trailers (merge grpc-headers trailers-map)]
(log/info "response trailers:" trailers-map)
(when (seq grpc-headers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is (seq grpc-headers) always true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I added the if (seq grpc-headers) later which makes these seq checks unnecessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

(log/info "response trailers:" trailers-map)
(when (seq grpc-headers)
(log/info "attaching grpc headers into trailer:" grpc-headers))
(when (seq modified-trailers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is (seq modified-trailers) always true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I added the if (seq grpc-headers) later which makes these seq checks unnecessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

;; mark the request as successful, grpc failures are reported in the headers
(deliver reservation-status-promise :success)
;; stop writing any content in the body
(async/close! body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't body being read by stream-http-response? Can we make this comment more clear?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments, we want to avoid writing data to body in stream-http-response

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream-http-response writes to resp-chan. The body here is the body coming back from the backend.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream-http-response reads from body, closing body triggers request completion of the request by closing resp-chan.

(log/info "eagerly closing response body as grpc status is" grpc-status)
(when abort-ch
;; disallow aborting the request
(async/close! abort-ch))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we say why we are disallowing aborting the request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment, we have deemed the request as success and will trigger normal completion by closing the body channel.

(deliver reservation-status-promise :success)
;; stop writing any content in the body
(async/close! body)
(async/close! error-chan))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we close the error-chan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments, we want to avoid blocking in stream-http-response

@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch 2 times, most recently from f4448b9 to c430a6b Compare July 10, 2019 19:44
@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch from c430a6b to 542ba84 Compare July 10, 2019 21:05
asserts on individual fields on the ping response
@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch from 542ba84 to de02079 Compare July 11, 2019 00:44
public static RpcResult<CourierReply> sendPackage(final String host,
final int port,
final Map<String, Object> headers,
final String id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid overloading id with the actions to take? Can we avoid updating the signatures of these functions when we want to make a change on the Clojure side? This request is aspirational and not required for this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do in a later PR. Created #854.

}

@Override
public void onCompleted() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we confirm onCompleted and onError are mutually exclusive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, here are logs from two runs where onCompleted () is not called when there is an error:

When there is an error:

INFO: completed sending packages
INFO: status received from server:Status{code=CANCELLED, description=Cancelled by server, cause=null}
INFO: trailers received from server:Metadata(content-type=application/grpc)
INFO: error in aggregating summaries io.grpc.StatusRuntimeException: CANCELLED: Cancelled by server
INFO: client result: null
INFO: shutting down channel

Successful completion:

INFO: completed sending packages
INFO: headers received from server:Metadata(content-type=application/grpc,x-cid=cid-aggregate-packages-success.1562836825027,grpc-encoding=identity,grpc-accept-encoding=gzip)
INFO: received response CourierSummary{count=10, length=90}
INFO: status received from server:Status{code=OK, description=null, cause=null}
INFO: trailers received from server:Metadata()
INFO: completed aggregating summaries
INFO: client result: num_messages: 10, total_length: 90
INFO: shutting down channel

;; mark the request as successful, grpc failures are reported in the headers
(deliver reservation-status-promise :success)
;; stop writing any content in the body
(async/close! body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream-http-response writes to resp-chan. The body here is the body coming back from the backend.

avoids warnings due to presence of overloaded methods in grpc client
avoids warnings due to presence of overloaded methods in grpc client
adds correlation-id to logs from grpc client
@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch from bc96170 to 1809d61 Compare July 11, 2019 10:41
@shamsimam
Copy link
Contributor Author

I have updated the courier client code to avoid race in reporting the error status.

@shamsimam
Copy link
Contributor Author

Fyi, consistently green runs internally since the client race fix.

@sradack sradack merged commit dd6bd41 into master Jul 11, 2019
@sradack sradack deleted the grpc-server-side-cancel branch July 11, 2019 19:14
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants