diff --git a/README.md b/README.md index c63fc2ed..010bfdfc 100644 --- a/README.md +++ b/README.md @@ -749,38 +749,39 @@ or [`eventsOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/eventson) to a [supported Set](https://docs.fauna.com/fauna/current/reference/cdc/#sets). -To start and subscribe to an Event Stream, use an event source to create a -[`StreamRequest`](https://fauna.github.io/fauna-jvm/latest/com/fauna/stream/StreamRequest.html) -and pass it to `stream()` or `asyncStream()`: +To start and subscribe to the stream, pass an `EventSource` and related +`StreamOptions` to `stream()` or `asyncStream()`: ```java // Get an event source. Query query = fql("Product.all().eventSource() { name, stock }"); -QuerySuccess eventSourceResponse = client.query(query, StreamTokenResponse.class); -String eventSource = eventSourceResponse.getData().getToken(); +QuerySuccess tokenResponse = client.query(query, EventSourceResponse.class); +EventSource eventSource = EventSource.fromResponse(querySuccess.getData()); -// Create a StreamRequest. -StreamRequest request = new StreamRequest(eventSource); +// Calculate the timestamp for 10 minutes ago in microseconds. +long tenMinutesAgo = System.currentTimeMillis() * 1000 - (10 * 60 * 1000 * 1000); +StreamOptions streamOptions = StreamOptions.builder().startTimestamp(tenMinutesAgo).build(); -// Use stream() when you want to ensure the stream is ready before proceeding -// with other operations, or when working in a synchronous context. -FaunaStream stream = client.stream(request, Product.class); +// Example 1: Using `stream()` +FaunaStream stream = client.stream(eventSource, streamOptions, Product.class); -// Use asyncStream() when you want to start the stream operation without blocking, -// which is useful in asynchronous applications or when you need to perform other -// tasks while waiting for the stream to be established. -CompletableFuture> futureStream = client.asyncStream(request, Product.class); +// Example 2: Using `asyncStream()` +CompletableFuture> futureStream = client.asyncStream(source, streamOptions, Product.class); ``` +If changes occur between the creation of the event source and the stream +request, the stream replays and emits any related events. + Alternatively, you can pass an FQL query that returns an event source to `stream()` or `asyncStream()`: ```java Query query = fql("Product.all().eventSource() { name, stock }"); -// Create and subscribe to a stream in one step. -// stream() example: + +// Example 1: Using `stream()` FaunaStream stream = client.stream(query, Product.class); -// asyncStream() example: + +// Example 2: Using `asyncStream()` CompletableFuture> futureStream = client.asyncStream(query, Product.class); ``` @@ -800,18 +801,23 @@ import java.util.concurrent.atomic.AtomicInteger; import com.fauna.client.Fauna; import com.fauna.client.FaunaClient; +import com.fauna.client.FaunaConfig; +import com.fauna.event.FaunaEvent; import com.fauna.event.FaunaStream; import com.fauna.exception.FaunaException; + import static com.fauna.query.builder.Query.fql; -import com.fauna.event.StreamEvent; // Import the Product class for event data. import org.example.Product; -public class App { +public class EventStreamExample { public static void main(String[] args) throws InterruptedException { try { - FaunaClient client = Fauna.client(); + FaunaConfig config = FaunaConfig.builder() + .secret("FAUNA_SECRET") + .build(); + FaunaClient client = Fauna.client(config); // Create a stream of all products. Project the name and stock. FaunaStream stream = client.stream(fql("Product.all().eventSource() { name, stock }"), Product.class); @@ -830,7 +836,7 @@ public class App { } } - static class ProductSubscriber implements Flow.Subscriber> { + static class ProductSubscriber implements Flow.Subscriber> { private final AtomicInteger eventCount = new AtomicInteger(0); private Flow.Subscription subscription; private final int maxEvents; @@ -848,7 +854,7 @@ public class App { } @Override - public void onNext(StreamEvent event) { + public void onNext(FaunaEvent event) { // Handle each event... int count = eventCount.incrementAndGet(); System.out.println("Received event " + count + ":");