From 2a557cb1c5ba60a58e3770de58bf63ea1bb16962 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Mon, 9 Jul 2018 22:55:02 -0700 Subject: [PATCH 1/4] Retry WIP --- .../java/demo/client/console/ChatClient.java | 12 +- .../java/demo/client/javafx/ChatClient.java | 39 ++- .../demo/client/javafx/GrpcRetryFlowable.java | 23 -- demos/reactive-grpc-chat/rxjava-chat/pom.xml | 2 +- reactor/reactor-grpc-stub/pom.xml | 6 + .../com/salesforce/reactorgrpc/GrpcRetry.java | 82 +++++++ .../reactorgrpc/stub/GrpcRetryTest.java | 167 +++++++++++++ rx-java/rxgrpc-stub/pom.xml | 6 + .../{stub => }/GrpcContextOnScheduleHook.java | 2 +- .../java/com/salesforce/rxgrpc/GrpcRetry.java | 154 ++++++++++++ .../stub/GrpcContextOnScheduleHookTest.java | 1 + .../salesforce/rxgrpc/stub/GrpcRetryTest.java | 228 ++++++++++++++++++ 12 files changed, 682 insertions(+), 40 deletions(-) delete mode 100644 demos/reactive-grpc-chat/rxjava-chat/javafx-client/src/main/java/demo/client/javafx/GrpcRetryFlowable.java create mode 100644 reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java create mode 100644 reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/GrpcRetryTest.java rename rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/{stub => }/GrpcContextOnScheduleHook.java (95%) create mode 100644 rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java create mode 100644 rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcRetryTest.java diff --git a/demos/reactive-grpc-chat/rxjava-chat/console-client/src/main/java/demo/client/console/ChatClient.java b/demos/reactive-grpc-chat/rxjava-chat/console-client/src/main/java/demo/client/console/ChatClient.java index 4ee0da91..274162d6 100644 --- a/demos/reactive-grpc-chat/rxjava-chat/console-client/src/main/java/demo/client/console/ChatClient.java +++ b/demos/reactive-grpc-chat/rxjava-chat/console-client/src/main/java/demo/client/console/ChatClient.java @@ -7,6 +7,7 @@ import io.grpc.ManagedChannelBuilder; import io.reactivex.Observable; import io.reactivex.Single; +import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; import jline.console.ConsoleReader; @@ -20,6 +21,7 @@ */ public final class ChatClient { private static final int PORT = 9999; + private static final CompositeDisposable disposables = new CompositeDisposable(); private ChatClient() { } @@ -35,13 +37,13 @@ public static void main(String[] args) throws Exception { console.println("Type /quit to exit"); // Subscribe to incoming messages - Disposable chatSubscription = Single.just(Empty.getDefaultInstance()) + disposables.add(Single.just(Empty.getDefaultInstance()) .as(stub::getMessages) .filter(message -> !message.getAuthor().equals(author)) - .subscribe(message -> printLine(console, message.getAuthor(), message.getMessage())); + .subscribe(message -> printLine(console, message.getAuthor(), message.getMessage()))); // Publish outgoing messages - Observable + disposables.add(Observable // Send connection message .just(author + " joined.") // Send user input @@ -54,11 +56,11 @@ public static void main(String[] args) throws Exception { ChatClient::doNothing, throwable -> printLine(console, "ERROR", throwable.getMessage()), done::countDown - ); + )); // Wait for a signal to exit, then clean up done.await(); - chatSubscription.dispose(); + disposables.dispose(); channel.shutdown(); channel.awaitTermination(1, TimeUnit.SECONDS); console.getTerminal().restore(); diff --git a/demos/reactive-grpc-chat/rxjava-chat/javafx-client/src/main/java/demo/client/javafx/ChatClient.java b/demos/reactive-grpc-chat/rxjava-chat/javafx-client/src/main/java/demo/client/javafx/ChatClient.java index 1a96cf58..303947a4 100644 --- a/demos/reactive-grpc-chat/rxjava-chat/javafx-client/src/main/java/demo/client/javafx/ChatClient.java +++ b/demos/reactive-grpc-chat/rxjava-chat/javafx-client/src/main/java/demo/client/javafx/ChatClient.java @@ -1,12 +1,13 @@ package demo.client.javafx; import com.google.protobuf.Empty; +import com.salesforce.rxgrpc.stub.GrpcRetry; import demo.proto.ChatProto; import demo.proto.RxChatGrpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.reactivex.Single; -import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.CompositeDisposable; import io.reactivex.rxjavafx.observables.JavaFxObservable; import io.reactivex.rxjavafx.sources.WindowEventSource; import javafx.application.Application; @@ -33,7 +34,7 @@ public class ChatClient extends Application { private String author; private ManagedChannel channel; private RxChatGrpc.RxChatStub stub; - private Disposable chatSubscription; + private CompositeDisposable disposables = new CompositeDisposable(); public static void main(String[] args) { launch(args); @@ -54,26 +55,44 @@ public void start(Stage primaryStage) { Scene scene = buildScene(); // Subscribe to incoming messages - chatSubscription = Single.just(Empty.getDefaultInstance()) - .as(stub::getMessages) + disposables.add(Single + // Trigger + .just(Empty.getDefaultInstance()) + // Invoke and Error handle + .as(GrpcRetry.OneToMany.retryAfter(stub::getMessages, 1, TimeUnit.SECONDS)) .map(this::fromMessage) - .subscribe(messages::appendText); + // Execute + .subscribe(messages::appendText)); // Publish arrival/departure message - WindowEventSource.fromWindowEvents(primaryStage, WindowEvent.ANY) + disposables.add(WindowEventSource + // Trigger + .fromWindowEvents(primaryStage, WindowEvent.ANY) .filter(event -> event.getEventType().equals(WindowEvent.WINDOW_SHOWING) | event.getEventType().equals(WindowEvent.WINDOW_HIDING)) + // Invoke .map(event -> event.getEventType().equals(WindowEvent.WINDOW_SHOWING) ? "joined" : "left") .map(this::toMessage) .flatMapSingle(stub::postMessage) - .subscribe(); + // Error handle + .onErrorReturnItem(Empty.getDefaultInstance()) + .repeat() + // Execute + .subscribe()); // Publish outgoing messages - JavaFxObservable.actionEventsOf(send) + disposables.add(JavaFxObservable + // Trigger + .actionEventsOf(send) + // Invoke .map(x -> message.getText()) .map(this::toMessage) .flatMapSingle(stub::postMessage) - .subscribe(ignore -> message.clear()); + // Error handle + .onErrorReturnItem(Empty.getDefaultInstance()) + .repeat() + // Execute + .subscribe(ignore -> message.clear())); primaryStage.setScene(scene); primaryStage.show(); @@ -81,7 +100,7 @@ public void start(Stage primaryStage) { @Override public void stop() throws Exception { - chatSubscription.dispose(); + disposables.dispose(); channel.shutdown(); channel.awaitTermination(1, TimeUnit.SECONDS); } diff --git a/demos/reactive-grpc-chat/rxjava-chat/javafx-client/src/main/java/demo/client/javafx/GrpcRetryFlowable.java b/demos/reactive-grpc-chat/rxjava-chat/javafx-client/src/main/java/demo/client/javafx/GrpcRetryFlowable.java deleted file mode 100644 index 2de496c0..00000000 --- a/demos/reactive-grpc-chat/rxjava-chat/javafx-client/src/main/java/demo/client/javafx/GrpcRetryFlowable.java +++ /dev/null @@ -1,23 +0,0 @@ -package demo.client.javafx; - -import io.reactivex.Flowable; -import org.reactivestreams.Subscriber; - -import java.util.function.Supplier; - -public class GrpcRetryFlowable extends Flowable { - private final Flowable retryFlowable; - - GrpcRetryFlowable(Supplier> flowableSupplier) { - this.retryFlowable = Flowable.defer(flowableSupplier::get).retry(); - } - - @Override - protected void subscribeActual(Subscriber s) { - retryFlowable.subscribe(s); - } - - public static Flowable doRetry(Flowable flowable) { - return new GrpcRetryFlowable<>(() -> flowable); - } -} diff --git a/demos/reactive-grpc-chat/rxjava-chat/pom.xml b/demos/reactive-grpc-chat/rxjava-chat/pom.xml index 6edee0c9..6bc1f767 100644 --- a/demos/reactive-grpc-chat/rxjava-chat/pom.xml +++ b/demos/reactive-grpc-chat/rxjava-chat/pom.xml @@ -16,7 +16,7 @@ 2.1.10 - 0.8.2 + 0.8.3-SNAPSHOT 0.8.0 1.12.0 3.5.1 diff --git a/reactor/reactor-grpc-stub/pom.xml b/reactor/reactor-grpc-stub/pom.xml index bc2c3a21..fc0a5154 100644 --- a/reactor/reactor-grpc-stub/pom.xml +++ b/reactor/reactor-grpc-stub/pom.xml @@ -36,6 +36,12 @@ ${reactor.version} test + + io.projectreactor.addons + reactor-extra + ${reactor.version} + test + junit junit diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java new file mode 100644 index 00000000..2e121596 --- /dev/null +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.reactorgrpc; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.function.Function; + +/** + * TODO. + */ +public final class GrpcRetry { + private GrpcRetry() { } + + /** + * TODO. + */ + public static final class OneToMany { + private OneToMany() { + } + + public static Function, Flux> retryWhen(final Function, Flux> operation, final Function, ? extends Publisher> whenFactory) { + return request -> Flux.defer(() -> operation.apply(request)).retryWhen(whenFactory); + } + + public static Function, Flux> retryAfter(final Function, Flux> operation, final Duration delay) { + return retryWhen(operation, errors -> errors.delayElements(delay)); + } + + public static Function, Flux> retryImmediately(final Function, Flux> operation) { + return retryWhen(operation, errors -> errors); + } + } + + /** + * TODO. + */ + public static final class ManyToMany { + private ManyToMany() { + } + + public static Function, Flux> retryWhen(final Function, Flux> operation, final Function, ? extends Publisher> whenFactory) { + return request -> Flux.defer(() -> operation.apply(request)).retryWhen(whenFactory); + } + + public static Function, Flux> retryAfter(final Function, Flux> operation, final Duration delay) { + return retryWhen(operation, errors -> errors.delayElements(delay)); + } + + public static Function, Flux> retryImmediately(final Function, Flux> operation) { + return retryWhen(operation, errors -> errors); + } + } + + /** + * TODO. + */ + public static final class ManyToOne { + private ManyToOne() { + } + + public static Function, Mono> retryWhen(final Function, Mono> operation, final Function, ? extends Publisher> whenFactory) { + return request -> Mono.defer(() -> operation.apply(request)).retryWhen(whenFactory); + } + + public static Function, Mono> retryAfter(final Function, Mono> operation, final Duration delay) { + return retryWhen(operation, errors -> errors.delayElements(delay)); + } + + public static Function, Mono> retryImmediately(final Function, Mono> operation) { + return retryWhen(operation, errors -> errors); + } + } +} diff --git a/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/GrpcRetryTest.java b/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/GrpcRetryTest.java new file mode 100644 index 00000000..21135474 --- /dev/null +++ b/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/GrpcRetryTest.java @@ -0,0 +1,167 @@ +package com.salesforce.reactorgrpc.stub; + +import com.salesforce.reactorgrpc.GrpcRetry; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; +import reactor.retry.Retry; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Function; + +@SuppressWarnings("Duplicates") +public class GrpcRetryTest { + private Flux newThreeErrorFlux() { + return Flux.create(new Consumer>() { + int count = 3; + @Override + public void accept(FluxSink emitter) { + if (count > 0) { + emitter.error(new Throwable("Not yet!")); + count--; + } else { + emitter.next(0); + emitter.complete(); + } + } + }, FluxSink.OverflowStrategy.BUFFER); + } + + private Mono newThreeErrorMono() { + return Mono.create(new Consumer>() { + int count = 3; + @Override + public void accept(MonoSink emitter){ + if (count > 0) { + emitter.error(new Throwable("Not yet!")); + count--; + } else { + emitter.success(0); + } + } + }); + } + + @Test + public void noRetryMakesErrorFlowabable() { + Flux test = newThreeErrorFlux() + .as(flux -> flux); + + StepVerifier.create(test) + .expectErrorMessage("Not yet!") + .verify(Duration.ofSeconds(1)); + } + + @Test + public void noRetryMakesErrorSingle() { + Mono test = newThreeErrorMono() + .as(mono -> mono); + + StepVerifier.create(test) + .expectErrorMessage("Not yet!") + .verify(Duration.ofSeconds(1)); + } + + @Test + public void oneToManyRetryWhen() { + Flux test = newThreeErrorMono() + .>as(GrpcRetry.OneToMany.retryWhen(Mono::flux, Retry.any().retryMax(4))); + + StepVerifier.create(test) + .expectNext(0) + .expectComplete() + .verify(Duration.ofSeconds(1)); + } + + @Test + public void oneToManyRetryImmediately() { + Flux test = newThreeErrorMono() + .>as(GrpcRetry.OneToMany.retryImmediately(Mono::flux)); + + StepVerifier.create(test) + .expectNext(0) + .expectComplete() + .verify(Duration.ofSeconds(1)); + } + + @Test + public void oneToManyRetryAfter() { + Flux test = newThreeErrorMono() + .>as(GrpcRetry.OneToMany.retryAfter(Mono::flux, Duration.ofMillis(10))); + + StepVerifier.create(test) + .expectNext(0) + .expectComplete() + .verify(Duration.ofSeconds(1)); + } + + @Test + public void manyToManyRetryWhen() { + Flux test = newThreeErrorFlux() + .>as(GrpcRetry.ManyToMany.retryWhen(Function.identity(), Retry.any().retryMax(4))); + + StepVerifier.create(test) + .expectNext(0) + .expectComplete() + .verify(Duration.ofSeconds(1)); + } + + @Test + public void manyToManyRetryImmediately() { + Flux test = newThreeErrorFlux() + .>as(GrpcRetry.ManyToMany.retryImmediately(Function.identity())); + + StepVerifier.create(test) + .expectNext(0) + .expectComplete() + .verify(Duration.ofSeconds(1)); + } + + @Test + public void manyToManyRetryAfter() { + Flux test = newThreeErrorFlux() + .>as(GrpcRetry.ManyToMany.retryAfter(Function.identity(), Duration.ofMillis(10))); + + StepVerifier.create(test) + .expectNext(0) + .expectComplete() + .verify(Duration.ofSeconds(1)); + } + + @Test + public void manyToOneRetryWhen() { + Mono test = newThreeErrorFlux() + .>as(GrpcRetry.ManyToOne.retryWhen(Flux::single, Retry.any().retryMax(4))); + + StepVerifier.create(test) + .expectNext(0) + .expectComplete() + .verify(Duration.ofSeconds(1)); + } + + @Test + public void manyToOneRetryImmediately() { + Mono test = newThreeErrorFlux() + .>as(GrpcRetry.ManyToOne.retryImmediately(Flux::single)); + + StepVerifier.create(test) + .expectNext(0) + .expectComplete() + .verify(Duration.ofSeconds(1)); + } + + @Test + public void manyToOneRetryAfter() { + Mono test = newThreeErrorFlux() + .>as(GrpcRetry.ManyToOne.retryAfter(Flux::single, Duration.ofMillis(10))); + + StepVerifier.create(test) + .expectNext(0) + .expectComplete() + .verify(Duration.ofSeconds(1)); + } +} diff --git a/rx-java/rxgrpc-stub/pom.xml b/rx-java/rxgrpc-stub/pom.xml index f47697ee..08bd2601 100644 --- a/rx-java/rxgrpc-stub/pom.xml +++ b/rx-java/rxgrpc-stub/pom.xml @@ -36,6 +36,12 @@ rxjava ${rxjava.version} + + com.github.davidmoten + rxjava2-extras + 0.1.26 + test + junit junit diff --git a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/GrpcContextOnScheduleHook.java b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcContextOnScheduleHook.java similarity index 95% rename from rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/GrpcContextOnScheduleHook.java rename to rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcContextOnScheduleHook.java index 56f37228..664a8b0d 100644 --- a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/GrpcContextOnScheduleHook.java +++ b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcContextOnScheduleHook.java @@ -5,7 +5,7 @@ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause */ -package com.salesforce.rxgrpc.stub; +package com.salesforce.rxgrpc; import io.grpc.Context; import io.reactivex.functions.Function; diff --git a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java new file mode 100644 index 00000000..1e59c422 --- /dev/null +++ b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.rxgrpc; + +import io.reactivex.*; +import io.reactivex.functions.Function; +import org.reactivestreams.Publisher; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * TODO. + */ +public final class GrpcRetry { + private GrpcRetry() { } + + /** + * TODO. + */ + public static final class OneToMany { + private OneToMany() { } + + public static SingleConverter> retryWhen(final Function, Flowable> operation, final Function, ? extends Publisher> handler) { + return new SingleConverter>() { + @Override + public Flowable apply(final Single request) { + return Flowable.defer(new Callable>() { + @Override + public Publisher call() throws Exception { + return operation.apply(request); + } + }).retryWhen(handler); + } + }; + } + + public static SingleConverter> retryAfter(final Function, Flowable> operation, final int delay, final TimeUnit unit) { + return retryWhen(operation, new Function, Publisher>() { + @Override + public Publisher apply(Flowable errors) { + return errors.flatMap(new Function>() { + @Override + public Publisher apply(Throwable error) { + return Flowable.timer(delay, unit); + } + }); + } + }); + } + + public static SingleConverter> retryImmediately(final Function, Flowable> operation) { + return retryWhen(operation, new Function, Publisher>() { + @Override + public Publisher apply(Flowable errors) { + return errors; + } + }); + } + } + + /** + * TODO. + */ + public static final class ManyToMany { + private ManyToMany() { } + + public static FlowableConverter> retryWhen(final Function, Flowable> operation, final Function, ? extends Publisher> handler) { + return new FlowableConverter>() { + @Override + public Flowable apply(final Flowable request) { + return Flowable.defer(new Callable>() { + @Override + public Publisher call() throws Exception { + return operation.apply(request); + } + }).retryWhen(handler); + } + }; + } + + public static FlowableConverter> retryAfter(final Function, Flowable> operation, final int delay, final TimeUnit unit) { + return retryWhen(operation, new Function, Publisher>() { + @Override + public Publisher apply(Flowable errors) { + return errors.flatMap(new Function>() { + @Override + public Publisher apply(Throwable error) { + return Flowable.timer(delay, unit); + } + }); + } + }); + } + + public static FlowableConverter> retryImmediately(final Function, Flowable> operation) { + return retryWhen(operation, new Function, Publisher>() { + @Override + public Publisher apply(Flowable errors) { + return errors; + } + }); + } + } + + /** + * TODO. + */ + public static final class ManyToOne { + private ManyToOne() { } + + public static FlowableConverter> retryWhen(final Function, Single> operation, final Function, ? extends Publisher> handler) { + return new FlowableConverter>() { + @Override + public Single apply(final Flowable request) { + return Single.defer(new Callable>() { + @Override + public SingleSource call() throws Exception { + return operation.apply(request); + } + }).retryWhen(handler); + } + }; + } + + public static FlowableConverter> retryAfter(final Function, Single> operation, final int delay, final TimeUnit unit) { + return retryWhen(operation, new Function, Publisher>() { + @Override + public Publisher apply(Flowable errors) { + return errors.flatMap(new Function>() { + @Override + public Publisher apply(Throwable error) { + return Flowable.timer(delay, unit); + } + }); + } + }); + } + + public static FlowableConverter> retryImmediately(final Function, Single> operation) { + return retryWhen(operation, new Function, Publisher>() { + @Override + public Publisher apply(Flowable errors) { + return errors; + } + }); + } + } +} diff --git a/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcContextOnScheduleHookTest.java b/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcContextOnScheduleHookTest.java index 32ecf26f..e4a695bf 100644 --- a/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcContextOnScheduleHookTest.java +++ b/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcContextOnScheduleHookTest.java @@ -7,6 +7,7 @@ package com.salesforce.rxgrpc.stub; +import com.salesforce.rxgrpc.GrpcContextOnScheduleHook; import io.grpc.Context; import io.reactivex.Observable; import io.reactivex.functions.Action; diff --git a/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcRetryTest.java b/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcRetryTest.java new file mode 100644 index 00000000..62399149 --- /dev/null +++ b/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcRetryTest.java @@ -0,0 +1,228 @@ +package com.salesforce.rxgrpc.stub; + +import com.github.davidmoten.rx2.RetryWhen; +import com.salesforce.rxgrpc.GrpcRetry; +import io.reactivex.*; +import io.reactivex.functions.Function; +import io.reactivex.observers.TestObserver; +import io.reactivex.subscribers.TestSubscriber; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("Duplicates") +public class GrpcRetryTest { + private Flowable newThreeErrorFlowable() { + return Flowable.create(new FlowableOnSubscribe() { + int count = 3; + @Override + public void subscribe(FlowableEmitter emitter) throws Exception { + if (count > 0) { + emitter.onError(new Throwable("Not yet!")); + count--; + } else { + emitter.onNext(0); + emitter.onComplete(); + } + } + }, BackpressureStrategy.BUFFER); + } + + private Single newThreeErrorSingle() { + return Single.create(new SingleOnSubscribe() { + int count = 3; + @Override + public void subscribe(SingleEmitter emitter) throws Exception { + if (count > 0) { + emitter.onError(new Throwable("Not yet!")); + count--; + } else { + emitter.onSuccess(0); + } + } + }); + } + + @Test + public void noRetryMakesErrorFlowabable() { + TestSubscriber test = newThreeErrorFlowable() + .as(new FlowableConverter>() { + @Override + public Flowable apply(Flowable flowable) { + return flowable; + } + }) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertErrorMessage("Not yet!"); + } + + @Test + public void noRetryMakesErrorSingle() { + TestObserver test = newThreeErrorSingle() + .as(new SingleConverter>() { + @Override + public Single apply(Single single) { + return single; + } + }) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertErrorMessage("Not yet!"); + } + + @Test + public void oneToManyRetryWhen() { + TestSubscriber test = newThreeErrorSingle() + .as(GrpcRetry.OneToMany.retryWhen(new Function, Flowable>() { + @Override + public Flowable apply(Single single) { + return single.toFlowable(); + } + }, RetryWhen.maxRetries(3).build())) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertValues(0); + test.assertNoErrors(); + test.assertComplete(); + } + + @Test + public void oneToManyRetryImmediately() { + TestSubscriber test = newThreeErrorSingle() + .as(GrpcRetry.OneToMany.retryImmediately(new Function, Flowable>() { + @Override + public Flowable apply(Single single) { + return single.toFlowable(); + } + })) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertValues(0); + test.assertNoErrors(); + test.assertComplete(); + } + + @Test + public void oneToManyRetryAfter() { + TestSubscriber test = newThreeErrorSingle() + .as(GrpcRetry.OneToMany.retryAfter(new Function, Flowable>() { + @Override + public Flowable apply(Single single) { + return single.toFlowable(); + } + }, 10, TimeUnit.MILLISECONDS)) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertValues(0); + test.assertNoErrors(); + test.assertComplete(); + } + + @Test + public void manyToManyRetryWhen() { + TestSubscriber test = newThreeErrorFlowable() + .as(GrpcRetry.ManyToMany.retryWhen(new Function, Flowable>() { + @Override + public Flowable apply(Flowable flowable) { + return flowable; + } + }, RetryWhen.maxRetries(3).build())) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertValues(0); + test.assertNoErrors(); + test.assertComplete(); + } + + @Test + public void manyToManyRetryImmediately() { + TestSubscriber test = newThreeErrorFlowable() + .as(GrpcRetry.ManyToMany.retryImmediately(new Function, Flowable>() { + @Override + public Flowable apply(Flowable flowable) { + return flowable; + } + })) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertValues(0); + test.assertNoErrors(); + test.assertComplete(); + } + + @Test + public void manyToManyRetryAfter() { + TestSubscriber test = newThreeErrorFlowable() + .as(GrpcRetry.ManyToMany.retryAfter(new Function, Flowable>() { + @Override + public Flowable apply(Flowable flowable) { + return flowable; + } + }, 10, TimeUnit.MILLISECONDS)) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertValues(0); + test.assertNoErrors(); + test.assertComplete(); + } + + @Test + public void manyToOneRetryWhen() { + TestObserver test = newThreeErrorFlowable() + .as(GrpcRetry.ManyToOne.retryWhen(new Function, Single>() { + @Override + public Single apply(Flowable flowable) { + return flowable.singleOrError(); + } + }, RetryWhen.maxRetries(3).build())) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertValues(0); + test.assertNoErrors(); + test.assertComplete(); + } + + @Test + public void manyToOneRetryImmediately() { + TestObserver test = newThreeErrorFlowable() + .as(GrpcRetry.ManyToOne.retryImmediately(new Function, Single>() { + @Override + public Single apply(Flowable flowable) { + return flowable.singleOrError(); + } + })) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertValues(0); + test.assertNoErrors(); + test.assertComplete(); + } + + @Test + public void manyToOneRetryAfter() { + TestObserver test = newThreeErrorFlowable() + .as(GrpcRetry.ManyToOne.retryAfter(new Function, Single>() { + @Override + public Single apply(Flowable flowable) { + return flowable.singleOrError(); + } + }, 10, TimeUnit.MILLISECONDS)) + .test(); + + test.awaitTerminalEvent(1, TimeUnit.SECONDS); + test.assertValues(0); + test.assertNoErrors(); + test.assertComplete(); + } +} From 0106fe07468b013bc773bbc690798795bc42ad8e Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Tue, 10 Jul 2018 12:43:14 -0700 Subject: [PATCH 2/4] GrpcRetry javadoc --- .../com/salesforce/reactorgrpc/GrpcRetry.java | 95 +++++++++++++++++- .../java/com/salesforce/rxgrpc/GrpcRetry.java | 98 ++++++++++++++++++- 2 files changed, 185 insertions(+), 8 deletions(-) diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java index 2e121596..a90b7e5f 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java @@ -15,66 +15,153 @@ import java.util.function.Function; /** - * TODO. + * {@code GrpcRetry} is used to transparently re-establish a streaming gRPC request in the event of a server error. + *

+ * During a retry, the upstream rx pipeline is re-subscribed to acquire a request message and the RPC call re-issued. + * The downstream rx pipeline never sees the error. */ public final class GrpcRetry { private GrpcRetry() { } /** - * TODO. + * {@link GrpcRetry} functions for streaming response gRPC operations. */ public static final class OneToMany { private OneToMany() { } + /** + * Retries a streaming gRPC call, using the same semantics as {@link Flux#retryWhen(Function)}. + * + * For easier use, use the Retry builder from + * Reactor Extras. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param whenFactory receives a Publisher of notifications with which a user can complete or error, aborting the retry + * @param + * @param + * + * @see Flux#retryWhen(Function) + */ public static Function, Flux> retryWhen(final Function, Flux> operation, final Function, ? extends Publisher> whenFactory) { return request -> Flux.defer(() -> operation.apply(request)).retryWhen(whenFactory); } + /** + * Retries a streaming gRPC call with a fixed delay between retries. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param delay the delay between retries + * @param + * @param + */ public static Function, Flux> retryAfter(final Function, Flux> operation, final Duration delay) { return retryWhen(operation, errors -> errors.delayElements(delay)); } + /** + * Retries a streaming gRPC call immediately. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param + * @param + */ public static Function, Flux> retryImmediately(final Function, Flux> operation) { return retryWhen(operation, errors -> errors); } } /** - * TODO. + * {@link GrpcRetry} functions for bi-directional streaming gRPC operations. */ public static final class ManyToMany { private ManyToMany() { } + /** + * Retries a streaming gRPC call, using the same semantics as {@link Flux#retryWhen(Function)}. + * + * For easier use, use the Retry builder from + * Reactor Extras. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param whenFactory receives a Publisher of notifications with which a user can complete or error, aborting the retry + * @param + * @param + * + * @see Flux#retryWhen(Function) + */ public static Function, Flux> retryWhen(final Function, Flux> operation, final Function, ? extends Publisher> whenFactory) { return request -> Flux.defer(() -> operation.apply(request)).retryWhen(whenFactory); } + /** + * Retries a streaming gRPC call with a fixed delay between retries. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param delay the delay between retries + * @param + * @param + */ public static Function, Flux> retryAfter(final Function, Flux> operation, final Duration delay) { return retryWhen(operation, errors -> errors.delayElements(delay)); } + /** + * Retries a streaming gRPC call immediately. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param + * @param + */ public static Function, Flux> retryImmediately(final Function, Flux> operation) { return retryWhen(operation, errors -> errors); } } /** - * TODO. + * {@link GrpcRetry} functions for streaming request gRPC operations. */ public static final class ManyToOne { private ManyToOne() { } + /** + * Retries a streaming gRPC call, using the same semantics as {@link Flux#retryWhen(Function)}. + * + * For easier use, use the Retry builder from + * Reactor Extras. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param whenFactory receives a Publisher of notifications with which a user can complete or error, aborting the retry + * @param + * @param + * + * @see Flux#retryWhen(Function) + */ public static Function, Mono> retryWhen(final Function, Mono> operation, final Function, ? extends Publisher> whenFactory) { return request -> Mono.defer(() -> operation.apply(request)).retryWhen(whenFactory); } + /** + * Retries a streaming gRPC call with a fixed delay between retries. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param delay the delay between retries + * @param + * @param + */ public static Function, Mono> retryAfter(final Function, Mono> operation, final Duration delay) { return retryWhen(operation, errors -> errors.delayElements(delay)); } + /** + * Retries a streaming gRPC call immediately. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param + * @param + */ public static Function, Mono> retryImmediately(final Function, Mono> operation) { return retryWhen(operation, errors -> errors); } diff --git a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java index 1e59c422..6caf100d 100644 --- a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java +++ b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java @@ -15,17 +15,33 @@ import java.util.concurrent.TimeUnit; /** - * TODO. + * {@code GrpcRetry} is used to transparently re-establish a streaming gRPC request in the event of a server error. + *

+ * During a retry, the upstream rx pipeline is re-subscribed to acquire a request message and the RPC call re-issued. + * The downstream rx pipeline never sees the error. */ public final class GrpcRetry { private GrpcRetry() { } /** - * TODO. + * {@link GrpcRetry} functions for streaming response gRPC operations. */ public static final class OneToMany { private OneToMany() { } + /** + * Retries a streaming gRPC call, using the same semantics as {@link Flowable#retryWhen(Function)}. + * + * For easier use, use the RetryWhen builder from + * RxJava2 Extras. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param handler receives a Publisher of notifications with which a user can complete or error, aborting the retry + * @param + * @param + * + * @see Flowable#retryWhen(Function) + */ public static SingleConverter> retryWhen(final Function, Flowable> operation, final Function, ? extends Publisher> handler) { return new SingleConverter>() { @Override @@ -40,6 +56,15 @@ public Publisher call() throws Exception { }; } + /** + * Retries a streaming gRPC call with a fixed delay between retries. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param delay the delay between retries + * @param unit the units to use for {@code delay} + * @param + * @param + */ public static SingleConverter> retryAfter(final Function, Flowable> operation, final int delay, final TimeUnit unit) { return retryWhen(operation, new Function, Publisher>() { @Override @@ -54,6 +79,13 @@ public Publisher apply(Throwable error) { }); } + /** + * Retries a streaming gRPC call immediately. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param + * @param + */ public static SingleConverter> retryImmediately(final Function, Flowable> operation) { return retryWhen(operation, new Function, Publisher>() { @Override @@ -65,11 +97,24 @@ public Publisher apply(Flowable errors) { } /** - * TODO. + * {@link GrpcRetry} functions for bi-directional streaming gRPC operations. */ public static final class ManyToMany { private ManyToMany() { } + /** + * Retries a streaming gRPC call, using the same semantics as {@link Flowable#retryWhen(Function)}. + * + * For easier use, use the RetryWhen builder from + * RxJava2 Extras. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param handler receives a Publisher of notifications with which a user can complete or error, aborting the retry + * @param + * @param + * + * @see Flowable#retryWhen(Function) + */ public static FlowableConverter> retryWhen(final Function, Flowable> operation, final Function, ? extends Publisher> handler) { return new FlowableConverter>() { @Override @@ -84,6 +129,15 @@ public Publisher call() throws Exception { }; } + /** + * Retries a streaming gRPC call with a fixed delay between retries. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param delay the delay between retries + * @param unit the units to use for {@code delay} + * @param + * @param + */ public static FlowableConverter> retryAfter(final Function, Flowable> operation, final int delay, final TimeUnit unit) { return retryWhen(operation, new Function, Publisher>() { @Override @@ -98,6 +152,13 @@ public Publisher apply(Throwable error) { }); } + /** + * Retries a streaming gRPC call immediately. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param + * @param + */ public static FlowableConverter> retryImmediately(final Function, Flowable> operation) { return retryWhen(operation, new Function, Publisher>() { @Override @@ -109,11 +170,24 @@ public Publisher apply(Flowable errors) { } /** - * TODO. + * {@link GrpcRetry} functions for streaming request gRPC operations. */ public static final class ManyToOne { private ManyToOne() { } + /** + * Retries a streaming gRPC call, using the same semantics as {@link Flowable#retryWhen(Function)}. + * + * For easier use, use the RetryWhen builder from + * RxJava2 Extras. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param handler receives a Publisher of notifications with which a user can complete or error, aborting the retry + * @param + * @param + * + * @see Flowable#retryWhen(Function) + */ public static FlowableConverter> retryWhen(final Function, Single> operation, final Function, ? extends Publisher> handler) { return new FlowableConverter>() { @Override @@ -128,6 +202,15 @@ public SingleSource call() throws Exception { }; } + /** + * Retries a streaming gRPC call with a fixed delay between retries. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param delay the delay between retries + * @param unit the units to use for {@code delay} + * @param + * @param + */ public static FlowableConverter> retryAfter(final Function, Single> operation, final int delay, final TimeUnit unit) { return retryWhen(operation, new Function, Publisher>() { @Override @@ -142,6 +225,13 @@ public Publisher apply(Throwable error) { }); } + /** + * Retries a streaming gRPC call immediately. + * + * @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class + * @param + * @param + */ public static FlowableConverter> retryImmediately(final Function, Single> operation) { return retryWhen(operation, new Function, Publisher>() { @Override From d4efd0c2f2f519ceb80c04cfd09cf15793eeb09a Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Tue, 10 Jul 2018 13:02:42 -0700 Subject: [PATCH 3/4] Prefer compose() for many-to-many services --- .../main/java/com/salesforce/reactorgrpc/GrpcRetry.java | 6 +++--- .../com/salesforce/reactorgrpc/stub/GrpcRetryTest.java | 6 +++--- .../src/main/java/com/salesforce/rxgrpc/GrpcRetry.java | 8 ++++---- .../java/com/salesforce/rxgrpc/stub/GrpcRetryTest.java | 6 +++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java index a90b7e5f..4ad1335c 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/GrpcRetry.java @@ -91,7 +91,7 @@ private ManyToMany() { * * @see Flux#retryWhen(Function) */ - public static Function, Flux> retryWhen(final Function, Flux> operation, final Function, ? extends Publisher> whenFactory) { + public static Function, ? extends Publisher> retryWhen(final Function, Flux> operation, final Function, ? extends Publisher> whenFactory) { return request -> Flux.defer(() -> operation.apply(request)).retryWhen(whenFactory); } @@ -103,7 +103,7 @@ public static Function, Flux> retryWhen(final Function * @param * @param */ - public static Function, Flux> retryAfter(final Function, Flux> operation, final Duration delay) { + public static Function, ? extends Publisher> retryAfter(final Function, Flux> operation, final Duration delay) { return retryWhen(operation, errors -> errors.delayElements(delay)); } @@ -114,7 +114,7 @@ public static Function, Flux> retryAfter(final Functio * @param * @param */ - public static Function, Flux> retryImmediately(final Function, Flux> operation) { + public static Function, ? extends Publisher> retryImmediately(final Function, Flux> operation) { return retryWhen(operation, errors -> errors); } } diff --git a/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/GrpcRetryTest.java b/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/GrpcRetryTest.java index 21135474..f935568c 100644 --- a/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/GrpcRetryTest.java +++ b/reactor/reactor-grpc-stub/src/test/java/com/salesforce/reactorgrpc/stub/GrpcRetryTest.java @@ -102,7 +102,7 @@ public void oneToManyRetryAfter() { @Test public void manyToManyRetryWhen() { Flux test = newThreeErrorFlux() - .>as(GrpcRetry.ManyToMany.retryWhen(Function.identity(), Retry.any().retryMax(4))); + .compose(GrpcRetry.ManyToMany.retryWhen(Function.identity(), Retry.any().retryMax(4))); StepVerifier.create(test) .expectNext(0) @@ -113,7 +113,7 @@ public void manyToManyRetryWhen() { @Test public void manyToManyRetryImmediately() { Flux test = newThreeErrorFlux() - .>as(GrpcRetry.ManyToMany.retryImmediately(Function.identity())); + .compose(GrpcRetry.ManyToMany.retryImmediately(Function.identity())); StepVerifier.create(test) .expectNext(0) @@ -124,7 +124,7 @@ public void manyToManyRetryImmediately() { @Test public void manyToManyRetryAfter() { Flux test = newThreeErrorFlux() - .>as(GrpcRetry.ManyToMany.retryAfter(Function.identity(), Duration.ofMillis(10))); + .compose(GrpcRetry.ManyToMany.retryAfter(Function.identity(), Duration.ofMillis(10))); StepVerifier.create(test) .expectNext(0) diff --git a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java index 6caf100d..5d4a0ebb 100644 --- a/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java +++ b/rx-java/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/GrpcRetry.java @@ -115,8 +115,8 @@ private ManyToMany() { } * * @see Flowable#retryWhen(Function) */ - public static FlowableConverter> retryWhen(final Function, Flowable> operation, final Function, ? extends Publisher> handler) { - return new FlowableConverter>() { + public static FlowableTransformer retryWhen(final Function, Flowable> operation, final Function, ? extends Publisher> handler) { + return new FlowableTransformer() { @Override public Flowable apply(final Flowable request) { return Flowable.defer(new Callable>() { @@ -138,7 +138,7 @@ public Publisher call() throws Exception { * @param * @param */ - public static FlowableConverter> retryAfter(final Function, Flowable> operation, final int delay, final TimeUnit unit) { + public static FlowableTransformer retryAfter(final Function, Flowable> operation, final int delay, final TimeUnit unit) { return retryWhen(operation, new Function, Publisher>() { @Override public Publisher apply(Flowable errors) { @@ -159,7 +159,7 @@ public Publisher apply(Throwable error) { * @param * @param */ - public static FlowableConverter> retryImmediately(final Function, Flowable> operation) { + public static FlowableTransformer retryImmediately(final Function, Flowable> operation) { return retryWhen(operation, new Function, Publisher>() { @Override public Publisher apply(Flowable errors) { diff --git a/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcRetryTest.java b/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcRetryTest.java index 62399149..ba6cf451 100644 --- a/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcRetryTest.java +++ b/rx-java/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/GrpcRetryTest.java @@ -127,7 +127,7 @@ public Flowable apply(Single single) { @Test public void manyToManyRetryWhen() { TestSubscriber test = newThreeErrorFlowable() - .as(GrpcRetry.ManyToMany.retryWhen(new Function, Flowable>() { + .compose(GrpcRetry.ManyToMany.retryWhen(new Function, Flowable>() { @Override public Flowable apply(Flowable flowable) { return flowable; @@ -144,7 +144,7 @@ public Flowable apply(Flowable flowable) { @Test public void manyToManyRetryImmediately() { TestSubscriber test = newThreeErrorFlowable() - .as(GrpcRetry.ManyToMany.retryImmediately(new Function, Flowable>() { + .compose(GrpcRetry.ManyToMany.retryImmediately(new Function, Flowable>() { @Override public Flowable apply(Flowable flowable) { return flowable; @@ -161,7 +161,7 @@ public Flowable apply(Flowable flowable) { @Test public void manyToManyRetryAfter() { TestSubscriber test = newThreeErrorFlowable() - .as(GrpcRetry.ManyToMany.retryAfter(new Function, Flowable>() { + .compose(GrpcRetry.ManyToMany.retryAfter(new Function, Flowable>() { @Override public Flowable apply(Flowable flowable) { return flowable; From 8267053cb3c7ed5c63b44c9b92680839c184fafa Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Tue, 10 Jul 2018 14:32:41 -0700 Subject: [PATCH 4/4] Documentation --- reactor/README.md | 27 +++++++++++++++++++++++++++ rx-java/README.md | 11 +++++++++++ 2 files changed, 38 insertions(+) diff --git a/reactor/README.md b/reactor/README.md index 299e9a65..4571e978 100644 --- a/reactor/README.md +++ b/reactor/README.md @@ -86,6 +86,33 @@ After installing the plugin, Reactor-gRPC service stubs will be generated along resp.subscribe(...); ``` +## Don't break the chain +Used on their own, the generated RxGrpc stub methods do not cleanly chain with other RxJava operators. +Using the `compose()` and `as()` methods of `Mono` and `Fluz` are preferred over direct invocation. + +#### One→One, Many→Many +```java +Mono monoResponse = monoRequest.compose(stub::sayHello); +Flux fluxResponse = fluxRequest.compose(stub::sayHelloBothStream); +``` + +#### One→Many, Many→One +```java +Mono monoResponse = fluxRequest.as(stub::sayHelloRequestStream); +Flux fluxResponse = monoRequest.as(stub::sayHelloResponseStream); +``` + +## Retrying streaming requests +`GrpcRetry` is used to transparently re-establish a streaming gRPC request in the event of a server error. During a +retry, the upstream rx pipeline is re-subscribed to acquire a request message and the RPC call re-issued. The downstream +rx pipeline never sees the error. + +```java +Flux fluxResponse = fluxRequest.compose(GrpcRetry.ManyToMany.retry(stub::sayHelloBothStream)); +``` + +For complex retry scenarios, use the `Retry` builder from Reactor Extras. + Modules ======= diff --git a/rx-java/README.md b/rx-java/README.md index 7e4afbcb..0156f726 100644 --- a/rx-java/README.md +++ b/rx-java/README.md @@ -102,6 +102,17 @@ Single singleResponse = flowableRequest.as(stub::sayHelloRequestS Flowable flowableResponse = singleRequest.as(stub::sayHelloResponseStream); ``` +## Retrying streaming requests +`GrpcRetry` is used to transparently re-establish a streaming gRPC request in the event of a server error. During a +retry, the upstream rx pipeline is re-subscribed to acquire a request message and the RPC call re-issued. The downstream +rx pipeline never sees the error. + +```java +Flowable flowableResponse = flowableRequest.compose(GrpcRetry.ManyToMany.retry(stub::sayHelloBothStream)); +``` + +For complex retry scenarios, use the `RetryWhen` builder from RxJava2 Extras. + ## gRPC Context propagation Because the non-blocking nature of RX, RX-Java tends to switch between threads a lot. GRPC stores its context in the Thread context and is therefore often lost when RX