Skip to content

Commit

Permalink
Merge branch 'main' into debug-logging
Browse files Browse the repository at this point in the history
  • Loading branch information
pnwpedro authored Oct 22, 2024
2 parents e6f7f49 + 962eae2 commit 4ca32e5
Show file tree
Hide file tree
Showing 12 changed files with 316 additions and 270 deletions.
328 changes: 162 additions & 166 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion fauna/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .client import Client, QueryOptions, StreamOptions, ChangeFeedOptions
from .client import Client, QueryOptions, StreamOptions, FeedOptions, FeedPage, FeedIterator
from .endpoints import Endpoints
from .headers import Header
96 changes: 50 additions & 46 deletions fauna/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from fauna.errors import FaunaError, ClientError, ProtocolError, \
RetryableFaunaException, NetworkError
from fauna.http.http_client import HTTPClient
from fauna.query import Query, Page, fql
from fauna.query.models import StreamToken
from fauna.query import EventSource, Query, Page, fql

logger = logging.getLogger("fauna")

Expand Down Expand Up @@ -74,14 +73,14 @@ class StreamOptions:


@dataclass
class ChangeFeedOptions:
class FeedOptions:
"""
A dataclass representing options available for a change feed.
A dataclass representing options available for an Event Feed.
* max_attempts - The maximum number of times to attempt a change feed query when a retryable exception is thrown.
* max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
* max_backoff - The maximum backoff in seconds for an individual retry.
* query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
* start_ts - The starting timestamp of the change feed, exclusive. If set, Fauna will return events starting after
* start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after
the timestamp.
* cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
* page_size - The desired number of events per page.
Expand Down Expand Up @@ -423,13 +422,13 @@ def _query(

def stream(
self,
fql: Union[StreamToken, Query],
fql: Union[EventSource, Query],
opts: StreamOptions = StreamOptions()
) -> "StreamIterator":
"""
    Opens a Stream in Fauna and returns an iterator that consumes Fauna events.
:param fql: A Query that returns a StreamToken or a StreamToken.
:param fql: An EventSource or a Query that returns an EventSource.
:param opts: (Optional) Stream Options.
:return: a :class:`StreamIterator`
Expand All @@ -445,34 +444,35 @@ def stream(
if isinstance(fql, Query):
if opts.cursor is not None:
raise ClientError(
"The 'cursor' configuration can only be used with a stream token.")
token = self.query(fql).data
"The 'cursor' configuration can only be used with an event source.")

source = self.query(fql).data
else:
token = fql
source = fql

if not isinstance(token, StreamToken):
err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
if not isinstance(source, EventSource):
err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
raise TypeError(err_msg)

headers = self._headers.copy()
headers[_Header.Format] = "tagged"
headers[_Header.Authorization] = self._auth.bearer()

return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
self._max_attempts, self._max_backoff, opts, token)
self._max_attempts, self._max_backoff, opts, source)

def change_feed(
def feed(
self,
fql: Union[StreamToken, Query],
opts: ChangeFeedOptions = ChangeFeedOptions()
) -> "ChangeFeedIterator":
source: Union[EventSource, Query],
opts: FeedOptions = FeedOptions(),
) -> "FeedIterator":
"""
Opens a change feed in Fauna and returns an iterator that consume Fauna events.
    Opens an Event Feed in Fauna and returns an iterator that consumes Fauna events.
:param fql: A Query that returns a StreamToken or a StreamToken.
:param opts: (Optional) Change feed options.
:param source: An EventSource or a Query that returns an EventSource.
:param opts: (Optional) Event Feed options.
:return: a :class:`ChangeFeedIterator`
:return: a :class:`FeedIterator`
:raises ClientError: Invalid options provided
:raises NetworkError: HTTP Request failed in transit
Expand All @@ -482,13 +482,11 @@ def change_feed(
:raises TypeError: Invalid param types
"""

if isinstance(fql, Query):
token = self.query(fql).data
else:
token = fql
if isinstance(source, Query):
source = self.query(source).data

if not isinstance(token, StreamToken):
err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
if not isinstance(source, EventSource):
err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
raise TypeError(err_msg)

headers = self._headers.copy()
Expand All @@ -501,10 +499,8 @@ def change_feed(
elif self._query_timeout_ms is not None:
headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

return ChangeFeedIterator(self._session, headers,
self._endpoint + "/changefeed/1",
self._max_attempts, self._max_backoff, opts,
token)
return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
self._max_attempts, self._max_backoff, opts, source)

