Skip to content

Commit

Permalink
Merge pull request #105 from salesforce/feature/retry
Browse files Browse the repository at this point in the history
Add gRPC request retry helpers
  • Loading branch information
rmichela authored Jul 10, 2018
2 parents d494495 + 8267053 commit 6eb0f71
Show file tree
Hide file tree
Showing 14 changed files with 897 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.grpc.ManagedChannelBuilder;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import jline.console.ConsoleReader;

Expand All @@ -20,6 +21,7 @@
*/
public final class ChatClient {
private static final int PORT = 9999;
private static final CompositeDisposable disposables = new CompositeDisposable();

private ChatClient() { }

Expand All @@ -35,13 +37,13 @@ public static void main(String[] args) throws Exception {
console.println("Type /quit to exit");

// Subscribe to incoming messages
Disposable chatSubscription = Single.just(Empty.getDefaultInstance())
disposables.add(Single.just(Empty.getDefaultInstance())
.as(stub::getMessages)
.filter(message -> !message.getAuthor().equals(author))
.subscribe(message -> printLine(console, message.getAuthor(), message.getMessage()));
.subscribe(message -> printLine(console, message.getAuthor(), message.getMessage())));

// Publish outgoing messages
Observable
disposables.add(Observable
// Send connection message
.just(author + " joined.")
// Send user input
Expand All @@ -54,11 +56,11 @@ public static void main(String[] args) throws Exception {
ChatClient::doNothing,
throwable -> printLine(console, "ERROR", throwable.getMessage()),
done::countDown
);
));

// Wait for a signal to exit, then clean up
done.await();
chatSubscription.dispose();
disposables.dispose();
channel.shutdown();
channel.awaitTermination(1, TimeUnit.SECONDS);
console.getTerminal().restore();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package demo.client.javafx;

import com.google.protobuf.Empty;
import com.salesforce.rxgrpc.stub.GrpcRetry;
import demo.proto.ChatProto;
import demo.proto.RxChatGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.rxjavafx.observables.JavaFxObservable;
import io.reactivex.rxjavafx.sources.WindowEventSource;
import javafx.application.Application;
Expand All @@ -33,7 +34,7 @@ public class ChatClient extends Application {
private String author;
private ManagedChannel channel;
private RxChatGrpc.RxChatStub stub;
private Disposable chatSubscription;
private CompositeDisposable disposables = new CompositeDisposable();

public static void main(String[] args) {
launch(args);
Expand All @@ -54,34 +55,52 @@ public void start(Stage primaryStage) {
Scene scene = buildScene();

// Subscribe to incoming messages
chatSubscription = Single.just(Empty.getDefaultInstance())
.as(stub::getMessages)
disposables.add(Single
// Trigger
.just(Empty.getDefaultInstance())
// Invoke and Error handle
.as(GrpcRetry.OneToMany.retryAfter(stub::getMessages, 1, TimeUnit.SECONDS))
.map(this::fromMessage)
.subscribe(messages::appendText);
// Execute
.subscribe(messages::appendText));

// Publish arrival/departure message
WindowEventSource.fromWindowEvents(primaryStage, WindowEvent.ANY)
disposables.add(WindowEventSource
// Trigger
.fromWindowEvents(primaryStage, WindowEvent.ANY)
.filter(event -> event.getEventType().equals(WindowEvent.WINDOW_SHOWING) |
event.getEventType().equals(WindowEvent.WINDOW_HIDING))
// Invoke
.map(event -> event.getEventType().equals(WindowEvent.WINDOW_SHOWING) ? "joined" : "left")
.map(this::toMessage)
.flatMapSingle(stub::postMessage)
.subscribe();
// Error handle
.onErrorReturnItem(Empty.getDefaultInstance())
.repeat()
// Execute
.subscribe());

// Publish outgoing messages
JavaFxObservable.actionEventsOf(send)
disposables.add(JavaFxObservable
// Trigger
.actionEventsOf(send)
// Invoke
.map(x -> message.getText())
.map(this::toMessage)
.flatMapSingle(stub::postMessage)
.subscribe(ignore -> message.clear());
// Error handle
.onErrorReturnItem(Empty.getDefaultInstance())
.repeat()
// Execute
.subscribe(ignore -> message.clear()));

primaryStage.setScene(scene);
primaryStage.show();
}

@Override
public void stop() throws Exception {
chatSubscription.dispose();
disposables.dispose();
channel.shutdown();
channel.awaitTermination(1, TimeUnit.SECONDS);
}
Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion demos/reactive-grpc-chat/rxjava-chat/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<properties>
<rxjava.version>2.1.10</rxjava.version>
<reactive.grpc.version>0.8.2</reactive.grpc.version>
<reactive.grpc.version>0.8.3-SNAPSHOT</reactive.grpc.version>
<grpc.contrib.version>0.8.0</grpc.contrib.version>
<grpc.version>1.12.0</grpc.version>
<protoc.version>3.5.1</protoc.version>
Expand Down
27 changes: 27 additions & 0 deletions reactor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,33 @@ After installing the plugin, Reactor-gRPC service stubs will be generated along
resp.subscribe(...);
```

## Don't break the chain
Used on their own, the generated RxGrpc stub methods do not cleanly chain with other RxJava operators.
Using the `compose()` and `as()` methods of `Mono` and `Fluz` are preferred over direct invocation.

#### One→One, Many→Many
```java
Mono<HelloResponse> monoResponse = monoRequest.compose(stub::sayHello);
Flux<HelloResponse> fluxResponse = fluxRequest.compose(stub::sayHelloBothStream);
```

#### One→Many, Many→One
```java
Mono<HelloResponse> monoResponse = fluxRequest.as(stub::sayHelloRequestStream);
Flux<HelloResponse> fluxResponse = monoRequest.as(stub::sayHelloResponseStream);
```

## Retrying streaming requests
`GrpcRetry` is used to transparently re-establish a streaming gRPC request in the event of a server error. During a
retry, the upstream rx pipeline is re-subscribed to acquire a request message and the RPC call re-issued. The downstream
rx pipeline never sees the error.

```java
Flux<HelloResponse> fluxResponse = fluxRequest.compose(GrpcRetry.ManyToMany.retry(stub::sayHelloBothStream));
```

For complex retry scenarios, use the `Retry` builder from <a href="https://github.com/reactor/reactor-addons/blob/master/reactor-extra/src/main/java/reactor/retry/Retry.java">Reactor Extras</a>.

Modules
=======

Expand Down
6 changes: 6 additions & 0 deletions reactor/reactor-grpc-stub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@
<version>${reactor.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
<version>${reactor.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright (c) 2017, salesforce.com, inc.
* All rights reserved.
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/

package com.salesforce.reactorgrpc;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.function.Function;

/**
* {@code GrpcRetry} is used to transparently re-establish a streaming gRPC request in the event of a server error.
* <p>
* During a retry, the upstream rx pipeline is re-subscribed to acquire a request message and the RPC call re-issued.
* The downstream rx pipeline never sees the error.
*/
public final class GrpcRetry {
private GrpcRetry() { }

/**
* {@link GrpcRetry} functions for streaming response gRPC operations.
*/
public static final class OneToMany {
private OneToMany() {
}

/**
* Retries a streaming gRPC call, using the same semantics as {@link Flux#retryWhen(Function)}.
*
* For easier use, use the Retry builder from
* <a href="https://github.com/reactor/reactor-addons/blob/master/reactor-extra/src/main/java/reactor/retry/Retry.java">Reactor Extras</a>.
*
* @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
* @param whenFactory receives a Publisher of notifications with which a user can complete or error, aborting the retry
* @param <I>
* @param <O>
*
* @see Flux#retryWhen(Function)
*/
public static <I, O> Function<? super Mono<I>, Flux<O>> retryWhen(final Function<Mono<I>, Flux<O>> operation, final Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
return request -> Flux.defer(() -> operation.apply(request)).retryWhen(whenFactory);
}

/**
* Retries a streaming gRPC call with a fixed delay between retries.
*
* @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
* @param delay the delay between retries
* @param <I>
* @param <O>
*/
public static <I, O> Function<? super Mono<I>, Flux<O>> retryAfter(final Function<Mono<I>, Flux<O>> operation, final Duration delay) {
return retryWhen(operation, errors -> errors.delayElements(delay));
}

/**
* Retries a streaming gRPC call immediately.
*
* @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
* @param <I>
* @param <O>
*/
public static <I, O> Function<? super Mono<I>, Flux<O>> retryImmediately(final Function<Mono<I>, Flux<O>> operation) {
return retryWhen(operation, errors -> errors);
}
}

/**
* {@link GrpcRetry} functions for bi-directional streaming gRPC operations.
*/
public static final class ManyToMany {
private ManyToMany() {
}

/**
* Retries a streaming gRPC call, using the same semantics as {@link Flux#retryWhen(Function)}.
*
* For easier use, use the Retry builder from
* <a href="https://github.com/reactor/reactor-addons/blob/master/reactor-extra/src/main/java/reactor/retry/Retry.java">Reactor Extras</a>.
*
* @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
* @param whenFactory receives a Publisher of notifications with which a user can complete or error, aborting the retry
* @param <I>
* @param <O>
*
* @see Flux#retryWhen(Function)
*/
public static <I, O> Function<? super Flux<I>, ? extends Publisher<O>> retryWhen(final Function<Flux<I>, Flux<O>> operation, final Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
return request -> Flux.defer(() -> operation.apply(request)).retryWhen(whenFactory);
}

/**
* Retries a streaming gRPC call with a fixed delay between retries.
*
* @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
* @param delay the delay between retries
* @param <I>
* @param <O>
*/
public static <I, O> Function<? super Flux<I>, ? extends Publisher<O>> retryAfter(final Function<Flux<I>, Flux<O>> operation, final Duration delay) {
return retryWhen(operation, errors -> errors.delayElements(delay));
}

/**
* Retries a streaming gRPC call immediately.
*
* @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
* @param <I>
* @param <O>
*/
public static <I, O> Function<? super Flux<I>, ? extends Publisher<O>> retryImmediately(final Function<Flux<I>, Flux<O>> operation) {
return retryWhen(operation, errors -> errors);
}
}

/**
* {@link GrpcRetry} functions for streaming request gRPC operations.
*/
public static final class ManyToOne {
private ManyToOne() {
}

/**
* Retries a streaming gRPC call, using the same semantics as {@link Flux#retryWhen(Function)}.
*
* For easier use, use the Retry builder from
* <a href="https://github.com/reactor/reactor-addons/blob/master/reactor-extra/src/main/java/reactor/retry/Retry.java">Reactor Extras</a>.
*
* @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
* @param whenFactory receives a Publisher of notifications with which a user can complete or error, aborting the retry
* @param <I>
* @param <O>
*
* @see Flux#retryWhen(Function)
*/
public static <I, O> Function<? super Flux<I>, Mono<O>> retryWhen(final Function<Flux<I>, Mono<O>> operation, final Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
return request -> Mono.defer(() -> operation.apply(request)).retryWhen(whenFactory);
}

/**
* Retries a streaming gRPC call with a fixed delay between retries.
*
* @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
* @param delay the delay between retries
* @param <I>
* @param <O>
*/
public static <I, O> Function<? super Flux<I>, Mono<O>> retryAfter(final Function<Flux<I>, Mono<O>> operation, final Duration delay) {
return retryWhen(operation, errors -> errors.delayElements(delay));
}

/**
* Retries a streaming gRPC call immediately.
*
* @param operation the gRPC operation to retry, typically from a generated reactive-grpc stub class
* @param <I>
* @param <O>
*/
public static <I, O> Function<? super Flux<I>, Mono<O>> retryImmediately(final Function<Flux<I>, Mono<O>> operation) {
return retryWhen(operation, errors -> errors);
}
}
}
Loading

0 comments on commit 6eb0f71

Please sign in to comment.