diff --git a/README.md b/README.md index 40da5b0a..172e9fc1 100644 --- a/README.md +++ b/README.md @@ -75,16 +75,17 @@ package org.example; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import com.fauna.annotation.FaunaField; import com.fauna.client.Fauna; import com.fauna.client.FaunaClient; -import com.fauna.client.FaunaConfig; import com.fauna.exception.FaunaException; import com.fauna.query.builder.Query; import com.fauna.response.QuerySuccess; -import com.fauna.serialization.generic.PageOf; import com.fauna.types.Page; +import static com.fauna.codec.Generic.pageOf; +import static com.fauna.query.builder.Query.fql; + + public class App { // Define class for `Product` documents @@ -104,7 +105,7 @@ public class App { FaunaClient client = Fauna.client(); // Compose a query. - Query query = Query.fql(""" + Query query = fql(""" Product.sortedByPriceLowToHigh() { name, description, @@ -132,14 +133,14 @@ public class App { // Use `query()` to run a synchronous query. // Synchronous queries block the current thread until the query completes. // Accepts the query, expected result class, and a nullable set of query options. - QuerySuccess> result = client.query(query, new PageOf<>(Product.class)); + QuerySuccess> result = client.query(query, pageOf(Product.class)); printResults(result.getData()); } private static void runAsynchronousQuery(FaunaClient client, Query query) throws ExecutionException, InterruptedException { // Use `asyncQuery()` to run an asynchronous, non-blocking query. // Accepts the query, expected result class, and a nullable set of query options. - CompletableFuture>> futureResult = client.asyncQuery(query, new PageOf<>(Product.class)); + CompletableFuture>> futureResult = client.asyncQuery(query, pageOf(Product.class)); QuerySuccess> result = futureResult.get(); printResults(result.getData()); @@ -147,14 +148,14 @@ public class App { // Iterate through the products in the page. private static void printResults(Page page) { - for (Product product : page.data()) { + for (Product product : page.getData()) { System.out.println("Name: " + product.name); System.out.println("Description: " + product.description); System.out.println("Price: " + product.price); System.out.println("--------"); } // Print the `after` cursor to paginate through results. - System.out.println("After: " + page.after()); + System.out.println("After: " + page.getAfter()); } } ``` @@ -215,8 +216,8 @@ Use `fql` templates to compose FQL queries. To run the query, pass the template and an expected result class to `query()` or `asyncQuery()`: ```java -Query query = Query.fql("Product.sortedByPriceLowToHigh()"); -QuerySuccess> result = client.query(query, new PageOf<>(Product.class)); +Query query = fql("Product.sortedByPriceLowToHigh()"); +QuerySuccess> result = client.query(query, pageOf(Product.class)); ``` You can also pass a nullable set of [query options](#query-options) to `query()` @@ -224,15 +225,41 @@ or `asyncQuery()`. These options control how the query runs in Fauna. See [Query options](#query-options). -### Define a result class +### Define a custom class for your data + +Use annotations to map a Java class to a Fauna document or object shape: + +```java +import com.fauna.annotation.FaunaField; +import com.fauna.annotation.FaunaId; -You can use the `com.fauna.annotation` package to define a result class for a -Fauna document. The package provides annotations like `@FaunaField` and -`@FaunaIgnore` to map Fauna documents to Java classes and fields. +class Person { -Use the `com.fauna.serialization` package to handle deserialization for -generics, such as `PageOf`, `ListOf`, and `MapOf`. + @FaunaId + private String id; + private String firstName; + + @FaunaField( name = "dob") + private String dateOfBirth; +} +``` + +You can use the `com.fauna.annotation` package to modify encoding and decoding of +specific fields in classes used as arguments and results of queries. +* `@FaunaId`: Should only be used once per class and be associated with a field named `id` that represents the Fauna document ID. It's not encoded unless the `isClientGenerated` flag is `true`. +* `@FaunaTs`: Should only be used once per class and be associated with a field named `ts` that represents the timestamp of a document. It's not encoded. +* `@FaunaColl`: Typically goes unmodeled. Should only be used once per class and be associated with a field named `coll` that represents the collection field of a document. It will never be encoded. +* `@FaunaField`: Can be associated with any field to override its name in Fauna. +* `@FaunaIgnore`: Can be used to ignore fields during encoding and decoding. + +Use classes in the `com.fauna.codec` package to handle type erasure when the top-level result +of a query is a generic, including: +* `PageOf` where `T` is the element type. +* `ListOf` where `T` is the element type. +* `MapOf` where `T` is the value type. +* `OptionalOf` where `T` is the value type. +* `NullableOf` where `T` is the value type. This is specifically for cases when you return a Fauna document that may be null and want to receive a concrete `NullDoc` or `NonNullDoc` instead of catching a `NullDocumentException`. ### Variable interpolation @@ -402,3 +429,151 @@ QueryOptions options = QueryOptions.builder() QuerySuccess result = client.query(query, String.class, options); ``` + +## Event streaming + +The driver supports [event streaming](https://docs.fauna.com/fauna/current/learn/streaming). + +To get a stream token, append +[`toStream()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/tostream) +or +[`changesOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/changeson) +to a set from a [supported +source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). + +To start and subscribe to the stream, use a stream token to create a +`StreamRequest` and pass the `StreamRequest` to `stream()` or `asyncStream()`: + +```java +// Get a stream token. +Query query = fql("Product.all().toStream() { name, stock }"); +QuerySuccess tokenResponse = client.query(query, StreamTokenResponse.class); +String streamToken = tokenResponse.getData().getToken(); + +// Create a StreamRequest. +StreamRequest request = new StreamRequest(streamToken); + +// Use stream() when you want to ensure the stream is ready before proceeding +// with other operations, or when working in a synchronous context. +FaunaStream stream = client.stream(request, Product.class); + +// Use asyncStream() when you want to start the stream operation without blocking, +// which is useful in asynchronous applications or when you need to perform other +// tasks while waiting for the stream to be established. +CompletableFuture> futureStream = client.asyncStream(request, Product.class); +``` + +Alternatively, you can pass an FQL query that returns a stream token to `stream()` or +`asyncStream()`: + +```java +Query query = fql("Product.all().toStream() { name, stock }"); +// Create and subscribe to a stream in one step. +// stream() example: +FaunaStream stream = client.stream(query, Product.class); +// asyncStream() example: +CompletableFuture> futureStream = client.asyncStream(query, Product.class); +``` + +### Create a subscriber class + +The methods return a `FaunaStream` publisher that lets you handle events as they +arrive. Create a class with the `Flow.Subscriber` interface to process +events: + +```java +package org.example; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicInteger; + +import com.fauna.client.Fauna; +import com.fauna.client.FaunaClient; +import com.fauna.client.FaunaStream; +import com.fauna.exception.FaunaException; +import static com.fauna.query.builder.Query.fql; +import com.fauna.response.StreamEvent; + +// Import the Product class for event data. +import org.example.Product; + +public class App { + public static void main(String[] args) throws InterruptedException { + try { + FaunaClient client = Fauna.client(); + + // Create a stream of all products. Project the name and stock. + FaunaStream stream = client.stream(fql("Product.all().toStream() { name, stock }"), Product.class); + + // Create a subscriber to handle stream events. + ProductSubscriber subscriber = new ProductSubscriber(); + stream.subscribe(subscriber); + + // Wait for the subscriber to complete. + subscriber.awaitCompletion(); + } catch (FaunaException e) { + System.err.println("Fauna error occurred: " + e.getMessage()); + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + static class ProductSubscriber implements Flow.Subscriber> { + private final AtomicInteger eventCount = new AtomicInteger(0); + private Flow.Subscription subscription; + private final int maxEvents; + private final CountDownLatch completionLatch = new CountDownLatch(1); + + public ProductSubscriber() { + // Stream closes after 3 events. + this.maxEvents = 3; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(StreamEvent event) { + // Handle each event... + int count = eventCount.incrementAndGet(); + System.out.println("Received event " + count + ":"); + System.out.println(" Cursor: " + event.getCursor()); + System.out.println(" Timestamp: " + event.getTimestamp()); + System.out.println(" Data: " + event.getData().orElse(null)); + + if (count >= maxEvents) { + System.out.println("Closing stream after " + maxEvents + " events"); + subscription.cancel(); + completionLatch.countDown(); + } else { + subscription.request(1); + } + } + + @Override + public void onError(Throwable throwable) { + System.err.println("Error in stream: " + throwable.getMessage()); + completionLatch.countDown(); + } + + @Override + public void onComplete() { + System.out.println("Stream completed."); + completionLatch.countDown(); + } + + public int getEventCount() { + return eventCount.get(); + } + + public void awaitCompletion() throws InterruptedException { + completionLatch.await(); + } + } +} +```