def _check_protocol(self, response_json: Any, status_code):
# TODO: Logic to validate wire protocol belongs elsewhere.
Expand Down Expand Up @@ -544,19 +540,23 @@ class StreamIterator:

def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
endpoint: str, max_attempts: int, max_backoff: int,
opts: StreamOptions, token: StreamToken):
opts: StreamOptions, source: EventSource):
self._http_client = http_client
self._headers = headers
self._endpoint = endpoint
self._max_attempts = max_attempts
self._max_backoff = max_backoff
self._opts = opts
self._token = token
self._source = source
self._stream = None
self.last_ts = None
self.last_cursor = None
self._ctx = self._create_stream()

if opts.start_ts is not None and opts.cursor is not None:
err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
raise TypeError(err_msg)

def __enter__(self):
return self

Expand Down Expand Up @@ -626,7 +626,7 @@ def _retry_stream(self):
raise RetryableFaunaException

def _create_stream(self):
data: Dict[str, Any] = {"token": self._token.token}
data: Dict[str, Any] = {"token": self._source.token}
if self.last_cursor is not None:
data["cursor"] = self.last_cursor
elif self._opts.cursor is not None:
Expand All @@ -642,7 +642,7 @@ def close(self):
self._stream.close()


class ChangeFeedPage:
class FeedPage:

def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
self._events = events
Expand All @@ -659,20 +659,24 @@ def __iter__(self) -> Iterator[Any]:
yield event


class ChangeFeedIterator:
"""A class to provide an iterator on top of change feed pages."""
class FeedIterator:
"""A class to provide an iterator on top of Event Feed pages."""

def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
max_attempts: int, max_backoff: int, opts: ChangeFeedOptions,
token: StreamToken):
max_attempts: int, max_backoff: int, opts: FeedOptions,
source: EventSource):
self._http = http
self._headers = headers
self._endpoint = endpoint
self._max_attempts = opts.max_attempts or max_attempts
self._max_backoff = opts.max_backoff or max_backoff
self._request: Dict[str, Any] = {"token": token.token}
self._request: Dict[str, Any] = {"token": source.token}
self._is_done = False

if opts.start_ts is not None and opts.cursor is not None:
err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
raise TypeError(err_msg)

if opts.page_size is not None:
self._request["page_size"] = opts.page_size

Expand All @@ -681,19 +685,19 @@ def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
elif opts.start_ts is not None:
self._request["start_ts"] = opts.start_ts

def __iter__(self) -> Iterator[ChangeFeedPage]:
def __iter__(self) -> Iterator[FeedPage]:
self._is_done = False
return self

def __next__(self) -> ChangeFeedPage:
def __next__(self) -> FeedPage:
if self._is_done:
raise StopIteration

retryable = Retryable[Any](self._max_attempts, self._max_backoff,
self._next_page)
return retryable.run().response

