From 964360907fcc6fc80fdf7c99500ada9de4d5a246 Mon Sep 17 00:00:00 2001 From: James Rodewig Date: Tue, 5 Nov 2024 12:07:51 -0500 Subject: [PATCH 1/3] README: Event Streams refactor --- README.md | 50 ++++++++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 79c2d629..69661291 100644 --- a/README.md +++ b/README.md @@ -636,38 +636,39 @@ or [`eventsOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/eventson) to a [supported Set](https://docs.fauna.com/fauna/current/reference/cdc/#sets). -To start and subscribe to an Event Stream, use an event source to create a -[`StreamRequest`](https://fauna.github.io/fauna-jvm/latest/com/fauna/stream/StreamRequest.html) -and pass it to `stream()` or `asyncStream()`: +To start and subscribe to the stream, pass an `EventSource` and related +`StreamOptions` to `stream()` or `asyncStream()`: ```java // Get an event source. Query query = fql("Product.all().eventSource() { name, stock }"); -QuerySuccess eventSourceResponse = client.query(query, StreamTokenResponse.class); -String eventSource = eventSourceResponse.getData().getToken(); +QuerySuccess tokenResponse = client.query(query, EventSourceResponse.class); +EventSource eventSource = EventSource.fromResponse(querySuccess.getData()); -// Create a StreamRequest. -StreamRequest request = new StreamRequest(eventSource); +// Calculate the timestamp for 10 minutes ago in microseconds. +long tenMinutesAgo = System.currentTimeMillis() * 1000 - (10 * 60 * 1000 * 1000); +StreamOptions streamOptions = StreamOptions.builder().startTimestamp(tenMinutesAgo).build(); -// Use stream() when you want to ensure the stream is ready before proceeding -// with other operations, or when working in a synchronous context. -FaunaStream stream = client.stream(request, Product.class); +// Example 1: Using `stream()` +FaunaStream stream = client.stream(eventSource, streamOptions, Product.class); -// Use asyncStream() when you want to start the stream operation without blocking, -// which is useful in asynchronous applications or when you need to perform other -// tasks while waiting for the stream to be established. -CompletableFuture> futureStream = client.asyncStream(request, Product.class); +// Example 2: Using `asyncStream()` +CompletableFuture> futureStream = client.asyncStream(source, streamOptions, Product.class); ``` +If changes occur between the creation of the event source and the stream +request, the stream replays and emits any related events. + Alternatively, you can pass an FQL query that returns an event source to `stream()` or `asyncStream()`: ```java Query query = fql("Product.all().eventSource() { name, stock }"); -// Create and subscribe to a stream in one step. -// stream() example: + +// Example 1: Using `stream()` FaunaStream stream = client.stream(query, Product.class); -// asyncStream() example: + +// Example 2: Using `asyncStream()` CompletableFuture> futureStream = client.asyncStream(query, Product.class); ``` @@ -687,18 +688,23 @@ import java.util.concurrent.atomic.AtomicInteger; import com.fauna.client.Fauna; import com.fauna.client.FaunaClient; +import com.fauna.client.FaunaConfig; +import com.fauna.event.FaunaEvent; import com.fauna.event.FaunaStream; import com.fauna.exception.FaunaException; + import static com.fauna.query.builder.Query.fql; -import com.fauna.event.StreamEvent; // Import the Product class for event data. import org.example.Product; -public class App { +public class EventStreamExample { public static void main(String[] args) throws InterruptedException { try { - FaunaClient client = Fauna.client(); + FaunaConfig config = FaunaConfig.builder() + .secret("FAUNA_SECRET") + .build(); + FaunaClient client = Fauna.client(config); // Create a stream of all products. Project the name and stock. FaunaStream stream = client.stream(fql("Product.all().eventSource() { name, stock }"), Product.class); @@ -717,7 +723,7 @@ public class App { } } - static class ProductSubscriber implements Flow.Subscriber> { + static class ProductSubscriber implements Flow.Subscriber> { private final AtomicInteger eventCount = new AtomicInteger(0); private Flow.Subscription subscription; private final int maxEvents; @@ -735,7 +741,7 @@ public class App { } @Override - public void onNext(StreamEvent event) { + public void onNext(FaunaEvent event) { // Handle each event... int count = eventCount.incrementAndGet(); System.out.println("Received event " + count + ":"); From a9814344565c6509cbc697d8d2e51878e850ba32 Mon Sep 17 00:00:00 2001 From: David Griffin Date: Tue, 5 Nov 2024 13:17:58 -0800 Subject: [PATCH 2/3] Avoid encouraging secrets in source code. --- README.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 69661291..29be14be 100644 --- a/README.md +++ b/README.md @@ -459,7 +459,6 @@ using `flatten()`. ```java import com.fauna.client.Fauna; import com.fauna.client.FaunaClient; -import com.fauna.client.FaunaConfig; import com.fauna.event.FeedIterator; import com.fauna.event.StreamEvent; @@ -471,10 +470,9 @@ import java.util.stream.Collectors; public class FeedExample { public static void main(String[] args) { - FaunaConfig config = new FaunaConfig.Builder() - .secret("FAUNA_SECRET") - .build(); - FaunaClient client = Fauna.client(config); + // By default, FaunaClient will look for a secret in the FAUNA_SECRET environment variable, but you + // can also inject it e.g. FaunaClient.client(FaunaConfig.builder().secret("secret").build()) + FaunaClient client = Fauna.client(); // Use `feed()` to create an event feed with a blocking request. FeedIterator syncIterator = client.feed( From 78480d58ef4ecf6f05e279e80bde602af4661c89 Mon Sep 17 00:00:00 2001 From: James Rodewig Date: Wed, 6 Nov 2024 12:50:32 -0500 Subject: [PATCH 3/3] Update README.md Co-authored-by: David Griffin --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 29be14be..d8c18e59 100644 --- a/README.md +++ b/README.md @@ -460,7 +460,7 @@ using `flatten()`. import com.fauna.client.Fauna; import com.fauna.client.FaunaClient; import com.fauna.event.FeedIterator; -import com.fauna.event.StreamEvent; +import com.fauna.event.FaunaEvent; import java.util.ArrayList; import java.util.List;