Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
avoids overloading the id field in CourierRequest
Browse files Browse the repository at this point in the history
introduces separate variant field to determine cancellation operation
adds unary server exit case
cleans up return values from the grpc client
adds assertion on grpc health check response body
  • Loading branch information
shamsimam committed Jul 10, 2019
1 parent 54711da commit 7959b54
Show file tree
Hide file tree
Showing 6 changed files with 362 additions and 253 deletions.
2 changes: 1 addition & 1 deletion containers/test-apps/courier/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>twosigma</groupId>
<artifactId>courier</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>

<name>courier</name>
<url>https://github.com/twosigma/waiter/tree/master/test-apps/courier</url>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

Expand All @@ -42,6 +36,15 @@
public class GrpcServer {

private final static Logger LOGGER = Logger.getLogger(GrpcServer.class.getName());

private static void sleep(final int durationMillis) {
try {
Thread.sleep(durationMillis);
} catch (final Exception ex) {
ex.printStackTrace();
}
}

private Server server;

void start(final int port) throws IOException {
Expand Down Expand Up @@ -89,13 +92,17 @@ public void sendPackage(final CourierRequest request, final StreamObserver<Couri
LOGGER.info("CancelHandler:sendPackage CourierRequest{" + "id=" + request.getId() + "} was cancelled");
});
}
if (request.getId().contains("SEND_ERROR")) {
if (Variant.SEND_ERROR.equals(request.getVariant())) {
final StatusRuntimeException error = Status.CANCELLED
.withCause(new RuntimeException(request.getId()))
.withDescription("Cancelled by server")
.asRuntimeException();
LOGGER.info("Sending cancelled by server error");
responseObserver.onError(error);
} else if (Variant.EXIT_PRE_RESPONSE.equals(request.getVariant())) {
sleep(1000);
LOGGER.info("Exiting server abruptly");
System.exit(1);
} else {
final CourierReply reply = CourierReply
.newBuilder()
Expand Down Expand Up @@ -123,21 +130,21 @@ public StreamObserver<CourierRequest> collectPackages(final StreamObserver<Couri
private long totalLength = 0;

@Override
public void onNext(final CourierRequest courierRequest) {
LOGGER.info("Received CourierRequest id=" + courierRequest.getId());
public void onNext(final CourierRequest request) {
LOGGER.info("Received CourierRequest id=" + request.getId());

numMessages += 1;
totalLength += courierRequest.getMessage().length();
LOGGER.severe("Summary of collected packages: numMessages=" + numMessages +
totalLength += request.getMessage().length();
LOGGER.info("Summary of collected packages: numMessages=" + numMessages +
" with totalLength=" + totalLength);

if (courierRequest.getId().contains("EXIT_PRE_RESPONSE")) {
if (Variant.EXIT_PRE_RESPONSE.equals(request.getVariant())) {
sleep(1000);
LOGGER.info("Exiting server abruptly");
System.exit(1);
} else if (courierRequest.getId().contains("SEND_ERROR")) {
} else if (Variant.SEND_ERROR.equals(request.getVariant())) {
final StatusRuntimeException error = Status.CANCELLED
.withCause(new RuntimeException(courierRequest.getId()))
.withCause(new RuntimeException(request.getId()))
.withDescription("Cancelled by server")
.asRuntimeException();
LOGGER.info("Sending cancelled by server error");
Expand All @@ -148,29 +155,21 @@ public void onNext(final CourierRequest courierRequest) {
.setNumMessages(numMessages)
.setTotalLength(totalLength)
.build();
LOGGER.info("Sending CourierSummary for id=" + courierRequest.getId());
LOGGER.info("Sending CourierSummary for id=" + request.getId());
responseObserver.onNext(courierSummary);
}

if (courierRequest.getId().contains("EXIT_POST_RESPONSE")) {
if (Variant.EXIT_POST_RESPONSE.equals(request.getVariant())) {
sleep(1000);
LOGGER.info("Exiting server abruptly");
System.exit(1);
}
}

private void sleep(final int durationMillis) {
try {
Thread.sleep(durationMillis);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void onError(final Throwable throwable) {
LOGGER.severe("Error in collecting packages: " + throwable.getMessage());
responseObserver.onError(throwable);
public void onError(final Throwable th) {
LOGGER.severe("Error in collecting packages: " + th.getMessage());
responseObserver.onError(th);
}

@Override
Expand Down
10 changes: 8 additions & 2 deletions containers/test-apps/courier/src/main/proto/courier.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,18 @@ service Courier {
rpc CollectPackages (stream CourierRequest) returns (stream CourierSummary);
}

enum Variant {
NORMAL = 0;
SEND_ERROR = 1;
EXIT_PRE_RESPONSE = 2;
EXIT_POST_RESPONSE = 3;
}

message CourierRequest {
string id = 1;
string from = 2;
string message = 3;
Variant variant = 4;
}

message CourierReply {
Expand All @@ -44,6 +52,4 @@ message CourierReply {
message CourierSummary {
int64 num_messages = 1;
int64 total_length = 2;
string status_code = 3;
string status_description = 4;
}
Loading

0 comments on commit 7959b54

Please sign in to comment.