Skip to content

Commit

Permalink
Merge pull request #168 from fauna/readme-stream-refactor
Browse files Browse the repository at this point in the history
README: Update for Event Streams refactor
  • Loading branch information
findgriffin authored Nov 6, 2024
2 parents 14ba81b + 91f9dff commit ebc3719
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -749,38 +749,39 @@ or
[`eventsOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/eventson)
to a [supported Set](https://docs.fauna.com/fauna/current/reference/cdc/#sets).

To start and subscribe to an Event Stream, use an event source to create a
[`StreamRequest`](https://fauna.github.io/fauna-jvm/latest/com/fauna/stream/StreamRequest.html)
and pass it to `stream()` or `asyncStream()`:
To start and subscribe to the stream, pass an `EventSource` and related
`StreamOptions` to `stream()` or `asyncStream()`:

```java
// Get an event source.
Query query = fql("Product.all().eventSource() { name, stock }");
QuerySuccess<StreamTokenResponse> eventSourceResponse = client.query(query, StreamTokenResponse.class);
String eventSource = eventSourceResponse.getData().getToken();
QuerySuccess<EventSourceResponse> tokenResponse = client.query(query, EventSourceResponse.class);
EventSource eventSource = EventSource.fromResponse(querySuccess.getData());

// Create a StreamRequest.
StreamRequest request = new StreamRequest(eventSource);
// Calculate the timestamp for 10 minutes ago in microseconds.
long tenMinutesAgo = System.currentTimeMillis() * 1000 - (10 * 60 * 1000 * 1000);
StreamOptions streamOptions = StreamOptions.builder().startTimestamp(tenMinutesAgo).build();

// Use stream() when you want to ensure the stream is ready before proceeding
// with other operations, or when working in a synchronous context.
FaunaStream<Product> stream = client.stream(request, Product.class);
// Example 1: Using `stream()`
FaunaStream<Product> stream = client.stream(eventSource, streamOptions, Product.class);

// Use asyncStream() when you want to start the stream operation without blocking,
// which is useful in asynchronous applications or when you need to perform other
// tasks while waiting for the stream to be established.
CompletableFuture<FaunaStream<Product>> futureStream = client.asyncStream(request, Product.class);
// Example 2: Using `asyncStream()`
CompletableFuture<FaunaStream<Product>> futureStream = client.asyncStream(source, streamOptions, Product.class);
```

If changes occur between the creation of the event source and the stream
request, the stream replays and emits any related events.

Alternatively, you can pass an FQL query that returns an event source to `stream()` or
`asyncStream()`:

```java
Query query = fql("Product.all().eventSource() { name, stock }");
// Create and subscribe to a stream in one step.
// stream() example:

// Example 1: Using `stream()`
FaunaStream<Product> stream = client.stream(query, Product.class);
// asyncStream() example:

// Example 2: Using `asyncStream()`
CompletableFuture<FaunaStream<Product>> futureStream = client.asyncStream(query, Product.class);
```

Expand All @@ -800,18 +801,23 @@ import java.util.concurrent.atomic.AtomicInteger;

import com.fauna.client.Fauna;
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaConfig;
import com.fauna.event.FaunaEvent;
import com.fauna.event.FaunaStream;
import com.fauna.exception.FaunaException;

import static com.fauna.query.builder.Query.fql;
import com.fauna.event.StreamEvent;

// Import the Product class for event data.
import org.example.Product;

public class App {
public class EventStreamExample {
public static void main(String[] args) throws InterruptedException {
try {
FaunaClient client = Fauna.client();
FaunaConfig config = FaunaConfig.builder()
.secret("FAUNA_SECRET")
.build();
FaunaClient client = Fauna.client(config);

// Create a stream of all products. Project the name and stock.
FaunaStream<Product> stream = client.stream(fql("Product.all().eventSource() { name, stock }"), Product.class);
Expand All @@ -830,7 +836,7 @@ public class App {
}
}

static class ProductSubscriber implements Flow.Subscriber<StreamEvent<Product>> {
static class ProductSubscriber implements Flow.Subscriber<FaunaEvent<Product>> {
private final AtomicInteger eventCount = new AtomicInteger(0);
private Flow.Subscription subscription;
private final int maxEvents;
Expand All @@ -848,7 +854,7 @@ public class App {
}

@Override
public void onNext(StreamEvent<Product> event) {
public void onNext(FaunaEvent<Product> event) {
// Handle each event...
int count = eventCount.incrementAndGet();
System.out.println("Received event " + count + ":");
Expand Down

0 comments on commit ebc3719

Please sign in to comment.