diff --git a/README.rst b/README.rst index 89ee1bfa..39499999 100644 --- a/README.rst +++ b/README.rst @@ -307,10 +307,140 @@ Change the default items per page using FQL's ``.pageSize()`` method. Event Streaming ------------------ -`Event Streaming `_ is currently available in the beta version of the driver: +The driver supports `Event Streaming `_. -- `Beta Python driver `_ -- `Beta Python driver docs `_ +Start a stream +~~~~~~~~~~~~~~ + +To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a +`supported source +`_. + + +To start and subscribe to the stream, pass the stream token to +``Client.stream()``: + +.. code-block:: python + + import fauna + + from fauna import fql + from fauna.client import Client, StreamOptions + + client = Client(secret='YOUR_FAUNA_SECRET') + + response = client.query(fql(''' + let set = Product.all() + { + initialPage: set.pageSize(10), + streamToken: set.toStream() + } + ''')) + + initialPage = response.data['initialPage'] + streamToken = response.data['streamToken'] + + client.stream(streamToken) + +You can also pass a query that produces a stream token directly to +``Client.stream()``: + +.. code-block:: python + + query = fql('Product.all().changesOn(.price, .quantity)') + + client.stream(query) + +Iterate on a stream +~~~~~~~~~~~~~~~~~~~ + +``Client.stream()`` returns an iterator that emits events as they occur. You can +use a generator expression to iterate through the events: + +.. code-block:: python + + query = fql('Product.all().changesOn(.price, .quantity)') + + with client.stream(query) as stream: + for event in stream: + eventType = event['type'] + if (eventType == 'add'): + print('Add event: ', event) + ## ... + elif (eventType == 'update'): + print('Update event: ', event) + ## ... + elif (eventType == 'remove'): + print('Remove event: ', event) + ## ... + +Close a stream +~~~~~~~~~~~~~~ + +Use ``.close()`` to close a stream: + +.. code-block:: python + + query = fql('Product.all().changesOn(.price, .quantity)') + + count = 0 + with client.stream(query) as stream: + for event in stream: + print('Stream event', event) + # ... + count+=1 + + if (count == 2): + stream.close() + +Error handling +~~~~~~~~~~~~~~ + +If a non-retryable error occurs when opening or processing a stream, Fauna +raises a ``FaunaException``: + +.. code-block:: python + + import fauna + + from fauna import fql + from fauna.client import Client + from fauna.errors import FaunaException + + client = Client(secret='YOUR_FAUNA_SECRET') + + try: + with client.stream(fql( + 'Product.all().changesOn(.price, .quantity)' + )) as stream: + for event in stream: + print(event) + # ... + except FaunaException as e: + print('error ocurred with stream: ', e) + +Stream options +~~~~~~~~~~~~~~ + +The client configuration sets default options for the ``Client.stream()`` +method. + +You can pass an ``StreamOptions`` object to override these defaults: + +.. code-block:: python + + options = StreamOptions( + max_attempts=5, + max_backoff=1, + start_ts=1710968002310000, + status_events=True + ) + + client.stream(fql('Product.all().toStream()'), options) + +For supported properties, see `Stream options +`_ +in the Fauna docs. Setup -----