Skip to content

Commit

Permalink
Merge pull request #106 from rickeyski/fluent-reactor
Browse files Browse the repository at this point in the history
convert reactor tests to fluent fixing #65
  • Loading branch information
rmichela authored Jul 12, 2018
2 parents 6eb0f71 + 5ae86bd commit c073723
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 52 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ Found a bug? Think you've got an awesome feature you want to add? We welcome co
it if master changes after you open your pull request.*

## Acceptance Criteria
We love contributions, but it's important that your pull request adhere to the standards that we maintain in this r
epository.
We love contributions, but it's important that your pull request adhere to the standards that we maintain in this repository.

- All tests must be passing
- All code changes require tests
Expand Down
18 changes: 9 additions & 9 deletions reactor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ Overview
Reactor-gRPC is a set of gRPC bindings for reactive programming with [Reactor](http://projectreactor.io/).

### Android support
Reactive gRPC supports Android to the same level of the underlying reactive technologies. Spring Reactor
does [not officially support Android](http://projectreactor.io/docs/core/release/reference/docs/index.html#prerequisites),
Reactive gRPC supports Android to the same level of the underlying reactive technologies. Spring Reactor
does [not officially support Android](http://projectreactor.io/docs/core/release/reference/docs/index.html#prerequisites),
however, "it should work fine with Android SDK 26 (Android O) and above."

Installation
Expand Down Expand Up @@ -52,10 +52,10 @@ protobuf {
Usage
=====
After installing the plugin, Reactor-gRPC service stubs will be generated along with your gRPC service stubs.

* To implement a service using an Reactor-gRPC service, subclass `Reactor[Name]Grpc.[Name]ImplBase` and override the Reactor-based
methods.

```java
ReactorGreeterGrpc.GreeterImplBase svc = new ReactorGreeterGrpc.GreeterImplBase() {
@Override
Expand Down Expand Up @@ -85,10 +85,10 @@ After installing the plugin, Reactor-gRPC service stubs will be generated along
Flux<HelloResponse> resp = stub.sayHelloBothStream(req);
resp.subscribe(...);
```

## Don't break the chain
Used on their own, the generated RxGrpc stub methods do not cleanly chain with other RxJava operators.
Using the `compose()` and `as()` methods of `Mono` and `Fluz` are preferred over direct invocation.
Using the `compose()` and `as()` methods of `Mono` and `Flux` are preferred over direct invocation.

#### One→One, Many→Many
```java
Expand All @@ -101,9 +101,9 @@ Flux<HelloResponse> fluxResponse = fluxRequest.compose(stub::sayHelloBothStream)
Mono<HelloResponse> monoResponse = fluxRequest.as(stub::sayHelloRequestStream);
Flux<HelloResponse> fluxResponse = monoRequest.as(stub::sayHelloResponseStream);
```

## Retrying streaming requests
`GrpcRetry` is used to transparently re-establish a streaming gRPC request in the event of a server error. During a
`GrpcRetry` is used to transparently re-establish a streaming gRPC request in the event of a server error. During a
retry, the upstream rx pipeline is re-subscribed to acquire a request message and the RPC call re-issued. The downstream
rx pipeline never sees the error.

Expand All @@ -112,7 +112,7 @@ Flux<HelloResponse> fluxResponse = fluxRequest.compose(GrpcRetry.ManyToMany.retr
```

For complex retry scenarios, use the `Retry` builder from <a href="https://github.com/reactor/reactor-addons/blob/master/reactor-extra/src/main/java/reactor/retry/Retry.java">Reactor Extras</a>.

Modules
=======

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void clientToServerBackpressure() {
.doOnNext(i -> updateNumberOfWaits(lastValueTime, numberOfWaits))
.map(BackpressureIntegrationTest::protoNum);

Mono<NumberProto.Number> reactorResponse = stub.requestPressure(reactorRequest);
Mono<NumberProto.Number> reactorResponse = reactorRequest.as(stub::requestPressure);

StepVerifier.create(reactorResponse)
.expectNextMatches(v -> v.getNumber(0) == NUMBER_OF_STREAM_ELEMENTS - 1)
Expand All @@ -103,7 +103,7 @@ public void serverToClientBackpressure() {

Mono<Empty> reactorRequest = Mono.just(Empty.getDefaultInstance());

Flux<NumberProto.Number> reactorResponse = stub.responsePressure(reactorRequest)
Flux<NumberProto.Number> reactorResponse = reactorRequest.as(stub::responsePressure)
.doOnNext(n -> System.out.println(n.getNumber(0) + " <--"))
.doOnNext(n -> waitIfValuesAreEqual(n.getNumber(0), 3));

Expand All @@ -121,7 +121,9 @@ public void bidiResponseBackpressure() {

ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(serverRule.getChannel());

Flux<NumberProto.Number> reactorResponse = stub.twoWayResponsePressure(Flux.empty())
Flux<NumberProto.Number> reactorRequest = Flux.empty();

Flux<NumberProto.Number> reactorResponse = reactorRequest.compose(stub::twoWayResponsePressure)
.doOnNext(n -> System.out.println(n.getNumber(0) + " <--"))
.doOnNext(n -> waitIfValuesAreEqual(n.getNumber(0), 3));

Expand All @@ -145,7 +147,7 @@ public void bidiRequestBackpressure() {
.doOnNext(i -> updateNumberOfWaits(lastValueTime, numberOfWaits))
.map(BackpressureIntegrationTest::protoNum);

Flux<NumberProto.Number> reactorResponse = stub.twoWayRequestPressure(reactorRequest);
Flux<NumberProto.Number> reactorResponse = reactorRequest.compose(stub::twoWayRequestPressure);

StepVerifier.create(reactorResponse)
.expectNextMatches(v -> v.getNumber(0) == NUMBER_OF_STREAM_ELEMENTS - 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ public void clientCanCancelServerStreamExplicitly() throws InterruptedException

AtomicInteger lastNumberConsumed = new AtomicInteger(Integer.MAX_VALUE);
ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(serverRule.getChannel());
Flux<NumberProto.Number> test = stub
.responsePressure(Mono.just(Empty.getDefaultInstance()))
Flux<NumberProto.Number> test = Mono.just(Empty.getDefaultInstance()).as(stub::responsePressure)
.doOnNext(number -> {lastNumberConsumed.set(number.getNumber(0)); System.out.println("C: " + number.getNumber(0));})
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.doOnComplete(() -> System.out.println("Completed"))
Expand All @@ -123,8 +122,7 @@ public void clientCanCancelServerStreamImplicitly() throws InterruptedException
serverRule.getServiceRegistry().addService(svc);

ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(serverRule.getChannel());
Flux<NumberProto.Number> test = stub
.responsePressure(Mono.just(Empty.getDefaultInstance()))
Flux<NumberProto.Number> test = Mono.just(Empty.getDefaultInstance()).as(stub::responsePressure)
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.doOnComplete(() -> System.out.println("Completed"))
Expand Down Expand Up @@ -163,8 +161,7 @@ public void serverCanCancelClientStreamImplicitly() {
System.out.println("Client canceled");
});

Mono<NumberProto.Number> observer = stub
.requestPressure(request)
Mono<NumberProto.Number> observer = request.as(stub::requestPressure)
.doOnSuccess(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()));

Expand Down Expand Up @@ -203,8 +200,7 @@ public void serverCanCancelClientStreamExplicitly() {
System.out.println("Client canceled");
});

Mono<NumberProto.Number> observer = stub
.requestPressure(request)
Mono<NumberProto.Number> observer = request.as(stub::requestPressure)
.doOnSuccess(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()));

Expand Down Expand Up @@ -243,8 +239,7 @@ public void serverCanCancelClientStreamImplicitlyBidi() {
System.out.println("Client canceled");
});

Flux<NumberProto.Number> observer = stub
.twoWayPressure(request)
Flux<NumberProto.Number> observer = request.compose(stub::twoWayPressure)
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()));

Expand Down Expand Up @@ -283,8 +278,7 @@ public void serverCanCancelClientStreamExplicitlyBidi() {
System.out.println("Client canceled");
});

Flux<NumberProto.Number> observer = stub
.twoWayPressure(request)
Flux<NumberProto.Number> observer = request.compose(stub::twoWayPressure)
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void stopServer() throws InterruptedException {
public void oneToOne() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactorjava").build());
Mono<HelloResponse> resp = stub.sayHello(req);
Mono<HelloResponse> resp = req.compose(stub::sayHello);

AtomicReference<String> clientThreadName = new AtomicReference<>();

Expand All @@ -119,7 +119,7 @@ public void manyToMany() {
HelloRequest.newBuilder().setName("d").build(),
HelloRequest.newBuilder().setName("e").build());

Flux<HelloResponse> resp = stub.sayHelloBothStream(req);
Flux<HelloResponse> resp = req.compose(stub::sayHelloBothStream);

AtomicReference<String> clientThreadName = new AtomicReference<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,19 @@ public void fourKindsOfRequestAtOnce() throws Exception {
// == MAKE REQUESTS ==
// One to One
Mono<HelloRequest> req1 = Mono.just(HelloRequest.newBuilder().setName("reactorjava").build());
Mono<HelloResponse> resp1 = stub.sayHello(req1);
Mono<HelloResponse> resp1 = req1.compose(stub::sayHello);

// One to Many
Mono<HelloRequest> req2 = Mono.just(HelloRequest.newBuilder().setName("reactorjava").build());
Flux<HelloResponse> resp2 = stub.sayHelloRespStream(req2);
Flux<HelloResponse> resp2 = req2.as(stub::sayHelloRespStream);

// Many to One
Flux<HelloRequest> req3 = Flux.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build());

Mono<HelloResponse> resp3 = stub.sayHelloReqStream(req3);
Mono<HelloResponse> resp3 = req3.as(stub::sayHelloReqStream);

// Many to Many
Flux<HelloRequest> req4 = Flux.just(
Expand All @@ -128,7 +128,7 @@ public void fourKindsOfRequestAtOnce() throws Exception {
HelloRequest.newBuilder().setName("d").build(),
HelloRequest.newBuilder().setName("e").build());

Flux<HelloResponse> resp4 = stub.sayHelloBothStream(req4);
Flux<HelloResponse> resp4 = req4.compose(stub::sayHelloBothStream);

// == VERIFY RESPONSES ==
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void ClientSendsContext() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Context.current()
.withValue(ctxKey, "ClientSendsContext")
.run(() -> StepVerifier.create(stub.sayHello(worldReq).map(HelloResponse::getMessage))
.run(() -> StepVerifier.create(worldReq.compose(stub::sayHello).map(HelloResponse::getMessage))
.expectNext("Hello World")
.verifyComplete());

Expand All @@ -131,7 +131,7 @@ public void ClientSendsContext() {
public void ClientGetsContext() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);

Mono<HelloResponse> test = stub.sayHello(worldReq)
Mono<HelloResponse> test = worldReq.compose(stub::sayHello)
.doOnSuccess(resp -> {
Context ctx = Context.current();
assertThat(ctxKey.get(ctx)).isEqualTo("ClientGetsContext");
Expand All @@ -146,7 +146,7 @@ public void ClientGetsContext() {
public void ServerAcceptsContext() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);

StepVerifier.create(stub.sayHello(worldReq).map(HelloResponse::getMessage))
StepVerifier.create(worldReq.compose(stub::sayHello).map(HelloResponse::getMessage))
.expectNext("Hello World")
.verifyComplete();
assertThat(svc.getReceivedCtxValue()).isEqualTo("ServerAcceptsContext");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static void stopServer() throws InterruptedException {
public void oneToOne() throws IOException {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactorjava").build());
Mono<HelloResponse> resp = stub.sayHello(req);
Mono<HelloResponse> resp = req.compose(stub::sayHello);

StepVerifier.create(resp.map(HelloResponse::getMessage))
.expectNext("Hello reactorjava")
Expand All @@ -103,7 +103,7 @@ public void oneToOne() throws IOException {
public void oneToMany() throws IOException {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactorjava").build());
Flux<HelloResponse> resp = stub.sayHelloRespStream(req);
Flux<HelloResponse> resp = req.as(stub::sayHelloRespStream);

StepVerifier.create(resp.map(HelloResponse::getMessage))
.expectNext("Hello reactorjava", "Hi reactorjava", "Greetings reactorjava")
Expand All @@ -118,7 +118,7 @@ public void manyToOne() throws Exception {
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build());

Mono<HelloResponse> resp = stub.sayHelloReqStream(req);
Mono<HelloResponse> resp = req.as(stub::sayHelloReqStream);

StepVerifier.create(resp.map(HelloResponse::getMessage))
.expectNext("Hello a and b and c")
Expand All @@ -135,7 +135,7 @@ public void manyToMany() throws Exception {
HelloRequest.newBuilder().setName("d").build(),
HelloRequest.newBuilder().setName("e").build());

Flux<HelloResponse> resp = stub.sayHelloBothStream(req);
Flux<HelloResponse> resp = req.compose(stub::sayHelloBothStream);

StepVerifier.create(resp.map(HelloResponse::getMessage))
.expectNext("Hello a and b", "Hello c and d", "Hello e")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public static void stopServer() throws InterruptedException {
public void oneToOne() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Mono<String> reactorRequest = Mono.just("World");
Mono<String> reactorResponse = stub.sayHello(reactorRequest.map(this::toRequest)).map(this::fromResponse);
Mono<String> reactorResponse = reactorRequest.map(this::toRequest).compose(stub::sayHello).map(this::fromResponse);

StepVerifier.create(reactorResponse)
.expectNext("Hello World")
Expand All @@ -130,7 +130,7 @@ public void oneToOne() {
public void oneToMany() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Mono<String> reactorRequest = Mono.just("World");
Flux<String> reactorResponse = stub.sayHelloRespStream(reactorRequest.map(this::toRequest)).map(this::fromResponse);
Flux<String> reactorResponse = reactorRequest.map(this::toRequest).as(stub::sayHelloRespStream).map(this::fromResponse);

StepVerifier.create(reactorResponse)
.expectNext("Hello World", "Hi World", "Greetings World")
Expand All @@ -141,7 +141,7 @@ public void oneToMany() {
public void manyToOne() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Flux<String> reactorRequest = Flux.just("A", "B", "C");
Mono<String> reactorResponse = stub.sayHelloReqStream(reactorRequest.map(this::toRequest)).map(this::fromResponse);
Mono<String> reactorResponse = reactorRequest.map(this::toRequest).as(stub::sayHelloReqStream).map(this::fromResponse);

StepVerifier.create(reactorResponse)
.expectNext("Hello A and B and C")
Expand All @@ -152,7 +152,7 @@ public void manyToOne() {
public void manyToMany() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Flux<String> reactorRequest = Flux.just("A", "B", "C", "D");
Flux<String> reactorResponse = stub.sayHelloBothStream(reactorRequest.map(this::toRequest)).map(this::fromResponse);
Flux<String> reactorResponse = reactorRequest.map(this::toRequest).compose(stub::sayHelloBothStream).map(this::fromResponse);

StepVerifier.create(reactorResponse)
.expectNext("Hello A and B", "Hello C and D")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void stopServer() {
@Test
public void oneToOne() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Mono<HelloResponse> resp = stub.sayHello(Mono.just(HelloRequest.getDefaultInstance()));
Mono<HelloResponse> resp = Mono.just(HelloRequest.getDefaultInstance()).compose(stub::sayHello);

StepVerifier.create(resp)
.verifyErrorMatches(t -> t instanceof StatusRuntimeException && ((StatusRuntimeException)t).getStatus() == Status.INTERNAL);
Expand All @@ -77,7 +77,7 @@ public void oneToOne() {
@Test
public void oneToMany() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Flux<HelloResponse> resp = stub.sayHelloRespStream(Mono.just(HelloRequest.getDefaultInstance()));
Flux<HelloResponse> resp = Mono.just(HelloRequest.getDefaultInstance()).as(stub::sayHelloRespStream);
Flux<HelloResponse> test = resp
.doOnNext(System.out::println)
.doOnError(throwable -> System.out.println(throwable.getMessage()))
Expand All @@ -91,15 +91,15 @@ public void oneToMany() {
@Test
public void manyToOne() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Mono<HelloResponse> resp = stub.sayHelloReqStream(Flux.just(HelloRequest.getDefaultInstance()));
Mono<HelloResponse> resp = Flux.just(HelloRequest.getDefaultInstance()).as(stub::sayHelloReqStream);
StepVerifier.create(resp)
.verifyErrorMatches(t -> t instanceof StatusRuntimeException && ((StatusRuntimeException)t).getStatus() == Status.INTERNAL);
}

@Test
public void manyToMany() {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Flux<HelloResponse> resp = stub.sayHelloBothStream(Flux.just(HelloRequest.getDefaultInstance()));
Flux<HelloResponse> resp = Flux.just(HelloRequest.getDefaultInstance()).compose(stub::sayHelloBothStream);
StepVerifier.create(resp)
.verifyErrorMatches(t -> t instanceof StatusRuntimeException && ((StatusRuntimeException)t).getStatus() == Status.INTERNAL);
}
Expand Down
Loading

0 comments on commit c073723

Please sign in to comment.