Skip to content

Commit

Permalink
Merge pull request #167 from fauna/update-event-feeds-readme
Browse files Browse the repository at this point in the history
README: Update Event Feeds info
  • Loading branch information
findgriffin authored Nov 6, 2024
2 parents 9847fcf + 70a8569 commit 14ba81b
Showing 1 changed file with 192 additions and 79 deletions.
271 changes: 192 additions & 79 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ import java.util.concurrent.ExecutionException;

import com.fauna.client.Fauna;
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaConfig;
import com.fauna.exception.FaunaException;
import com.fauna.exception.ServiceException;
import com.fauna.query.builder.Query;
Expand All @@ -345,8 +344,7 @@ import com.fauna.response.QuerySuccess;
public class App {
public static void main(String[] args) {
try {
FaunaConfig config = FaunaConfig.builder().secret("FAUNA_SECRET").build();
FaunaClient client = Fauna.client(config);
FaunaClient client = Fauna.client();

Query query = fql("'Hello world'");

Expand Down Expand Up @@ -446,113 +444,213 @@ or
[`eventsOn()`](https://docs.fauna.com/fauna/current/reference/fql-api/schema-entities/set/eventson/)
to a [supported Set](https://docs.fauna.com/fauna/current/reference/cdc/#sets).

To get paginated events, pass the event source query or a `FeedRequest` to
`feed()` or `asyncFeed()`.
To get an event feed, you can use one of the following methods:

* `feed()`: Synchronously fetches an event feed and returns a `FeedIterator`
that you can use to iterate through the pages of events.

`feed()` returns an iterator, `FeedIterator<E>`, that emits pages of events.
Similarly, `asyncFeed()` returns `CompletableFuture<FeedIterator<E>>`.
* `asyncFeed()`: Asynchronously fetches an event feed and returns a
`CompletableFuture<FeedIterator>` that you can use to iterate through the
pages of events.

You can use a `forEachRemaining()` loop to process each page of events.
Alternatively, you can iterate through individual events instead of pages
using `flatten()`.
* `poll()`: Asynchronously fetches a single page of events from the event feed
and returns a `CompletableFuture<FeedPage>` that you can use to handle each
page individually. You can repeatedly call `poll()` to get successive pages.

You can use `flatten()` on a `FeedIterator` to iterate through events rather
than pages.

```java
import com.fauna.client.Fauna;
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaConfig;
import com.fauna.event.FeedIterator;
import com.fauna.event.StreamEvent;
import com.fauna.event.EventSource;
import com.fauna.event.FeedOptions;
import com.fauna.event.FeedPage;
import com.fauna.event.EventSourceResponse;
import com.fauna.response.QuerySuccess;
import com.fauna.event.FaunaEvent;

import java.util.ArrayList;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FeedExample {
import static com.fauna.query.builder.Query.fql;

// Import the Product class for event data.
import org.example.Product;

public class EventFeedExample {
private static void printEventDetails(FaunaEvent<Product> event) {
System.out.println("Event Details:");
System.out.println(" Type: " + event.getType());
System.out.println(" Cursor: " + event.getCursor());

event.getTimestamp().ifPresent(ts ->
System.out.println(" Timestamp: " + ts)
);

event.getData().ifPresent(product ->
System.out.println(" Product: " + product.toString())
);

if (event.getStats() != null) {
System.out.println(" Stats: " + event.getStats());
}

if (event.getError() != null) {
System.out.println(" Error: " + event.getError());
}

System.out.println("-------------------");
}

public static void main(String[] args) {
FaunaConfig config = new FaunaConfig.Builder()
.secret("FAUNA_SECRET")
FaunaClient client = Fauna.client();

long tenMinutesAgo = System.currentTimeMillis() * 1000 - (10 * 60 * 1000 * 1000);
FeedOptions options = FeedOptions.builder()
.startTs(tenMinutesAgo)
.pageSize(10)
.build();
FaunaClient client = Fauna.client(config);

// Use `feed()` to create an event feed with a blocking request.
// Example 1: Using `feed()`
FeedIterator<Product> syncIterator = client.feed(
fql("Product.all().eventsOn(.price, .stock)"),
Product.class // Class for the type returned in events
options,
Product.class
);

// Handle each page of events
System.out.println("----------------------");
System.out.println("`feed()` results:");
System.out.println("----------------------");
syncIterator.forEachRemaining(page -> {
// Handle each event
for (StreamEvent<Product> event : page.getEvents()) {
System.out.println("Event: " + event);
for (FaunaEvent<Product> event : page.getEvents()) {
printEventDetails(event);
}
});

// Get an event source.
Query query = fql("Product.all().eventSource() { name, stock }");
QuerySuccess<StreamTokenResponse> eventSourceResponse = client.query(query, StreamTokenResponse.class);
String eventSource = eventSourceResponse.getData().getToken();
// Example 2: Using `asyncFeed()`
CompletableFuture<FeedIterator<Product>> iteratorFuture = client.asyncFeed(
fql("Product.all().eventsOn(.price, .stock)"),
options,
Product.class
);

// Create a FeedRequest.
FeedRequest feedRequest = FeedRequest.builder(eventSource)
.pageSize(10)
.build();
FeedIterator<Product> iterator = iteratorFuture.join();
System.out.println("----------------------");
System.out.println("`asyncFeed()` results:");
System.out.println("----------------------");
iterator.forEachRemaining(page -> {
for (FaunaEvent<Product> event : page.getEvents()) {
printEventDetails(event);
}
});

// Use `asyncFeed()` to get an event feed with a non-blocking request.
CompletableFuture<FeedIterator<Product>> futureFeed = client.asyncFeed(
feedRequest,
Product.class // Class for the type returned in events
// Example 3: Using `flatten()` on a `FeedIterator`
FeedIterator<Product> flattenedIterator = client.feed(
fql("Product.all().eventSource()"),
options,
Product.class
);

futureFeed.thenAccept(iterator -> {
// Flatten pages to a single list of events.
List<StreamEvent<Product>> events = iterator.flatten().collect(Collectors.toList());
System.out.println("Received " + events.size() + " events from asyncFeed.");
}).exceptionally(ex -> {
System.err.println("Error initializing async feed: " + ex.getMessage());
return null;
});
Iterator<FaunaEvent<Product>> eventIterator = flattenedIterator.flatten();
List<FaunaEvent<Product>> allEvents = new ArrayList<>();
eventIterator.forEachRemaining(allEvents::add);
System.out.println("----------------------");
System.out.println("`flatten()` results:");
System.out.println("----------------------");
for (FaunaEvent<Product> event : allEvents) {
printEventDetails(event);
}

// Example 4: Using `poll()`
QuerySuccess<EventSourceResponse> sourceQuery = client.query(
fql("Product.all().eventSource()"),
EventSourceResponse.class
);
EventSource source = EventSource.fromResponse(sourceQuery.getData());

CompletableFuture<FeedPage<Product>> pageFuture = client.poll(
source,
options,
Product.class
);

while (pageFuture != null) {
FeedPage<Product> page = pageFuture.join();
List<FaunaEvent<Product>> events = page.getEvents();

System.out.println("----------------------");
System.out.println("`poll()` results:");
System.out.println("----------------------");
for (FaunaEvent<Product> event : events) {
printEventDetails(event);
}

if (page.hasNext()) {
FeedOptions nextPageOptions = options.nextPage(page);
pageFuture = client.poll(source, nextPageOptions, Product.class);
} else {
pageFuture = null;
}
}
}
}
```

If changes occur between the creation of the event source and the request, the
feed replays and emits any related events. In most cases, you’ll get events
after a specific start time or event cursor.
If you pass an event source directly to `feed()` or `poll()` and changes occur
between the creation of the event source and the Event Feed request, the feed
replays and emits any related events.

In most cases, you'll get events after a specific start time or cursor.

### Get events after a specific start time

When you first poll an event source using an Event Feed, you usually pass
a `startTs` argument to `feed()` or `asyncFeed()`.
When you first poll an event source using an Event Feed, you usually include a
`startTs` (start timestamp) in the `FeedOptions` passed to `feed()`,
`asyncFeed()`, or `poll()`.

`startTs` is an integer representing a time in microseconds since the Unix
epoch. The request returns events that occurred after the specified timestamp
(exclusive).

```java
// Get an event source.
Query query = fql("Product.all().eventSource() { name, stock }");
QuerySuccess<StreamTokenResponse> eventSourceResponse = client.query(query, StreamTokenResponse.class);
String eventSource = eventSourceResponse.getData().getToken();
Query query = fql("Product.all().eventsOn(.price, .stock)");

// Calculate the timestamp for 10 minutes ago in microseconds.
long tenMinutesAgo = System.currentTimeMillis() * 1000 - (10 * 60 * 1000 * 1000);

// Create a FeedRequest.
FeedRequest feedRequest = FeedRequest.builder(eventSource)
.startTs(tenMinutesAgo)
.pageSize(10)
.build();
FeedOptions options = FeedOptions.builder()
.startTs(tenMinutesAgo)
.pageSize(10)
.build();

// Example 1: Using `feed()`
FeedIterator<Product> syncIterator = client.feed(
feedRequest,
query,
options,
Product.class
);

// Example 2: Using `asyncFeed()`
CompletableFuture<FeedIterator<Product>> iteratorFuture = client.asyncFeed(
query,
options,
Product.class
);

CompletableFuture<FeedIterator<Product>> futureFeed = client.asyncFeed(
feedRequest,
// Example 3: Using `poll()`
QuerySuccess<EventSourceResponse> sourceQuery = client.query(
query,
EventSourceResponse.class
);
EventSource source = EventSource.fromResponse(sourceQuery.getData());

CompletableFuture<FeedPage<Product>> pageFuture = client.poll(
source,
options,
Product.class
);
```
Expand All @@ -561,27 +659,41 @@ CompletableFuture<FeedIterator<Product>> futureFeed = client.asyncFeed(

After the initial request, you usually get subsequent events using the cursor
for the last page or event. To get events after a cursor (exclusive), include
the cursor in a `FeedRequest` passed to `feed()` or `asyncFeed()`:
the cursor in the `FeedOptions` passed to passed to `feed()`,
`asyncFeed()`, or `poll()`.

```java
// Get an event source.
Query query = fql("Product.all().eventSource() { name, stock }");
QuerySuccess<StreamTokenResponse> tokenResponse = client.query(query, StreamTokenResponse.class);
String eventSource = tokenResponse.getData().getToken();
Query query = fql("Product.all().eventsOn(.price, .stock)");

// Create a FeedRequest.
FeedRequest feedRequest = FeedRequest.builder(eventSource)
.cursor("gsGabc456")
.pageSize(10)
.build();
FeedOptions options = FeedOptions.builder()
.cursor("gsGabc456") // Cursor for the last page
.pageSize(10)
.build();

// Example 1: Using `feed()`
FeedIterator<Product> syncIterator = client.feed(
feedRequest,
query,
options,
Product.class
);

// Example 2: Using `asyncFeed()`
CompletableFuture<FeedIterator<Product>> iteratorFuture = client.asyncFeed(
query,
options,
Product.class
);

CompletableFuture<FeedIterator<Product>> futureFeed = client.asyncFeed(
feedRequest,
// Example 3: Using `poll()`
QuerySuccess<EventSourceResponse> sourceQuery = client.query(
query,
EventSourceResponse.class
);
EventSource source = EventSource.fromResponse(sourceQuery.getData());

CompletableFuture<FeedPage<Product>> pageFuture = client.poll(
source,
options,
Product.class
);
```
Expand All @@ -593,19 +705,20 @@ Exceptions can be raised in two different places:
* While fetching a page
* While iterating a page's events

This distinction lets ignore errors originating from event processing.
For example:
This distinction lets ignore errors originating from event processing. For
example:

```java
try {
FeedIterator<Product> iterator = client.feed(
FeedIterator<Product> syncIterator = client.feed(
fql("Product.all().map(.details.toUpperCase()).eventSource()"),
options,
Product.class
);

iterator.forEachRemaining(page -> {
syncIterator.forEachRemaining(page -> {
try {
for (StreamEvent<Product> event : page.getEvents()) {
for (FaunaEvent<Product> event : page.getEvents()) {
// Event-specific handling.
System.out.println("Event: " + event);
}
Expand Down

0 comments on commit 14ba81b

Please sign in to comment.