Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

README: update define class content #133

Merged
merged 13 commits into from
Sep 10, 2024
198 changes: 186 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,17 @@ package org.example;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import com.fauna.annotation.FaunaField;
import com.fauna.client.Fauna;
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaConfig;
import com.fauna.exception.FaunaException;
import com.fauna.query.builder.Query;
import com.fauna.response.QuerySuccess;
import com.fauna.serialization.generic.PageOf;
import com.fauna.types.Page;

import com.fauna.codec.Generic.PageOf;
import static com.fauna.query.builder.Query.fql;


public class App {

// Define class for `Product` documents
Expand All @@ -104,7 +105,7 @@ public class App {
FaunaClient client = Fauna.client();

// Compose a query.
Query query = Query.fql("""
Query query = fql("""
Product.sortedByPriceLowToHigh() {
name,
description,
Expand Down Expand Up @@ -132,14 +133,14 @@ public class App {
// Use `query()` to run a synchronous query.
// Synchronous queries block the current thread until the query completes.
// Accepts the query, expected result class, and a nullable set of query options.
QuerySuccess<Page<Product>> result = client.query(query, new PageOf<>(Product.class));
QuerySuccess<Page<Product>> result = client.query(query, pageOf(Product.class));
jrodewig marked this conversation as resolved.
Show resolved Hide resolved
printResults(result.getData());
}

private static void runAsynchronousQuery(FaunaClient client, Query query) throws ExecutionException, InterruptedException {
// Use `asyncQuery()` to run an asynchronous, non-blocking query.
// Accepts the query, expected result class, and a nullable set of query options.
CompletableFuture<QuerySuccess<Page<Product>>> futureResult = client.asyncQuery(query, new PageOf<>(Product.class));
CompletableFuture<QuerySuccess<Page<Product>>> futureResult = client.asyncQuery(query, pageOf(Product.class));
jrodewig marked this conversation as resolved.
Show resolved Hide resolved

QuerySuccess<Page<Product>> result = futureResult.get();
printResults(result.getData());
Expand Down Expand Up @@ -224,15 +225,40 @@ or `asyncQuery()`. These options control how the query runs in Fauna. See [Query
options](#query-options).


### Define a result class
### Define a custom class for your data
These should be simple POJOs.
findgriffin marked this conversation as resolved.
Show resolved Hide resolved

You can use the `com.fauna.annotation` package to define a result class for a
Fauna document. The package provides annotations like `@FaunaField` and
`@FaunaIgnore` to map Fauna documents to Java classes and fields.
```java
import com.fauna.annotation.FaunaField;
import com.fauna.annotation.FaunaId;

Use the `com.fauna.serialization` package to handle deserialization for
generics, such as `PageOf`, `ListOf`, and `MapOf`.
class Person {

@FaunaId
private String id;

private String firstName;

@FaunaField( name = "dob")
private String dateOfBirth;
}
```

You can use the `com.fauna.annotation` package to modify encoding and decoding of
jrodewig marked this conversation as resolved.
Show resolved Hide resolved
specific fields in classes used as arguments and results of queries.
* `@FaunaId`: Should only be used once per class and be associated with a field named `id` if it represents the ID of a document. It will not be encoded unless the `isClientGenerated` flag is set.
findgriffin marked this conversation as resolved.
Show resolved Hide resolved
* `@FaunaTs`: Should only be used once per class and be associated with a field named `ts` if it represents the timestamp of a document. It will never be encoded.
findgriffin marked this conversation as resolved.
Show resolved Hide resolved
* `@FaunaColl`: Typically goes unmodeled. Should only be used once per class and be associated with a field named `coll` if represents the collection field of a document. It will never be encoded.
jrodewig marked this conversation as resolved.
Show resolved Hide resolved
* `@FaunaField`: Can be associated with any field to override its name in Fauna.
* `@FaunaIgnore`: Can be used to ignore fields during encoding and decoding.

In the `com.fauna.codec` package, you use classes to handle type erasure when the top-level result
of a query is a generic, including:
findgriffin marked this conversation as resolved.
Show resolved Hide resolved
* `PageOf<T>` where `T` is the element type.
* `ListOf<T>` where `T` is the element type.
* `MapOf<T>` where `T` is the value type.
* `OptionalOf<T>` where `T` is the value type.
* `NullableOf<T>` where `T` is the value type. This is specifically for cases when returning a Fauna Document that may be null and you want to receive a concrete NullDoc<T> or NonNullDoc<T> instead of catching a NullDocumentException.
findgriffin marked this conversation as resolved.
Show resolved Hide resolved

### Variable interpolation

Expand Down Expand Up @@ -402,3 +428,151 @@ QueryOptions options = QueryOptions.builder()

QuerySuccess result = client.query(query, String.class, options);
```

## Event streaming

The driver supports [event streaming](https://docs.fauna.com/fauna/current/learn/streaming).

To get a stream token, append
[`toStream()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/tostream)
or
[`changesOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/changeson)
to a set from a [supported
source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources

To start and subscribe to the stream, use a stream token to create a
`StreamRequest` and pass the `StreamRequest` to `stream()` or `asyncStream()`:

```java
// Get a stream token.
Query query = fql("Product.all().toStream() { name, stock }");
QuerySuccess<StreamTokenResponse> tokenResponse = client.query(query, StreamTokenResponse.class);
String streamToken = tokenResponse.getData().getToken();

// Create a StreamRequest.
StreamRequest request = new StreamRequest(streamToken);

// Use stream() when you want to ensure the stream is ready before proceeding
// with other operations, or when working in a synchronous context.
FaunaStream<Product> stream = client.stream(request, Product.class);

// Use asyncStream() when you want to start the stream operation without blocking,
// which is useful in asynchronous applications or when you need to perform other
// tasks while waiting for the stream to be established.
CompletableFuture<FaunaStream<Product>> futureStream = client.asyncStream(request, Product.class);
```

Alternatively, you also pass an FQL that returns a stream token to `stream()` or
`asyncStream()`:

```java
Query query = fql("Product.all().toStream() { name, stock }");
// Create and subscribe to a stream in one step.
// stream() example:
FaunaStream<Product> stream = client.stream(query, Product.class);
// asyncStream() example:
CompletableFuture<FaunaStream<Product>> futureStream = client.asyncStream(query, Product.class);
```

### Create a subscriber class

The methods return a `FaunaStream` publisher that lets you handle events as they
arrive. Create a class with the `Flow.Subscriber` interface to process
events:

```java
package org.example;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

import com.fauna.client.Fauna;
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaStream;
import com.fauna.exception.FaunaException;
import static com.fauna.query.builder.Query.fql;
import com.fauna.response.StreamEvent;

// Import the Product class for event data.
import org.example.Product;

public class App {
public static void main(String[] args) throws InterruptedException {
try {
FaunaClient client = Fauna.client();

// Create a stream of all products. Project the name and stock.
FaunaStream<Product> stream = client.stream(fql("Product.all().toStream() { name, stock }"), Product.class);

// Create a subscriber to handle stream events.
ProductSubscriber subscriber = new ProductSubscriber();
stream.subscribe(subscriber);

// Wait for the subscriber to complete.
subscriber.awaitCompletion();
} catch (FaunaException e) {
System.err.println("Fauna error occurred: " + e.getMessage());
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

static class ProductSubscriber implements Flow.Subscriber<StreamEvent<Product>> {
private final AtomicInteger eventCount = new AtomicInteger(0);
private Flow.Subscription subscription;
private final int maxEvents;
private final CountDownLatch completionLatch = new CountDownLatch(1);

public ProductSubscriber() {
// Stream closes after 3 events.
this.maxEvents = 3;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(StreamEvent<Product> event) {
// Handle each event...
int count = eventCount.incrementAndGet();
System.out.println("Received event " + count + ":");
System.out.println(" Cursor: " + event.getCursor());
System.out.println(" Timestamp: " + event.getTimestamp());
System.out.println(" Data: " + event.getData().orElse(null));

if (count >= maxEvents) {
System.out.println("Closing stream after " + maxEvents + " events");
subscription.cancel();
completionLatch.countDown();
} else {
subscription.request(1);
}
}

@Override
public void onError(Throwable throwable) {
System.err.println("Error in stream: " + throwable.getMessage());
completionLatch.countDown();
}

@Override
public void onComplete() {
System.out.println("Stream completed.");
completionLatch.countDown();
}

public int getEventCount() {
return eventCount.get();
}

public void awaitCompletion() throws InterruptedException {
completionLatch.await();
}
}
}
```
Loading