def _next_page(self) -> ChangeFeedPage:
def _next_page(self) -> FeedPage:
with self._http.request(
method="POST",
url=self._endpoint,
Expand All @@ -712,8 +716,8 @@ def _next_page(self) -> ChangeFeedPage:
if "start_ts" in self._request:
del self._request["start_ts"]

return ChangeFeedPage(decoded["events"], decoded["cursor"],
QueryStats(decoded["stats"]))
return FeedPage(decoded["events"], decoded["cursor"],
QueryStats(decoded["stats"]))

def flatten(self) -> Iterator:
"""A generator that yields events instead of pages of events."""
Expand Down
8 changes: 4 additions & 4 deletions fauna/encoding/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from iso8601 import parse_date

from fauna.query.models import Module, DocumentReference, Document, NamedDocument, NamedDocumentReference, Page, \
NullDocument, StreamToken
NullDocument, EventSource


class FaunaDecoder:
Expand Down Expand Up @@ -45,7 +45,7 @@ class FaunaDecoder:
+--------------------+---------------+
| Page | @set |
+--------------------+---------------+
| StreamToken | @stream |
| EventSource | @stream |
+--------------------+---------------+
"""
Expand All @@ -64,7 +64,7 @@ def decode(obj: Any):
- { "@ref": ... } decodes to a DocumentReference or NamedDocumentReference
- { "@mod": ... } decodes to a Module
- { "@set": ... } decodes to a Page
- { "@stream": ... } decodes to a StreamToken
- { "@stream": ... } decodes to an EventSource
- { "@bytes": ... } decodes to a bytearray
:param obj: the object to decode
Expand Down Expand Up @@ -176,6 +176,6 @@ def _decode_dict(dct: dict, escaped: bool):
return Page(data=data, after=after)

if "@stream" in dct:
return StreamToken(dct["@stream"])
return EventSource(dct["@stream"])

return {k: FaunaDecoder._decode(v) for k, v in dct.items()}
10 changes: 5 additions & 5 deletions fauna/encoding/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any, Optional, List, Union

from fauna.query.models import DocumentReference, Module, Document, NamedDocument, NamedDocumentReference, NullDocument, \
StreamToken
EventSource
from fauna.query.query_builder import Query, Fragment, LiteralFragment, ValueFragment

_RESERVED_TAGS = [
Expand Down Expand Up @@ -62,7 +62,7 @@ class FaunaEncoder:
+-------------------------------+---------------+
| TemplateFragment | string |
+-------------------------------+---------------+
| StreamToken | string |
| EventSource | string |
+-------------------------------+---------------+
"""
Expand All @@ -82,7 +82,7 @@ def encode(obj: Any) -> Any:
- Query encodes to { "fql": [...] }
- ValueFragment encodes to { "value": <encoded_val> }
- LiteralFragment encodes to a string
- StreamToken encodes to a string
- EventSource encodes to a string
:raises ValueError: If value cannot be encoded, cannot be encoded safely, or there's a circular reference.
:param obj: the object to decode
Expand Down Expand Up @@ -163,7 +163,7 @@ def from_query_interpolation_builder(obj: Query):
return {"fql": [FaunaEncoder.from_fragment(f) for f in obj.fragments]}

@staticmethod
def from_streamtoken(obj: StreamToken):
def from_streamtoken(obj: EventSource):
return {"@stream": obj.token}

@staticmethod
Expand Down Expand Up @@ -208,7 +208,7 @@ def _encode(o: Any, _markers: Optional[List] = None):
return FaunaEncoder._encode_dict(o, _markers)
elif isinstance(o, Query):
return FaunaEncoder.from_query_interpolation_builder(o)
elif isinstance(o, StreamToken):
elif isinstance(o, EventSource):
return FaunaEncoder.from_streamtoken(o)
else:
raise ValueError(f"Object {o} of type {type(o)} cannot be encoded")
Expand Down
2 changes: 1 addition & 1 deletion fauna/query/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .models import Document, DocumentReference, NamedDocument, NamedDocumentReference, NullDocument, Module, Page
from .models import Document, DocumentReference, EventSource, NamedDocument, NamedDocumentReference, NullDocument, Module, Page
from .query_builder import fql, Query
23 changes: 20 additions & 3 deletions fauna/query/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
import warnings
from collections.abc import Mapping
from datetime import datetime
from typing import Union, Iterator, Any, Optional, List


# NB. Override __getattr__ and __dir__ to deprecate StreamToken usages. Based
# on: https://peps.python.org/pep-0562/
def __getattr__(name):
  """Module-level attribute hook (PEP 562) used to deprecate StreamToken.

  Accessing ``StreamToken`` on this module emits a DeprecationWarning and
  returns :class:`EventSource` in its place. Any other missing attribute
  raises AttributeError, as the module ``__getattr__`` protocol requires.
  """
  if name == "StreamToken":
    warnings.warn(
        "StreamToken is deprecated. Prefer fauna.query.EventSource instead.",
        DeprecationWarning,
        stacklevel=2)
    return EventSource
  # `super` has no meaning at module scope — `super.__getattr__(name)` would
  # itself raise TypeError. Per PEP 562, an unknown name must raise
  # AttributeError so that hasattr()/getattr() behave normally.
  raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


def __dir__():
  """Module-level dir() hook (PEP 562): advertise the deprecated StreamToken.

  The original ``super.__dir__() + "StreamToken"`` raised TypeError twice
  over (unbound call on the ``super`` type; list + str concatenation).
  The correct form lists the module globals plus the deprecated alias.
  """
  return sorted(list(globals()) + ["StreamToken"])


class Page:
"""A class representing a Set in Fauna."""

Expand Down Expand Up @@ -36,14 +53,14 @@ def __ne__(self, other):
return not self.__eq__(other)


class StreamToken:
"""A class represeting a Stream in Fauna."""
class EventSource:
"""A class represeting an EventSource in Fauna."""

def __init__(self, token: str):
self.token = token

def __eq__(self, other):
return isinstance(other, StreamToken) and self.token == other.token
return isinstance(other, EventSource) and self.token == other.token

def __hash__(self):
return hash(self.token)
Expand Down
Loading

0 comments on commit 4ca32e5

Please sign in to comment.