From 646612222f2373e29c69a9983efa9c68dfafa5ce Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Thu, 17 Oct 2024 16:16:20 -0400 Subject: [PATCH 01/24] rename event stream resources --- README.md | 10 +++++----- client.go | 2 +- stream_test.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 1229a91..f16e2f0 100644 --- a/README.md +++ b/README.md @@ -340,11 +340,11 @@ func main() { } ``` -In query results, the driver represents event sources as `fauna.EventSource` +In query results, the driver represents stream tokens as `fauna.Stream` values. -To start a stream from a query result, call `Stream()` and pass the -`fauna.EventSource`. This lets you output a stream alongside normal query +To start a stream from a query result, call `Subscribe()` on a +`fauna.Stream` value. This lets you output a stream alongside normal query results: ```go @@ -415,9 +415,9 @@ func main() { ### Stream options The [client configuration](#client-configuration) sets default query options for -`StreamFromQuery()` and `Stream()`. +`Stream()`. To override these options, see [query -The `StreamFromQuery()` and `Stream()` methods accept +The `Subscribe()` method accepts the `fauna.StartTime` and `fauna.EventCursor` [StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v2#StreamOptFn) functions as arguments. diff --git a/client.go b/client.go index 27c3020..c5e6b4c 100644 --- a/client.go +++ b/client.go @@ -335,7 +335,7 @@ func (c *Client) StreamFromQuery(fql *Query, streamOpts []StreamOptFn, opts ...Q return c.Stream(stream, streamOpts...) } - return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) + return nil, fmt.Errorf("expected query to return a fauna.Stream but got %T", res.Data) } // Stream initiates a stream subscription for the given stream value. diff --git a/stream_test.go b/stream_test.go index 29cf189..668fe33 100644 --- a/stream_test.go +++ b/stream_test.go @@ -45,7 +45,7 @@ func TestStreaming(t *testing.T) { t.Run("Fails on non-streamable values", func(t *testing.T) { streamQ, _ := fauna.FQL(`"I'm a string"`, nil) events, err := client.StreamFromQuery(streamQ, nil) - require.ErrorContains(t, err, "query should return a fauna.EventSource but got string") + require.ErrorContains(t, err, "expected query to return a fauna.Stream but got string") require.Nil(t, events) }) }) From 474b87c6fd665e7d2de587bc8474027a11438250 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Wed, 23 Oct 2024 10:34:34 -0400 Subject: [PATCH 02/24] touchups --- client.go | 2 +- stream_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index c5e6b4c..5726de7 100644 --- a/client.go +++ b/client.go @@ -335,7 +335,7 @@ func (c *Client) StreamFromQuery(fql *Query, streamOpts []StreamOptFn, opts ...Q return c.Stream(stream, streamOpts...) } - return nil, fmt.Errorf("expected query to return a fauna.Stream but got %T", res.Data) + return nil, fmt.Errorf("expected query to return a fauna.StreamFromQuery but got %T", res.Data) } // Stream initiates a stream subscription for the given stream value. diff --git a/stream_test.go b/stream_test.go index 668fe33..f95d22c 100644 --- a/stream_test.go +++ b/stream_test.go @@ -45,7 +45,7 @@ func TestStreaming(t *testing.T) { t.Run("Fails on non-streamable values", func(t *testing.T) { streamQ, _ := fauna.FQL(`"I'm a string"`, nil) events, err := client.StreamFromQuery(streamQ, nil) - require.ErrorContains(t, err, "expected query to return a fauna.Stream but got string") + require.ErrorContains(t, err, "expected query to return a fauna.StreamFromQuery but got string") require.Nil(t, events) }) }) From 5170b1badf415e8bfdf2cb99a8dd997752dad56b Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Thu, 17 Oct 2024 16:16:20 -0400 Subject: [PATCH 03/24] rename event stream resources --- client.go | 2 +- stream_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index 5726de7..27c3020 100644 --- a/client.go +++ b/client.go @@ -335,7 +335,7 @@ func (c *Client) StreamFromQuery(fql *Query, streamOpts []StreamOptFn, opts ...Q return c.Stream(stream, streamOpts...) } - return nil, fmt.Errorf("expected query to return a fauna.StreamFromQuery but got %T", res.Data) + return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) } // Stream initiates a stream subscription for the given stream value. diff --git a/stream_test.go b/stream_test.go index f95d22c..29cf189 100644 --- a/stream_test.go +++ b/stream_test.go @@ -45,7 +45,7 @@ func TestStreaming(t *testing.T) { t.Run("Fails on non-streamable values", func(t *testing.T) { streamQ, _ := fauna.FQL(`"I'm a string"`, nil) events, err := client.StreamFromQuery(streamQ, nil) - require.ErrorContains(t, err, "expected query to return a fauna.StreamFromQuery but got string") + require.ErrorContains(t, err, "query should return a fauna.EventSource but got string") require.Nil(t, events) }) }) From cb1f201d17ca74befadad2abf93185c7d6d44326 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Fri, 18 Oct 2024 14:38:03 -0400 Subject: [PATCH 04/24] add event feed --- README.md | 3 +- client.go | 48 ++++++++++-- config.go | 11 +++ event_feed.go | 84 +++++++++++++++++++++ event_feed_example_test.go | 47 ++++++++++++ event_feed_test.go | 150 +++++++++++++++++++++++++++++++++++++ request.go | 37 +++++++++ serializer.go | 10 +++ serializer_test.go | 38 ++++++++++ 9 files changed, 419 insertions(+), 9 deletions(-) create mode 100644 event_feed.go create mode 100644 event_feed_example_test.go create mode 100644 event_feed_test.go diff --git a/README.md b/README.md index f16e2f0..8c6f6f4 100644 --- a/README.md +++ b/README.md @@ -441,7 +441,8 @@ the API reference. ## Debug logging -To enable debug logging set the `FAUNA_DEBUG` environment variable to an integer for the value of the desired [slog.Level](https://pkg.go.dev/log/slog#Level). +## Event Feeds (beta) + For Go versions 1.21 and earlier, the driver uses a [log.Logger](https://pkg.go.dev/log#Logger). For 1.22+, the driver uses the [slog.Logger](https://pkg.go.dev/log/slog#Logger). You can optionally define your own Logger. diff --git a/client.go b/client.go index 27c3020..4f7c6a1 100644 --- a/client.go +++ b/client.go @@ -72,7 +72,7 @@ type Client struct { maxBackoff time.Duration // lazily cached URLs - queryURL, streamURL *url.URL + queryURL, streamURL, feedURL *url.URL logger Logger } @@ -204,20 +204,20 @@ func (c *Client) parseQueryURL() (*url.URL, error) { return c.queryURL, nil } -func (c *Client) parseStreamURL() (*url.URL, error) { - if c.streamURL == nil { - if streamURL, err := url.Parse(c.url); err != nil { +func (c *Client) parseFeedURL() (*url.URL, error) { + if c.feedURL == nil { + if feedURL, err := url.Parse(c.url); err != nil { return nil, err } else { - c.streamURL = streamURL.JoinPath("stream", "1") + c.feedURL = feedURL.JoinPath("changefeed", "1") } } - if c.streamURL == nil { - return nil, fmt.Errorf("stream url is not set") + if c.feedURL == nil { + return nil, fmt.Errorf("feed url is not set") } - return c.streamURL, nil + return c.feedURL, nil } func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, err error) { @@ -423,3 +423,35 @@ func (c *Client) String() string { func (c *Client) setHeader(key, val string) { c.headers[key] = val } + +func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, error) { + res, err := c.Query(fql, opts...) + if err != nil { + return nil, err + } + + token, ok := res.Data.(EventSource) + if !ok { + return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) + } + + return newEventFeed(c, token) +} + +func (c *Client) FeedFromQueryWithOptions(fql *Query, feedOpts []FeedOptFn, opts ...QueryOptFn) (*EventFeed, error) { + res, err := c.Query(fql, opts...) + if err != nil { + return nil, err + } + + token, ok := res.Data.(EventSource) + if !ok { + return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) + } + + return newEventFeed(c, token, feedOpts...) +} + +func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) { + return newEventFeed(c, stream, opts...) +} diff --git a/config.go b/config.go index 5f557ce..93f97cc 100644 --- a/config.go +++ b/config.go @@ -167,3 +167,14 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { return strings.ReplaceAll(params.Encode(), "&", ",") } + +// FeedOptFn function to set options on the [Client.NewEventFeed] +type FeedOptFn func(req *feedRequest) + +func EventFeedCursor(cursor string) FeedOptFn { + return func(req *feedRequest) { req.Cursor = cursor } +} + +func EventFeedStartTime(ts int64) FeedOptFn { + return func(req *feedRequest) { req.StartTS = ts } +} diff --git a/event_feed.go b/event_feed.go new file mode 100644 index 0000000..f4ebb9f --- /dev/null +++ b/event_feed.go @@ -0,0 +1,84 @@ +package fauna + +import ( + "encoding/json" +) + +type EventFeed struct { + client *Client + + stream EventSource + opts []FeedOptFn + + decoder *json.Decoder + + lastCursor string + + closed bool +} + +func newEventFeed(client *Client, token EventSource, opts ...FeedOptFn) (*EventFeed, error) { + feed := &EventFeed{ + client: client, + stream: token, + opts: opts, + } + + if err := feed.reconnect(opts...); err != nil { + return nil, err + } + + return feed, nil +} + +func (ef *EventFeed) reconnect(opts ...FeedOptFn) error { + req := feedRequest{ + apiRequest: apiRequest{ + ef.client.ctx, + ef.client.headers, + }, + Stream: ef.stream, + Cursor: ef.lastCursor, + } + + if (opts != nil) && (len(opts) > 0) { + ef.opts = append(ef.opts, opts...) + } + + for _, optFn := range ef.opts { + optFn(&req) + } + + byteStream, err := req.do(ef.client) + if err != nil { + return err + } + + ef.decoder = json.NewDecoder(byteStream) + + return nil +} + +// FeedResponse represents the response from the EventFeed.Events +type FeedResponse struct { + Events []Event `json:"events"` + Cursor string `json:"cursor"` + HasNext bool `json:"has_next"` + Stats Stats `json:"stats"` +} + +// Events return the next FeedResponse from the EventFeed +func (ef *EventFeed) Events() (*FeedResponse, error) { + var response FeedResponse + if err := ef.reconnect(); err != nil { + return nil, err + } + + if err := ef.decoder.Decode(&response); err != nil { + return nil, err + } + + ef.lastCursor = response.Cursor + + return &response, nil +} diff --git a/event_feed_example_test.go b/event_feed_example_test.go new file mode 100644 index 0000000..ca7e843 --- /dev/null +++ b/event_feed_example_test.go @@ -0,0 +1,47 @@ +package fauna_test + +import ( + "fmt" + "log" + + "github.com/fauna/fauna-go/v2" +) + +func ExampleEventFeed_Events() { + client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal)) + + query, queryErr := fauna.FQL(`Collection.byName("ChangeFeedTest")?.delete() +Collection.create({ name: "ChangeFeedTest" }) +ChangeFeedTest.all().toStream()`, nil) + if queryErr != nil { + log.Fatal(queryErr.Error()) + } + + feed, feedErr := client.FeedFromQuery(query) + if feedErr != nil { + log.Fatal(feedErr.Error()) + } + + addOne, _ := fauna.FQL(`ChangeFeedTest.create({ foo: 'bar' })`, nil) + _, addOneErr := client.Query(addOne) + if addOneErr != nil { + log.Fatal(addOneErr.Error()) + } + + for { + res, eventErr := feed.Events() + if eventErr != nil { + log.Fatal(eventErr.Error()) + } + + for _, event := range res.Events { + fmt.Println(event.Type) + } + + if !res.HasNext { + break + } + } + + // Output: add +} diff --git a/event_feed_test.go b/event_feed_test.go new file mode 100644 index 0000000..644366f --- /dev/null +++ b/event_feed_test.go @@ -0,0 +1,150 @@ +package fauna_test + +import ( + "testing" + "time" + + "github.com/fauna/fauna-go/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventFeed(t *testing.T) { + t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) + t.Setenv(fauna.EnvFaunaSecret, "secret") + + client, clientErr := fauna.NewDefaultClient() + require.NoError(t, clientErr) + + resetCollection(t, client) + + t.Run("returns errors correctly", func(t *testing.T) { + t.Run("should error when query doesn't return a stream token", func(t *testing.T) { + query, queryErr := fauna.FQL(`42`, nil) + require.NoError(t, queryErr) + + _, feedErr := client.FeedFromQuery(query) + require.ErrorContains(t, feedErr, "query should return a fauna.EventSource but got int") + }) + }) + + t.Run("can use change feeds from a query", func(t *testing.T) { + query, queryErr := fauna.FQL(`ChangeFeedTest.all().toStream()`, nil) + require.NoError(t, queryErr, "failed to create a query for stream token") + + feed, feedErr := client.FeedFromQuery(query) + require.NoError(t, feedErr, "failed to init events feed") + + createOne(t, client, feed) + createMultipleDocs(t, client, feed, 5, 20) + }) + + t.Run("can get events from EventSource", func(t *testing.T) { + t.Run("can get an EventSource", func(t *testing.T) { + streamToken := getEventSource(t, client) + require.NotNil(t, streamToken, "failed to get stream token") + }) + + t.Run("get events from an EventSource", func(t *testing.T) { + streamToken := getEventSource(t, client) + + feed, feedErr := client.Feed(streamToken) + require.NoError(t, feedErr, "failed to init events feed") + + createOne(t, client, feed) + createMultipleDocs(t, client, feed, 5, 20) + }) + }) + + t.Run("can get events from history", func(t *testing.T) { + resetCollection(t, client) + + createOne(t, client, nil) + + streamToken := getEventSource(t, client) + require.NotNil(t, streamToken, "failed to get stream token") + + feed, feedErr := client.Feed(streamToken) + require.NoError(t, feedErr, "failed to init events feed") + + eventsRes, eventsErr := feed.Events() + require.NoError(t, eventsErr, "failed to get events") + require.Equal(t, 0, len(eventsRes.Events), "unexpected number of events") + + streamToken = getEventSource(t, client) + require.NotNil(t, streamToken, "failed to get stream token") + + feed, feedErr = client.Feed(streamToken, fauna.EventFeedStartTime(time.Now().Add(-time.Minute*10).UnixMicro())) + require.NoError(t, feedErr, "failed to init events feed") + + feedRes, eventsErr := feed.Events() + require.NoError(t, eventsErr, "failed to get events") + require.Equal(t, 1, len(feedRes.Events), "unexpected number of events") + }) +} + +func resetCollection(t *testing.T, client *fauna.Client) { + t.Helper() + + setupQuery, setupQueryErr := fauna.FQL(`Collection.byName("ChangeFeedTest")?.delete() +Collection.create({ name: "ChangeFeedTest" })`, nil) + require.NoError(t, setupQueryErr, "setup query error: %s", setupQueryErr) + + _, setupErr := client.Query(setupQuery) + require.NoError(t, setupErr, "setup error: %s", setupErr) +} + +func getEventSource(t *testing.T, client *fauna.Client) fauna.EventSource { + t.Helper() + + query, queryErr := fauna.FQL(`ChangeFeedTest.all().toStream()`, nil) + require.NoError(t, queryErr, "failed to create a query for stream token") + + streamRes, streamResErr := client.Query(query) + require.NoError(t, streamResErr, "failed to init events feed") + + var eventSource fauna.EventSource + unmarshalErr := streamRes.Unmarshal(&eventSource) + require.NoError(t, unmarshalErr, "failed to unmarshal stream token") + require.NotNil(t, eventSource, "stream token is nil") + require.NotEmpty(t, eventSource, "stream token is empty") + + return eventSource +} + +func createOne(t *testing.T, client *fauna.Client, feed *fauna.EventFeed) { + t.Helper() + + createOneQuery, createOneQueryErr := fauna.FQL("ChangeFeedTest.create({ foo: 'bar' })", nil) + require.NoError(t, createOneQueryErr, "failed to init query for create statement") + require.NotNil(t, createOneQuery, "create statement is nil") + + _, createOneErr := client.Query(createOneQuery) + require.NoError(t, createOneErr, "failed to create a document") + + if feed == nil { + return + } + + eventsRes, eventsErr := feed.Events() + require.NoError(t, eventsErr, "failed to get events") + + assert.Equal(t, 1, len(eventsRes.Events), "unexpected number of events") +} + +func createMultipleDocs(t *testing.T, client *fauna.Client, feed *fauna.EventFeed, start int, end int) { + t.Helper() + + query, queryErr := fauna.FQL(`Set.sequence(${start}, ${end}).forEach(n => ChangeFeedTest.create({ n: n }))`, map[string]any{ + "start": start, + "end": end, + }) + require.NoError(t, queryErr, "failed to init query for create statement") + + _, err := client.Query(query) + require.NoError(t, err) + + eventsRes, eventsErr := feed.Events() + require.NoError(t, eventsErr, "failed to get events from EventSource") + require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events") +} diff --git a/request.go b/request.go index b10d166..f212a14 100644 --- a/request.go +++ b/request.go @@ -179,3 +179,40 @@ func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) bytes = httpRes.Body return } + +type feedRequest struct { + apiRequest + Stream EventSource + StartTS int64 + Cursor string +} + +func (feedReq *feedRequest) do(cli *Client) (io.ReadCloser, error) { + bytesOut, marshalErr := marshal(feedReq) + if marshalErr != nil { + return nil, fmt.Errorf("marshal request failed: %w", marshalErr) + } + + changeFeedURL, parseURLErr := cli.parseFeedURL() + if parseURLErr != nil { + return nil, fmt.Errorf("parse url failed: %w", parseURLErr) + } + + attempts, httpRes, postErr := feedReq.post(cli, changeFeedURL, bytesOut) + if postErr != nil { + return nil, fmt.Errorf("post request failed: %w", postErr) + } + + if httpRes.StatusCode != http.StatusOK { + qRes, err := parseQueryResponse(httpRes) + if err == nil { + if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err == nil { + err = fmt.Errorf("unknown error for http status: %d", httpRes.StatusCode) + } + } + + return nil, err + } + + return httpRes.Body, nil +} diff --git a/serializer.go b/serializer.go index 6959280..38ffaa9 100644 --- a/serializer.go +++ b/serializer.go @@ -523,6 +523,16 @@ func encode(v any, hint string) (any, error) { } return out, nil + case feedRequest: + out := map[string]any{"token": string(vt.Stream)} + if vt.StartTS > 0 { + out["start_ts"] = vt.StartTS + } + if len(vt.Cursor) > 0 { + out["cursor"] = vt.Cursor + } + return out, nil + case []byte: return encodeBytes(vt) } diff --git a/serializer_test.go b/serializer_test.go index fe95148..56f0987 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -1,6 +1,7 @@ package fauna import ( + "context" "encoding/base64" "reflect" "testing" @@ -623,3 +624,40 @@ func TestComposition(t *testing.T) { } }) } + +func TestMarshalEventSourceStructs(t *testing.T) { + t.Run("marshal query request", func(t *testing.T) { + marshalAndCheck(t, queryRequest{ + apiRequest: apiRequest{ + Context: context.Background(), + Headers: map[string]string{}, + }, + Query: nil, + Arguments: nil, + }) + }) + + t.Run("marshal stream request", func(t *testing.T) { + marshalAndCheck(t, streamRequest{ + apiRequest: apiRequest{ + Context: context.Background(), + Headers: map[string]string{}, + }, + Stream: "", + StartTS: 0, + Cursor: "", + }) + }) + + t.Run("marshal feed request", func(t *testing.T) { + marshalAndCheck(t, feedRequest{ + apiRequest: apiRequest{ + Context: context.Background(), + Headers: map[string]string{}, + }, + Stream: "", + StartTS: 0, + Cursor: "", + }) + }) +} From 4a6a2122888215f63f789f2659d503084df29ff9 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Wed, 23 Oct 2024 10:03:32 -0400 Subject: [PATCH 05/24] thanks golangci-lint --- event_feed.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/event_feed.go b/event_feed.go index f4ebb9f..c628930 100644 --- a/event_feed.go +++ b/event_feed.go @@ -13,8 +13,6 @@ type EventFeed struct { decoder *json.Decoder lastCursor string - - closed bool } func newEventFeed(client *Client, token EventSource, opts ...FeedOptFn) (*EventFeed, error) { From ebf98cb9c8d29561c2480d3a198f6365b3c26808 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Wed, 23 Oct 2024 10:11:46 -0400 Subject: [PATCH 06/24] add change feeds flag to docker --- .github/workflows/pr-validate-driver.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/pr-validate-driver.yml b/.github/workflows/pr-validate-driver.yml index d513b6a..61f3b21 100644 --- a/.github/workflows/pr-validate-driver.yml +++ b/.github/workflows/pr-validate-driver.yml @@ -21,6 +21,8 @@ jobs: image: fauna/faunadb:latest ports: - 8443:8443 + env: + FLAG_ACCOUNT_CHANGE_FEEDS: true strategy: matrix: go: [ '1.23', '1.22', '1.21', '1.20', '1.19' ] From 4a9d5ea40199dbcc9a799717d90cd3496e679aad Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Wed, 23 Oct 2024 10:42:10 -0400 Subject: [PATCH 07/24] revert unintentional delete --- client.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/client.go b/client.go index 4f7c6a1..06fdaff 100644 --- a/client.go +++ b/client.go @@ -204,6 +204,22 @@ func (c *Client) parseQueryURL() (*url.URL, error) { return c.queryURL, nil } +func (c *Client) parseStreamURL() (*url.URL, error) { + if c.streamURL == nil { + if streamURL, err := url.Parse(c.url); err != nil { + return nil, err + } else { + c.streamURL = streamURL.JoinPath("stream", "1") + } + } + + if c.streamURL == nil { + return nil, fmt.Errorf("stream url is not set") + } + + return c.streamURL, nil +} + func (c *Client) parseFeedURL() (*url.URL, error) { if c.feedURL == nil { if feedURL, err := url.Parse(c.url); err != nil { From 9d8abad23a7c5938b061808b6f0011e66f45d783 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Fri, 25 Oct 2024 17:44:54 -0400 Subject: [PATCH 08/24] updated url --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index 06fdaff..bb90b49 100644 --- a/client.go +++ b/client.go @@ -225,7 +225,7 @@ func (c *Client) parseFeedURL() (*url.URL, error) { if feedURL, err := url.Parse(c.url); err != nil { return nil, err } else { - c.feedURL = feedURL.JoinPath("changefeed", "1") + c.feedURL = feedURL.JoinPath("feed", "1") } } From 619df392f060ebb2db8b3159ca63dc2e8948b8c2 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Fri, 25 Oct 2024 17:45:08 -0400 Subject: [PATCH 09/24] update references to change feed --- event_feed_example_test.go | 8 ++++---- event_feed_test.go | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/event_feed_example_test.go b/event_feed_example_test.go index ca7e843..681ff8f 100644 --- a/event_feed_example_test.go +++ b/event_feed_example_test.go @@ -10,9 +10,9 @@ import ( func ExampleEventFeed_Events() { client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal)) - query, queryErr := fauna.FQL(`Collection.byName("ChangeFeedTest")?.delete() -Collection.create({ name: "ChangeFeedTest" }) -ChangeFeedTest.all().toStream()`, nil) + query, queryErr := fauna.FQL(`Collection.byName("EventFeedTest")?.delete() +Collection.create({ name: "EventFeedTest" }) +EventFeedTest.all().toStream()`, nil) if queryErr != nil { log.Fatal(queryErr.Error()) } @@ -22,7 +22,7 @@ ChangeFeedTest.all().toStream()`, nil) log.Fatal(feedErr.Error()) } - addOne, _ := fauna.FQL(`ChangeFeedTest.create({ foo: 'bar' })`, nil) + addOne, _ := fauna.FQL(`EventFeedTest.create({ foo: 'bar' })`, nil) _, addOneErr := client.Query(addOne) if addOneErr != nil { log.Fatal(addOneErr.Error()) diff --git a/event_feed_test.go b/event_feed_test.go index 644366f..ebdf9e2 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -28,8 +28,8 @@ func TestEventFeed(t *testing.T) { }) }) - t.Run("can use change feeds from a query", func(t *testing.T) { - query, queryErr := fauna.FQL(`ChangeFeedTest.all().toStream()`, nil) + t.Run("can use event feeds from a query", func(t *testing.T) { + query, queryErr := fauna.FQL(`EventFeedTest.all().toStream()`, nil) require.NoError(t, queryErr, "failed to create a query for stream token") feed, feedErr := client.FeedFromQuery(query) @@ -86,8 +86,8 @@ func TestEventFeed(t *testing.T) { func resetCollection(t *testing.T, client *fauna.Client) { t.Helper() - setupQuery, setupQueryErr := fauna.FQL(`Collection.byName("ChangeFeedTest")?.delete() -Collection.create({ name: "ChangeFeedTest" })`, nil) + setupQuery, setupQueryErr := fauna.FQL(`Collection.byName("EventFeedTest")?.delete() +Collection.create({ name: "EventFeedTest" })`, nil) require.NoError(t, setupQueryErr, "setup query error: %s", setupQueryErr) _, setupErr := client.Query(setupQuery) @@ -97,7 +97,7 @@ Collection.create({ name: "ChangeFeedTest" })`, nil) func getEventSource(t *testing.T, client *fauna.Client) fauna.EventSource { t.Helper() - query, queryErr := fauna.FQL(`ChangeFeedTest.all().toStream()`, nil) + query, queryErr := fauna.FQL(`EventFeedTest.all().toStream()`, nil) require.NoError(t, queryErr, "failed to create a query for stream token") streamRes, streamResErr := client.Query(query) @@ -115,7 +115,7 @@ func getEventSource(t *testing.T, client *fauna.Client) fauna.EventSource { func createOne(t *testing.T, client *fauna.Client, feed *fauna.EventFeed) { t.Helper() - createOneQuery, createOneQueryErr := fauna.FQL("ChangeFeedTest.create({ foo: 'bar' })", nil) + createOneQuery, createOneQueryErr := fauna.FQL("EventFeedTest.create({ foo: 'bar' })", nil) require.NoError(t, createOneQueryErr, "failed to init query for create statement") require.NotNil(t, createOneQuery, "create statement is nil") @@ -135,7 +135,7 @@ func createOne(t *testing.T, client *fauna.Client, feed *fauna.EventFeed) { func createMultipleDocs(t *testing.T, client *fauna.Client, feed *fauna.EventFeed, start int, end int) { t.Helper() - query, queryErr := fauna.FQL(`Set.sequence(${start}, ${end}).forEach(n => ChangeFeedTest.create({ n: n }))`, map[string]any{ + query, queryErr := fauna.FQL(`Set.sequence(${start}, ${end}).forEach(n => EventFeedTest.create({ n: n }))`, map[string]any{ "start": start, "end": end, }) From 0b1438b348c3b1d707d3f9236dfc926fc7b04793 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Mon, 28 Oct 2024 15:01:32 -0400 Subject: [PATCH 10/24] v3 references --- README.md | 4 ++-- event_feed_example_test.go | 2 +- event_feed_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8c6f6f4..ec34070 100644 --- a/README.md +++ b/README.md @@ -418,7 +418,7 @@ The [client configuration](#client-configuration) sets default query options for `Stream()`. To override these options, see [query The `Subscribe()` method accepts the `fauna.StartTime` and `fauna.EventCursor` -[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v2#StreamOptFn) +[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#StreamOptFn) functions as arguments. Use `fauna.StartTime()` to restart a stream at a specific timestamp: @@ -436,7 +436,7 @@ client.StreamFromQuery(streamQuery, nil, fauna.EventCursor("abc2345==")) ``` For supported functions, see -[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v2#StreamOptFn) in +[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#StreamOptFn) in the API reference. ## Debug logging diff --git a/event_feed_example_test.go b/event_feed_example_test.go index 681ff8f..ff6dc4c 100644 --- a/event_feed_example_test.go +++ b/event_feed_example_test.go @@ -4,7 +4,7 @@ import ( "fmt" "log" - "github.com/fauna/fauna-go/v2" + "github.com/fauna/fauna-go/v3" ) func ExampleEventFeed_Events() { diff --git a/event_feed_test.go b/event_feed_test.go index ebdf9e2..1dd3408 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/fauna/fauna-go/v2" + "github.com/fauna/fauna-go/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) From 1cc05febe86e50621aa97aeff6e442e76cde7cff Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Mon, 28 Oct 2024 15:15:01 -0400 Subject: [PATCH 11/24] event feed readme section --- README.md | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index ec34070..7d957e7 100644 --- a/README.md +++ b/README.md @@ -340,11 +340,11 @@ func main() { } ``` -In query results, the driver represents stream tokens as `fauna.Stream` +In query results, the driver represents event sources as `fauna.EventSource` values. -To start a stream from a query result, call `Subscribe()` on a -`fauna.Stream` value. This lets you output a stream alongside normal query +To start a stream from a query result, call `Stream()` and pass the +`fauna.EventSource`. This lets you output a stream alongside normal query results: ```go @@ -415,10 +415,10 @@ func main() { ### Stream options The [client configuration](#client-configuration) sets default query options for -`Stream()`. To override these options, see [query +`StreamFromQuery()` and `Stream()`. -The `Subscribe()` method accepts the `fauna.StartTime` and `fauna.EventCursor` -[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#StreamOptFn) +The `StreamFromQuery()` and `Stream()` methods accept +[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v2#StreamOptFn) functions as arguments. Use `fauna.StartTime()` to restart a stream at a specific timestamp: @@ -436,13 +436,16 @@ client.StreamFromQuery(streamQuery, nil, fauna.EventCursor("abc2345==")) ``` For supported functions, see -[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#StreamOptFn) in +[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v2#StreamOptFn) in the API reference. -## Debug logging - ## Event Feeds (beta) +The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/track-changes/streaming/#change-feeds) see [example](event_feed_example_test.go). + +## Debug logging + +To enable debug logging set the `FAUNA_DEBUG` environment variable to an integer for the value of the desired [slog.Level](https://pkg.go.dev/log/slog#Level). For Go versions 1.21 and earlier, the driver uses a [log.Logger](https://pkg.go.dev/log#Logger). For 1.22+, the driver uses the [slog.Logger](https://pkg.go.dev/log/slog#Logger). You can optionally define your own Logger. From 74243eae66eab01b3ae5e373e845e92628c13b33 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Mon, 28 Oct 2024 15:16:17 -0400 Subject: [PATCH 12/24] updated URL --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7d957e7..8358769 100644 --- a/README.md +++ b/README.md @@ -441,7 +441,7 @@ the API reference. ## Event Feeds (beta) -The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/track-changes/streaming/#change-feeds) see [example](event_feed_example_test.go). +The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/cdc/#event-feeds) see [example](event_feed_example_test.go). ## Debug logging From a57c5cb83fcf8ff60d851ad5c67f57412da8cf88 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Mon, 28 Oct 2024 15:21:53 -0400 Subject: [PATCH 13/24] workflow env var not needed anymore --- .github/workflows/pr-validate-driver.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/pr-validate-driver.yml b/.github/workflows/pr-validate-driver.yml index 61f3b21..d513b6a 100644 --- a/.github/workflows/pr-validate-driver.yml +++ b/.github/workflows/pr-validate-driver.yml @@ -21,8 +21,6 @@ jobs: image: fauna/faunadb:latest ports: - 8443:8443 - env: - FLAG_ACCOUNT_CHANGE_FEEDS: true strategy: matrix: go: [ '1.23', '1.22', '1.21', '1.20', '1.19' ] From f1f202b1ffae39890e5bc45d7401c6e4da3a8144 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Mon, 28 Oct 2024 17:15:24 -0400 Subject: [PATCH 14/24] add godoc comments --- client.go | 3 +++ event_feed.go | 1 + 2 files changed, 4 insertions(+) diff --git a/client.go b/client.go index bb90b49..b1a816d 100644 --- a/client.go +++ b/client.go @@ -440,6 +440,7 @@ func (c *Client) setHeader(key, val string) { c.headers[key] = val } +// FeedFromQuery opens an event feed from the event source returned by the [fauna.Query]. func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, error) { res, err := c.Query(fql, opts...) if err != nil { @@ -454,6 +455,7 @@ func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, erro return newEventFeed(c, token) } +// FeedFromQueryWithOptions initiates an event from the event source returned by the [fauna.Query] with custom options func (c *Client) FeedFromQueryWithOptions(fql *Query, feedOpts []FeedOptFn, opts ...QueryOptFn) (*EventFeed, error) { res, err := c.Query(fql, opts...) if err != nil { @@ -468,6 +470,7 @@ func (c *Client) FeedFromQueryWithOptions(fql *Query, feedOpts []FeedOptFn, opts return newEventFeed(c, token, feedOpts...) } +// Feed opens an event feed from the event source func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) { return newEventFeed(c, stream, opts...) } diff --git a/event_feed.go b/event_feed.go index c628930..49df0c3 100644 --- a/event_feed.go +++ b/event_feed.go @@ -4,6 +4,7 @@ import ( "encoding/json" ) +// EventFeed represents an event feed subscription. type EventFeed struct { client *Client From 44ae2de7b67c7990019bffb49a5efefb219b024d Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Tue, 29 Oct 2024 10:51:43 -0400 Subject: [PATCH 15/24] Update README.md Co-authored-by: James Rodewig --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8358769..a9bc07e 100644 --- a/README.md +++ b/README.md @@ -441,7 +441,7 @@ the API reference. ## Event Feeds (beta) -The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/cdc/#event-feeds) see [example](event_feed_example_test.go). +The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/cdc/#event-feeds). See [example](event_feed_example_test.go). ## Debug logging From d4af0adf074b1e30f49def5540144a72f7a42ba1 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Tue, 29 Oct 2024 11:06:58 -0400 Subject: [PATCH 16/24] Update client.go Co-authored-by: Lucas Pedroza <40873230+pnwpedro@users.noreply.github.com> --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index b1a816d..1190fb7 100644 --- a/client.go +++ b/client.go @@ -456,7 +456,7 @@ func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, erro } // FeedFromQueryWithOptions initiates an event from the event source returned by the [fauna.Query] with custom options -func (c *Client) FeedFromQueryWithOptions(fql *Query, feedOpts []FeedOptFn, opts ...QueryOptFn) (*EventFeed, error) { +func (c *Client) FeedFromQueryWithStart(fql *Query, feedStart FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) { res, err := c.Query(fql, opts...) if err != nil { return nil, err From 274c87788b5abcfb021dc726c51e7a83253f1bed Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Tue, 29 Oct 2024 11:17:36 -0400 Subject: [PATCH 17/24] PR feedback --- client.go | 18 ++---------- config.go | 6 +--- event_feed.go | 14 +++++----- event_feed_test.go | 70 +++++++++++++++++++++++++++------------------- request.go | 2 +- serializer.go | 2 +- serializer_test.go | 2 +- 7 files changed, 56 insertions(+), 58 deletions(-) diff --git a/client.go b/client.go index 1190fb7..c974f34 100644 --- a/client.go +++ b/client.go @@ -197,10 +197,6 @@ func (c *Client) parseQueryURL() (*url.URL, error) { } } - if c.queryURL == nil { - return nil, fmt.Errorf("query url is not set") - } - return c.queryURL, nil } @@ -213,10 +209,6 @@ func (c *Client) parseStreamURL() (*url.URL, error) { } } - if c.streamURL == nil { - return nil, fmt.Errorf("stream url is not set") - } - return c.streamURL, nil } @@ -229,10 +221,6 @@ func (c *Client) parseFeedURL() (*url.URL, error) { } } - if c.feedURL == nil { - return nil, fmt.Errorf("feed url is not set") - } - return c.feedURL, nil } @@ -455,8 +443,8 @@ func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, erro return newEventFeed(c, token) } -// FeedFromQueryWithOptions initiates an event from the event source returned by the [fauna.Query] with custom options -func (c *Client) FeedFromQueryWithStart(fql *Query, feedStart FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) { +// FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] from the given start time +func (c *Client) FeedFromQueryWithStartTime(fql *Query, startTime time.Time, opts ...QueryOptFn) (*EventFeed, error) { res, err := c.Query(fql, opts...) if err != nil { return nil, err @@ -467,7 +455,7 @@ func (c *Client) FeedFromQueryWithStart(fql *Query, feedStart FeedStartFn, opts return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) } - return newEventFeed(c, token, feedOpts...) + return newEventFeed(c, token, FeedStartFn(startTime.UnixMicro())) } // Feed opens an event feed from the event source diff --git a/config.go b/config.go index 93f97cc..74fca53 100644 --- a/config.go +++ b/config.go @@ -171,10 +171,6 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { // FeedOptFn function to set options on the [Client.NewEventFeed] type FeedOptFn func(req *feedRequest) -func EventFeedCursor(cursor string) FeedOptFn { - return func(req *feedRequest) { req.Cursor = cursor } -} - -func EventFeedStartTime(ts int64) FeedOptFn { +func FeedStartFn(ts int64) FeedOptFn { return func(req *feedRequest) { req.StartTS = ts } } diff --git a/event_feed.go b/event_feed.go index 49df0c3..bb8169c 100644 --- a/event_feed.go +++ b/event_feed.go @@ -8,7 +8,7 @@ import ( type EventFeed struct { client *Client - stream EventSource + source EventSource opts []FeedOptFn decoder *json.Decoder @@ -16,27 +16,27 @@ type EventFeed struct { lastCursor string } -func newEventFeed(client *Client, token EventSource, opts ...FeedOptFn) (*EventFeed, error) { +func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*EventFeed, error) { feed := &EventFeed{ client: client, - stream: token, + source: source, opts: opts, } - if err := feed.reconnect(opts...); err != nil { + if err := feed.open(opts...); err != nil { return nil, err } return feed, nil } -func (ef *EventFeed) reconnect(opts ...FeedOptFn) error { +func (ef *EventFeed) open(opts ...FeedOptFn) error { req := feedRequest{ apiRequest: apiRequest{ ef.client.ctx, ef.client.headers, }, - Stream: ef.stream, + Source: ef.source, Cursor: ef.lastCursor, } @@ -69,7 +69,7 @@ type FeedResponse struct { // Events return the next FeedResponse from the EventFeed func (ef *EventFeed) Events() (*FeedResponse, error) { var response FeedResponse - if err := ef.reconnect(); err != nil { + if err := ef.open(); err != nil { return nil, err } diff --git a/event_feed_test.go b/event_feed_test.go index 1dd3408..ed80f71 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -19,7 +19,7 @@ func TestEventFeed(t *testing.T) { resetCollection(t, client) t.Run("returns errors correctly", func(t *testing.T) { - t.Run("should error when query doesn't return a stream token", func(t *testing.T) { + t.Run("should error when query doesn't return an event source", func(t *testing.T) { query, queryErr := fauna.FQL(`42`, nil) require.NoError(t, queryErr) @@ -29,30 +29,48 @@ func TestEventFeed(t *testing.T) { }) t.Run("can use event feeds from a query", func(t *testing.T) { - query, queryErr := fauna.FQL(`EventFeedTest.all().toStream()`, nil) - require.NoError(t, queryErr, "failed to create a query for stream token") + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") feed, feedErr := client.FeedFromQuery(query) require.NoError(t, feedErr, "failed to init events feed") + var ( + start = 5 + end = 20 + ) + createOne(t, client, feed) - createMultipleDocs(t, client, feed, 5, 20) + createMultipleDocs(t, client, start, end) + + eventsRes, eventsErr := feed.Events() + require.NoError(t, eventsErr, "failed to get events from EventSource") + require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events") }) t.Run("can get events from EventSource", func(t *testing.T) { t.Run("can get an EventSource", func(t *testing.T) { - streamToken := getEventSource(t, client) - require.NotNil(t, streamToken, "failed to get stream token") + eventSource := getEventSource(t, client) + require.NotNil(t, eventSource, "failed to get an EventSource") }) t.Run("get events from an EventSource", func(t *testing.T) { - streamToken := getEventSource(t, client) + eventSource := getEventSource(t, client) - feed, feedErr := client.Feed(streamToken) + feed, feedErr := client.Feed(eventSource) require.NoError(t, feedErr, "failed to init events feed") + var ( + start = 5 + end = 20 + ) + createOne(t, client, feed) - createMultipleDocs(t, client, feed, 5, 20) + createMultipleDocs(t, client, start, end) + + eventsRes, eventsErr := feed.Events() + require.NoError(t, eventsErr, "failed to get events from EventSource") + require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events") }) }) @@ -61,20 +79,20 @@ func TestEventFeed(t *testing.T) { createOne(t, client, nil) - streamToken := getEventSource(t, client) - require.NotNil(t, streamToken, "failed to get stream token") + eventSource := getEventSource(t, client) + require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr := client.Feed(streamToken) + feed, feedErr := client.Feed(eventSource) require.NoError(t, feedErr, "failed to init events feed") eventsRes, eventsErr := feed.Events() require.NoError(t, eventsErr, "failed to get events") require.Equal(t, 0, len(eventsRes.Events), "unexpected number of events") - streamToken = getEventSource(t, client) - require.NotNil(t, streamToken, "failed to get stream token") + eventSource = getEventSource(t, client) + require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr = client.Feed(streamToken, fauna.EventFeedStartTime(time.Now().Add(-time.Minute*10).UnixMicro())) + feed, feedErr = client.Feed(eventSource, fauna.FeedStartFn(time.Now().Add(-time.Minute*10).UnixMicro())) require.NoError(t, feedErr, "failed to init events feed") feedRes, eventsErr := feed.Events() @@ -97,17 +115,17 @@ Collection.create({ name: "EventFeedTest" })`, nil) func getEventSource(t *testing.T, client *fauna.Client) fauna.EventSource { t.Helper() - query, queryErr := fauna.FQL(`EventFeedTest.all().toStream()`, nil) - require.NoError(t, queryErr, "failed to create a query for stream token") + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") - streamRes, streamResErr := client.Query(query) - require.NoError(t, streamResErr, "failed to init events feed") + feedRes, feedResErr := client.Query(query) + require.NoError(t, feedResErr, "failed to init events feed") var eventSource fauna.EventSource - unmarshalErr := streamRes.Unmarshal(&eventSource) - require.NoError(t, unmarshalErr, "failed to unmarshal stream token") - require.NotNil(t, eventSource, "stream token is nil") - require.NotEmpty(t, eventSource, "stream token is empty") + unmarshalErr := feedRes.Unmarshal(&eventSource) + require.NoError(t, unmarshalErr, "failed to unmarshal EventSource") + require.NotNil(t, eventSource, "event source is nil") + require.NotEmpty(t, eventSource, "event source is empty") return eventSource } @@ -132,7 +150,7 @@ func createOne(t *testing.T, client *fauna.Client, feed *fauna.EventFeed) { assert.Equal(t, 1, len(eventsRes.Events), "unexpected number of events") } -func createMultipleDocs(t *testing.T, client *fauna.Client, feed *fauna.EventFeed, start int, end int) { +func createMultipleDocs(t *testing.T, client *fauna.Client, start int, end int) { t.Helper() query, queryErr := fauna.FQL(`Set.sequence(${start}, ${end}).forEach(n => EventFeedTest.create({ n: n }))`, map[string]any{ @@ -143,8 +161,4 @@ func createMultipleDocs(t *testing.T, client *fauna.Client, feed *fauna.EventFee _, err := client.Query(query) require.NoError(t, err) - - eventsRes, eventsErr := feed.Events() - require.NoError(t, eventsErr, "failed to get events from EventSource") - require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events") } diff --git a/request.go b/request.go index f212a14..97d7f12 100644 --- a/request.go +++ b/request.go @@ -182,7 +182,7 @@ func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) type feedRequest struct { apiRequest - Stream EventSource + Source EventSource StartTS int64 Cursor string } diff --git a/serializer.go b/serializer.go index 38ffaa9..49e22b0 100644 --- a/serializer.go +++ b/serializer.go @@ -524,7 +524,7 @@ func encode(v any, hint string) (any, error) { return out, nil case feedRequest: - out := map[string]any{"token": string(vt.Stream)} + out := map[string]any{"token": string(vt.Source)} if vt.StartTS > 0 { out["start_ts"] = vt.StartTS } diff --git a/serializer_test.go b/serializer_test.go index 56f0987..f14beec 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -655,7 +655,7 @@ func TestMarshalEventSourceStructs(t *testing.T) { Context: context.Background(), Headers: map[string]string{}, }, - Stream: "", + Source: "", StartTS: 0, Cursor: "", }) From cb6cfe91712a459499d3a51eded86465499a408f Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Tue, 29 Oct 2024 12:46:12 -0400 Subject: [PATCH 18/24] feed options as methods --- client.go | 48 ++++++++++++++++++++++++++++++++++------------ config.go | 10 +++++++--- event_feed.go | 6 +++--- event_feed_test.go | 2 +- 4 files changed, 47 insertions(+), 19 deletions(-) diff --git a/client.go b/client.go index c974f34..21f2165 100644 --- a/client.go +++ b/client.go @@ -428,8 +428,7 @@ func (c *Client) setHeader(key, val string) { c.headers[key] = val } -// FeedFromQuery opens an event feed from the event source returned by the [fauna.Query]. -func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, error) { +func (c *Client) getEventSource(fql *Query, opts ...QueryOptFn) (*EventSource, error) { res, err := c.Query(fql, opts...) if err != nil { return nil, err @@ -440,25 +439,50 @@ func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, erro return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) } - return newEventFeed(c, token) + return &token, nil } -// FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] from the given start time -func (c *Client) FeedFromQueryWithStartTime(fql *Query, startTime time.Time, opts ...QueryOptFn) (*EventFeed, error) { - res, err := c.Query(fql, opts...) +// FeedFromQuery opens an event feed from the event source returned by the [fauna.Query]. +func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, error) { + token, err := c.getEventSource(fql, opts...) if err != nil { return nil, err } - token, ok := res.Data.(EventSource) - if !ok { - return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) + return newEventFeed(c, *token) +} + +// FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] with custom options +func (c *Client) FeedFromQueryWithStartTime(fql *Query, start FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) { + token, err := c.getEventSource(fql, opts...) + if err != nil { + return nil, err } - return newEventFeed(c, token, FeedStartFn(startTime.UnixMicro())) + return newEventFeed(c, *token, start) +} + +// FeedFromQueryWithCursor initiates an event from the event source returned by the [fauna.Query] with custom options +func (c *Client) FeedFromQueryWithCursor(fql *Query, cursor FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) { + token, err := c.getEventSource(fql, opts...) + if err != nil { + return nil, err + } + + return newEventFeed(c, *token, cursor) } // Feed opens an event feed from the event source -func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) { - return newEventFeed(c, stream, opts...) +func (c *Client) Feed(stream EventSource) (*EventFeed, error) { + return newEventFeed(c, stream) +} + +// FeedWithStartTime opens an event feed from the event source with options +func (c *Client) FeedWithStartTime(stream EventSource, start FeedStartFn) (*EventFeed, error) { + return newEventFeed(c, stream, start) +} + +// FeedWithCursor opens an event feed from the event source with options +func (c *Client) FeedWithCursor(stream EventSource, cursor FeedStartFn) (*EventFeed, error) { + return newEventFeed(c, stream, cursor) } diff --git a/config.go b/config.go index 74fca53..2ef7de8 100644 --- a/config.go +++ b/config.go @@ -168,9 +168,13 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { return strings.ReplaceAll(params.Encode(), "&", ",") } -// FeedOptFn function to set options on the [Client.NewEventFeed] -type FeedOptFn func(req *feedRequest) +// FeedStartFn function to set options on the [Client.NewEventFeed] +type FeedStartFn func(req *feedRequest) -func FeedStartFn(ts int64) FeedOptFn { +func EventFeedCursor(cursor string) FeedStartFn { + return func(req *feedRequest) { req.Cursor = cursor } +} + +func EventFeedStartTime(ts int64) FeedStartFn { return func(req *feedRequest) { req.StartTS = ts } } diff --git a/event_feed.go b/event_feed.go index bb8169c..a588a62 100644 --- a/event_feed.go +++ b/event_feed.go @@ -9,14 +9,14 @@ type EventFeed struct { client *Client source EventSource - opts []FeedOptFn + opts []FeedStartFn decoder *json.Decoder lastCursor string } -func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*EventFeed, error) { +func newEventFeed(client *Client, source EventSource, opts ...FeedStartFn) (*EventFeed, error) { feed := &EventFeed{ client: client, source: source, @@ -30,7 +30,7 @@ func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*Event return feed, nil } -func (ef *EventFeed) open(opts ...FeedOptFn) error { +func (ef *EventFeed) open(opts ...FeedStartFn) error { req := feedRequest{ apiRequest: apiRequest{ ef.client.ctx, diff --git a/event_feed_test.go b/event_feed_test.go index ed80f71..fe4ba46 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -92,7 +92,7 @@ func TestEventFeed(t *testing.T) { eventSource = getEventSource(t, client) require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr = client.Feed(eventSource, fauna.FeedStartFn(time.Now().Add(-time.Minute*10).UnixMicro())) + feed, feedErr = client.FeedWithStartTime(eventSource, fauna.EventFeedStartTime(time.Now().Add(-time.Minute*10).UnixMicro())) require.NoError(t, feedErr, "failed to init events feed") feedRes, eventsErr := feed.Events() From 1d6a89ab74b68981c5739965ea5e8d5f397d74b5 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Tue, 29 Oct 2024 13:22:06 -0400 Subject: [PATCH 19/24] PR feedback --- client.go | 22 ++++++---------------- event_feed_test.go | 2 +- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/client.go b/client.go index 21f2165..9d65f02 100644 --- a/client.go +++ b/client.go @@ -453,23 +453,13 @@ func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, erro } // FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] with custom options -func (c *Client) FeedFromQueryWithStartTime(fql *Query, start FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) { +func (c *Client) FeedFromQueryWithStartTime(fql *Query, time time.Time, opts ...QueryOptFn) (*EventFeed, error) { token, err := c.getEventSource(fql, opts...) if err != nil { return nil, err } - return newEventFeed(c, *token, start) -} - -// FeedFromQueryWithCursor initiates an event from the event source returned by the [fauna.Query] with custom options -func (c *Client) FeedFromQueryWithCursor(fql *Query, cursor FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) { - token, err := c.getEventSource(fql, opts...) - if err != nil { - return nil, err - } - - return newEventFeed(c, *token, cursor) + return newEventFeed(c, *token, EventFeedStartTime(time.UnixMicro())) } // Feed opens an event feed from the event source @@ -478,11 +468,11 @@ func (c *Client) Feed(stream EventSource) (*EventFeed, error) { } // FeedWithStartTime opens an event feed from the event source with options -func (c *Client) FeedWithStartTime(stream EventSource, start FeedStartFn) (*EventFeed, error) { - return newEventFeed(c, stream, start) +func (c *Client) FeedWithStartTime(stream EventSource, start time.Time) (*EventFeed, error) { + return newEventFeed(c, stream, EventFeedStartTime(start.UnixMicro())) } // FeedWithCursor opens an event feed from the event source with options -func (c *Client) FeedWithCursor(stream EventSource, cursor FeedStartFn) (*EventFeed, error) { - return newEventFeed(c, stream, cursor) +func (c *Client) FeedWithCursor(stream EventSource, cursor string) (*EventFeed, error) { + return newEventFeed(c, stream, EventFeedCursor(cursor)) } diff --git a/event_feed_test.go b/event_feed_test.go index fe4ba46..0ef8e94 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -92,7 +92,7 @@ func TestEventFeed(t *testing.T) { eventSource = getEventSource(t, client) require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr = client.FeedWithStartTime(eventSource, fauna.EventFeedStartTime(time.Now().Add(-time.Minute*10).UnixMicro())) + feed, feedErr = client.FeedWithStartTime(eventSource, time.Now().Add(-time.Minute*10)) require.NoError(t, feedErr, "failed to init events feed") feedRes, eventsErr := feed.Events() From 9d54eae6cbb64295481f2bb6b7d41037be903a33 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Thu, 31 Oct 2024 14:39:35 -0400 Subject: [PATCH 20/24] use feed args --- client.go | 57 ++++++++++------------------ config.go | 11 ------ event_feed.go | 62 +++++++++++++++++------------- event_feed_example_test.go | 13 ++++--- event_feed_test.go | 77 ++++++++++++++++++++++++++++++-------- request.go | 7 ++-- serializer.go | 7 +++- serializer_test.go | 7 ++-- 8 files changed, 136 insertions(+), 105 deletions(-) diff --git a/client.go b/client.go index 9d65f02..deb44dd 100644 --- a/client.go +++ b/client.go @@ -428,51 +428,32 @@ func (c *Client) setHeader(key, val string) { c.headers[key] = val } -func (c *Client) getEventSource(fql *Query, opts ...QueryOptFn) (*EventSource, error) { - res, err := c.Query(fql, opts...) - if err != nil { - return nil, err - } - - token, ok := res.Data.(EventSource) - if !ok { - return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) - } - - return &token, nil +// FeedArgs optional arguments for [fauna.Client.Feed] +type FeedArgs struct { + // PageSize number of events to return per page + PageSize *int + // StartTs incompatible with Cursor + StartTs *time.Time + // Cursor incompatible with StartTs + Cursor *string } -// FeedFromQuery opens an event feed from the event source returned by the [fauna.Query]. -func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, error) { - token, err := c.getEventSource(fql, opts...) - if err != nil { - return nil, err - } - - return newEventFeed(c, *token) +// Feed opens an event feed from the event source +func (c *Client) Feed(stream EventSource, feedArgs *FeedArgs) (*EventFeed, error) { + return newEventFeed(c, stream, feedArgs) } -// FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] with custom options -func (c *Client) FeedFromQueryWithStartTime(fql *Query, time time.Time, opts ...QueryOptFn) (*EventFeed, error) { - token, err := c.getEventSource(fql, opts...) +// FeedFromQuery opens an event feed from a query +func (c *Client) FeedFromQuery(query *Query, feedArgs *FeedArgs) (*EventFeed, error) { + res, err := c.Query(query) if err != nil { return nil, err } - return newEventFeed(c, *token, EventFeedStartTime(time.UnixMicro())) -} - -// Feed opens an event feed from the event source -func (c *Client) Feed(stream EventSource) (*EventFeed, error) { - return newEventFeed(c, stream) -} - -// FeedWithStartTime opens an event feed from the event source with options -func (c *Client) FeedWithStartTime(stream EventSource, start time.Time) (*EventFeed, error) { - return newEventFeed(c, stream, EventFeedStartTime(start.UnixMicro())) -} + eventSource, ok := res.Data.(EventSource) + if !ok { + return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) + } -// FeedWithCursor opens an event feed from the event source with options -func (c *Client) FeedWithCursor(stream EventSource, cursor string) (*EventFeed, error) { - return newEventFeed(c, stream, EventFeedCursor(cursor)) + return newEventFeed(c, eventSource, feedArgs) } diff --git a/config.go b/config.go index 2ef7de8..5f557ce 100644 --- a/config.go +++ b/config.go @@ -167,14 +167,3 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { return strings.ReplaceAll(params.Encode(), "&", ",") } - -// FeedStartFn function to set options on the [Client.NewEventFeed] -type FeedStartFn func(req *feedRequest) - -func EventFeedCursor(cursor string) FeedStartFn { - return func(req *feedRequest) { req.Cursor = cursor } -} - -func EventFeedStartTime(ts int64) FeedStartFn { - return func(req *feedRequest) { req.StartTS = ts } -} diff --git a/event_feed.go b/event_feed.go index a588a62..3b57c34 100644 --- a/event_feed.go +++ b/event_feed.go @@ -2,6 +2,7 @@ package fauna import ( "encoding/json" + "fmt" ) // EventFeed represents an event feed subscription. @@ -9,43 +10,53 @@ type EventFeed struct { client *Client source EventSource - opts []FeedStartFn decoder *json.Decoder - lastCursor string + lastCursor *string + pageSize *int + startTs *int64 } -func newEventFeed(client *Client, source EventSource, opts ...FeedStartFn) (*EventFeed, error) { +func newEventFeed(client *Client, source EventSource, args *FeedArgs) (*EventFeed, error) { feed := &EventFeed{ client: client, source: source, - opts: opts, } - if err := feed.open(opts...); err != nil { + if args != nil { + if args.StartTs != nil && args.Cursor != nil { + return nil, fmt.Errorf("StartTs and Cursor cannot be used simultaneously") + } + if args.Cursor != nil { + feed.lastCursor = args.Cursor + } + + if args.StartTs != nil { + unixTime := args.StartTs.UnixMicro() + feed.startTs = &unixTime + } + + feed.pageSize = args.PageSize + } + + if err := feed.open(); err != nil { return nil, err } return feed, nil } -func (ef *EventFeed) open(opts ...FeedStartFn) error { +func (ef *EventFeed) open() error { req := feedRequest{ apiRequest: apiRequest{ ef.client.ctx, ef.client.headers, }, - Source: ef.source, - Cursor: ef.lastCursor, - } - - if (opts != nil) && (len(opts) > 0) { - ef.opts = append(ef.opts, opts...) - } - - for _, optFn := range ef.opts { - optFn(&req) + Source: ef.source, + Cursor: ef.lastCursor, + PageSize: ef.pageSize, + StartTS: ef.startTs, } byteStream, err := req.do(ef.client) @@ -58,26 +69,25 @@ func (ef *EventFeed) open(opts ...FeedStartFn) error { return nil } -// FeedResponse represents the response from the EventFeed.Events -type FeedResponse struct { +// FeedPage represents the response from the EventFeed.Events +type FeedPage struct { Events []Event `json:"events"` Cursor string `json:"cursor"` HasNext bool `json:"has_next"` Stats Stats `json:"stats"` } -// Events return the next FeedResponse from the EventFeed -func (ef *EventFeed) Events() (*FeedResponse, error) { - var response FeedResponse +// Next retrieves the next FeedPage from the EventFeed +func (ef *EventFeed) Next(page *FeedPage) error { if err := ef.open(); err != nil { - return nil, err + return err } - if err := ef.decoder.Decode(&response); err != nil { - return nil, err + if err := ef.decoder.Decode(&page); err != nil { + return err } - ef.lastCursor = response.Cursor + ef.lastCursor = &page.Cursor - return &response, nil + return nil } diff --git a/event_feed_example_test.go b/event_feed_example_test.go index ff6dc4c..f8c1667 100644 --- a/event_feed_example_test.go +++ b/event_feed_example_test.go @@ -7,17 +7,17 @@ import ( "github.com/fauna/fauna-go/v3" ) -func ExampleEventFeed_Events() { +func ExampleEventFeed_Next() { client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal)) query, queryErr := fauna.FQL(`Collection.byName("EventFeedTest")?.delete() Collection.create({ name: "EventFeedTest" }) -EventFeedTest.all().toStream()`, nil) +EventFeedTest.all().eventSource()`, nil) if queryErr != nil { log.Fatal(queryErr.Error()) } - feed, feedErr := client.FeedFromQuery(query) + feed, feedErr := client.FeedFromQuery(query, nil) if feedErr != nil { log.Fatal(feedErr.Error()) } @@ -29,16 +29,17 @@ EventFeedTest.all().toStream()`, nil) } for { - res, eventErr := feed.Events() + var page fauna.FeedPage + eventErr := feed.Next(&page) if eventErr != nil { log.Fatal(eventErr.Error()) } - for _, event := range res.Events { + for _, event := range page.Events { fmt.Println(event.Type) } - if !res.HasNext { + if !page.HasNext { break } } diff --git a/event_feed_test.go b/event_feed_test.go index 0ef8e94..9b267ec 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -19,11 +19,11 @@ func TestEventFeed(t *testing.T) { resetCollection(t, client) t.Run("returns errors correctly", func(t *testing.T) { - t.Run("should error when query doesn't return an event source", func(t *testing.T) { + t.Run("should error when the query doesn't return an event source", func(t *testing.T) { query, queryErr := fauna.FQL(`42`, nil) require.NoError(t, queryErr) - _, feedErr := client.FeedFromQuery(query) + _, feedErr := client.FeedFromQuery(query, nil) require.ErrorContains(t, feedErr, "query should return a fauna.EventSource but got int") }) }) @@ -32,7 +32,7 @@ func TestEventFeed(t *testing.T) { query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) require.NoError(t, queryErr, "failed to create a query for EventSource") - feed, feedErr := client.FeedFromQuery(query) + feed, feedErr := client.FeedFromQuery(query, nil) require.NoError(t, feedErr, "failed to init events feed") var ( @@ -43,9 +43,10 @@ func TestEventFeed(t *testing.T) { createOne(t, client, feed) createMultipleDocs(t, client, start, end) - eventsRes, eventsErr := feed.Events() + var page fauna.FeedPage + eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get events from EventSource") - require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events") + require.Equal(t, end-start, len(page.Events), "unexpected number of events") }) t.Run("can get events from EventSource", func(t *testing.T) { @@ -57,7 +58,7 @@ func TestEventFeed(t *testing.T) { t.Run("get events from an EventSource", func(t *testing.T) { eventSource := getEventSource(t, client) - feed, feedErr := client.Feed(eventSource) + feed, feedErr := client.Feed(eventSource, nil) require.NoError(t, feedErr, "failed to init events feed") var ( @@ -68,9 +69,10 @@ func TestEventFeed(t *testing.T) { createOne(t, client, feed) createMultipleDocs(t, client, start, end) - eventsRes, eventsErr := feed.Events() + var page fauna.FeedPage + eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get events from EventSource") - require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events") + require.Equal(t, end-start, len(page.Events), "unexpected number of events") }) }) @@ -82,22 +84,64 @@ func TestEventFeed(t *testing.T) { eventSource := getEventSource(t, client) require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr := client.Feed(eventSource) + feed, feedErr := client.Feed(eventSource, nil) require.NoError(t, feedErr, "failed to init events feed") - eventsRes, eventsErr := feed.Events() + var page fauna.FeedPage + eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get events") - require.Equal(t, 0, len(eventsRes.Events), "unexpected number of events") + require.Equal(t, 0, len(page.Events), "unexpected number of events") eventSource = getEventSource(t, client) require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr = client.FeedWithStartTime(eventSource, time.Now().Add(-time.Minute*10)) + tenMinutesAgo := time.Now().Add(-10 * time.Minute) + feed, feedErr = client.Feed(eventSource, &fauna.FeedArgs{ + StartTs: &tenMinutesAgo, + }) require.NoError(t, feedErr, "failed to init events feed") - feedRes, eventsErr := feed.Events() + eventsErr = feed.Next(&page) require.NoError(t, eventsErr, "failed to get events") - require.Equal(t, 1, len(feedRes.Events), "unexpected number of events") + require.Equal(t, 1, len(page.Events), "unexpected number of events") + }) + + t.Run("can use page size", func(t *testing.T) { + resetCollection(t, client) + + eventSource := getEventSource(t, client) + + pageSize := 3 + feed, feedErr := client.Feed(eventSource, &fauna.FeedArgs{ + PageSize: &pageSize, + }) + require.NoError(t, feedErr, "failed to init events feed") + + var ( + start = 5 + end = 20 + page fauna.FeedPage + seenEvents int + ) + + createOne(t, client, feed) + createMultipleDocs(t, client, start, end) + + for { + eventsErr := feed.Next(&page) + require.NoError(t, eventsErr, "failed to get events from EventSource") + + seenEvents += len(page.Events) + + if !page.HasNext { + break + } + + // every page but the last should have the right page size + require.Equal(t, pageSize, len(page.Events), "unexpected number of events") + } + + require.Equal(t, end-start, seenEvents, "unexpected number of events") }) } @@ -144,10 +188,11 @@ func createOne(t *testing.T, client *fauna.Client, feed *fauna.EventFeed) { return } - eventsRes, eventsErr := feed.Events() + var page fauna.FeedPage + eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get events") - assert.Equal(t, 1, len(eventsRes.Events), "unexpected number of events") + assert.Equal(t, 1, len(page.Events), "unexpected number of events") } func createMultipleDocs(t *testing.T, client *fauna.Client, start int, end int) { diff --git a/request.go b/request.go index 97d7f12..ff4c0b3 100644 --- a/request.go +++ b/request.go @@ -182,9 +182,10 @@ func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) type feedRequest struct { apiRequest - Source EventSource - StartTS int64 - Cursor string + Source EventSource + Cursor *string `json:"cursor,omitempty"` + StartTS *int64 `json:"start_ts,omitempty"` + PageSize *int `json:"page_size,omitempty"` } func (feedReq *feedRequest) do(cli *Client) (io.ReadCloser, error) { diff --git a/serializer.go b/serializer.go index 49e22b0..64103d4 100644 --- a/serializer.go +++ b/serializer.go @@ -525,10 +525,13 @@ func encode(v any, hint string) (any, error) { case feedRequest: out := map[string]any{"token": string(vt.Source)} - if vt.StartTS > 0 { + if vt.PageSize != nil { + out["page_size"] = vt.PageSize + } + if vt.StartTS != nil { out["start_ts"] = vt.StartTS } - if len(vt.Cursor) > 0 { + if vt.Cursor != nil { out["cursor"] = vt.Cursor } return out, nil diff --git a/serializer_test.go b/serializer_test.go index f14beec..70d9192 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -655,9 +655,10 @@ func TestMarshalEventSourceStructs(t *testing.T) { Context: context.Background(), Headers: map[string]string{}, }, - Source: "", - StartTS: 0, - Cursor: "", + Source: "", + Cursor: nil, + PageSize: nil, + StartTS: nil, }) }) } From 86c451eeebe6057fa1718c04a90c8d8907c73a5d Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Thu, 31 Oct 2024 15:02:38 -0400 Subject: [PATCH 21/24] open should only happen in Next --- event_feed.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/event_feed.go b/event_feed.go index 3b57c34..b0cc5ad 100644 --- a/event_feed.go +++ b/event_feed.go @@ -40,10 +40,6 @@ func newEventFeed(client *Client, source EventSource, args *FeedArgs) (*EventFee feed.pageSize = args.PageSize } - if err := feed.open(); err != nil { - return nil, err - } - return feed, nil } From 9d2f788e521ea6c5214dad14927cbe0c3ccbae09 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Fri, 1 Nov 2024 08:43:25 -0400 Subject: [PATCH 22/24] feed option funcs --- client.go | 18 ++++----------- config.go | 18 +++++++++++++++ event_feed.go | 47 +++++++++++++++----------------------- event_feed_example_test.go | 2 +- event_feed_test.go | 16 +++++-------- request.go | 6 ++--- serializer.go | 6 ++--- serializer_test.go | 6 ++--- 8 files changed, 56 insertions(+), 63 deletions(-) diff --git a/client.go b/client.go index deb44dd..8494472 100644 --- a/client.go +++ b/client.go @@ -428,23 +428,13 @@ func (c *Client) setHeader(key, val string) { c.headers[key] = val } -// FeedArgs optional arguments for [fauna.Client.Feed] -type FeedArgs struct { - // PageSize number of events to return per page - PageSize *int - // StartTs incompatible with Cursor - StartTs *time.Time - // Cursor incompatible with StartTs - Cursor *string -} - // Feed opens an event feed from the event source -func (c *Client) Feed(stream EventSource, feedArgs *FeedArgs) (*EventFeed, error) { - return newEventFeed(c, stream, feedArgs) +func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) { + return newEventFeed(c, stream, opts...) } // FeedFromQuery opens an event feed from a query -func (c *Client) FeedFromQuery(query *Query, feedArgs *FeedArgs) (*EventFeed, error) { +func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, error) { res, err := c.Query(query) if err != nil { return nil, err @@ -455,5 +445,5 @@ func (c *Client) FeedFromQuery(query *Query, feedArgs *FeedArgs) (*EventFeed, er return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) } - return newEventFeed(c, eventSource, feedArgs) + return newEventFeed(c, eventSource, opts...) } diff --git a/config.go b/config.go index 5f557ce..2f72b74 100644 --- a/config.go +++ b/config.go @@ -167,3 +167,21 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { return strings.ReplaceAll(params.Encode(), "&", ",") } + +// FeedOptFn function to set options on the [fauna.EventFeed] +type FeedOptFn func(req *feedRequest) + +// EventFeedCursor set the cursor for the [fauna.EventFeed] +func EventFeedCursor(cursor string) FeedOptFn { + return func(req *feedRequest) { req.Cursor = cursor } +} + +// EventFeedStartTime set the start time for the [fauna.EventFeed] +func EventFeedStartTime(ts int64) FeedOptFn { + return func(req *feedRequest) { req.StartTS = ts } +} + +// EventFeedPageSize set the page size for the [fauna.EventFeed] +func EventFeedPageSize(ts int) FeedOptFn { + return func(req *feedRequest) { req.PageSize = ts } +} diff --git a/event_feed.go b/event_feed.go index b0cc5ad..fbf21ea 100644 --- a/event_feed.go +++ b/event_feed.go @@ -2,7 +2,6 @@ package fauna import ( "encoding/json" - "fmt" ) // EventFeed represents an event feed subscription. @@ -13,46 +12,36 @@ type EventFeed struct { decoder *json.Decoder - lastCursor *string - pageSize *int - startTs *int64 + opts []FeedOptFn + lastCursor string } -func newEventFeed(client *Client, source EventSource, args *FeedArgs) (*EventFeed, error) { +func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*EventFeed, error) { feed := &EventFeed{ client: client, source: source, - } - - if args != nil { - if args.StartTs != nil && args.Cursor != nil { - return nil, fmt.Errorf("StartTs and Cursor cannot be used simultaneously") - } - if args.Cursor != nil { - feed.lastCursor = args.Cursor - } - - if args.StartTs != nil { - unixTime := args.StartTs.UnixMicro() - feed.startTs = &unixTime - } - - feed.pageSize = args.PageSize + opts: opts, } return feed, nil } -func (ef *EventFeed) open() error { +func (ef *EventFeed) open(opts ...FeedOptFn) error { req := feedRequest{ apiRequest: apiRequest{ ef.client.ctx, ef.client.headers, }, - Source: ef.source, - Cursor: ef.lastCursor, - PageSize: ef.pageSize, - StartTS: ef.startTs, + Source: ef.source, + Cursor: ef.lastCursor, + } + + if (opts != nil) && (len(opts) > 0) { + ef.opts = append(ef.opts, opts...) + } + + for _, optFn := range ef.opts { + optFn(&req) } byteStream, err := req.do(ef.client) @@ -65,7 +54,7 @@ func (ef *EventFeed) open() error { return nil } -// FeedPage represents the response from the EventFeed.Events +// FeedPage represents the response from [fauna.EventFeed.Next] type FeedPage struct { Events []Event `json:"events"` Cursor string `json:"cursor"` @@ -73,7 +62,7 @@ type FeedPage struct { Stats Stats `json:"stats"` } -// Next retrieves the next FeedPage from the EventFeed +// Next retrieves the next FeedPage from the [fauna.EventFeed] func (ef *EventFeed) Next(page *FeedPage) error { if err := ef.open(); err != nil { return err @@ -83,7 +72,7 @@ func (ef *EventFeed) Next(page *FeedPage) error { return err } - ef.lastCursor = &page.Cursor + ef.lastCursor = page.Cursor return nil } diff --git a/event_feed_example_test.go b/event_feed_example_test.go index f8c1667..bcc7015 100644 --- a/event_feed_example_test.go +++ b/event_feed_example_test.go @@ -17,7 +17,7 @@ EventFeedTest.all().eventSource()`, nil) log.Fatal(queryErr.Error()) } - feed, feedErr := client.FeedFromQuery(query, nil) + feed, feedErr := client.FeedFromQuery(query) if feedErr != nil { log.Fatal(feedErr.Error()) } diff --git a/event_feed_test.go b/event_feed_test.go index 9b267ec..4036a5a 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -23,7 +23,7 @@ func TestEventFeed(t *testing.T) { query, queryErr := fauna.FQL(`42`, nil) require.NoError(t, queryErr) - _, feedErr := client.FeedFromQuery(query, nil) + _, feedErr := client.FeedFromQuery(query) require.ErrorContains(t, feedErr, "query should return a fauna.EventSource but got int") }) }) @@ -32,7 +32,7 @@ func TestEventFeed(t *testing.T) { query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) require.NoError(t, queryErr, "failed to create a query for EventSource") - feed, feedErr := client.FeedFromQuery(query, nil) + feed, feedErr := client.FeedFromQuery(query) require.NoError(t, feedErr, "failed to init events feed") var ( @@ -58,7 +58,7 @@ func TestEventFeed(t *testing.T) { t.Run("get events from an EventSource", func(t *testing.T) { eventSource := getEventSource(t, client) - feed, feedErr := client.Feed(eventSource, nil) + feed, feedErr := client.Feed(eventSource) require.NoError(t, feedErr, "failed to init events feed") var ( @@ -84,7 +84,7 @@ func TestEventFeed(t *testing.T) { eventSource := getEventSource(t, client) require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr := client.Feed(eventSource, nil) + feed, feedErr := client.Feed(eventSource) require.NoError(t, feedErr, "failed to init events feed") var page fauna.FeedPage @@ -96,9 +96,7 @@ func TestEventFeed(t *testing.T) { require.NotNil(t, eventSource, "failed to get an EventSource") tenMinutesAgo := time.Now().Add(-10 * time.Minute) - feed, feedErr = client.Feed(eventSource, &fauna.FeedArgs{ - StartTs: &tenMinutesAgo, - }) + feed, feedErr = client.Feed(eventSource, fauna.EventFeedStartTime(tenMinutesAgo.UnixMicro())) require.NoError(t, feedErr, "failed to init events feed") eventsErr = feed.Next(&page) @@ -112,9 +110,7 @@ func TestEventFeed(t *testing.T) { eventSource := getEventSource(t, client) pageSize := 3 - feed, feedErr := client.Feed(eventSource, &fauna.FeedArgs{ - PageSize: &pageSize, - }) + feed, feedErr := client.Feed(eventSource, fauna.EventFeedPageSize(pageSize)) require.NoError(t, feedErr, "failed to init events feed") var ( diff --git a/request.go b/request.go index ff4c0b3..4808e8f 100644 --- a/request.go +++ b/request.go @@ -183,9 +183,9 @@ func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) type feedRequest struct { apiRequest Source EventSource - Cursor *string `json:"cursor,omitempty"` - StartTS *int64 `json:"start_ts,omitempty"` - PageSize *int `json:"page_size,omitempty"` + Cursor string + StartTS int64 + PageSize int } func (feedReq *feedRequest) do(cli *Client) (io.ReadCloser, error) { diff --git a/serializer.go b/serializer.go index 64103d4..e32ee57 100644 --- a/serializer.go +++ b/serializer.go @@ -525,13 +525,13 @@ func encode(v any, hint string) (any, error) { case feedRequest: out := map[string]any{"token": string(vt.Source)} - if vt.PageSize != nil { + if vt.PageSize > 0 { out["page_size"] = vt.PageSize } - if vt.StartTS != nil { + if vt.StartTS > 0 { out["start_ts"] = vt.StartTS } - if vt.Cursor != nil { + if len(vt.Cursor) > 0 { out["cursor"] = vt.Cursor } return out, nil diff --git a/serializer_test.go b/serializer_test.go index 70d9192..30ec022 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -656,9 +656,9 @@ func TestMarshalEventSourceStructs(t *testing.T) { Headers: map[string]string{}, }, Source: "", - Cursor: nil, - PageSize: nil, - StartTS: nil, + Cursor: "", + PageSize: 0, + StartTS: 0, }) }) } From dec72aab45421dd546dee6ae3decdf39fa2c9f07 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Fri, 1 Nov 2024 10:14:36 -0400 Subject: [PATCH 23/24] feed option fn validation --- client.go | 4 ++-- config.go | 2 ++ event_feed.go | 26 ++++++++++++++++++++++++-- event_feed_test.go | 23 +++++++++++++++++++++++ 4 files changed, 51 insertions(+), 4 deletions(-) diff --git a/client.go b/client.go index 8494472..016b73d 100644 --- a/client.go +++ b/client.go @@ -430,7 +430,7 @@ func (c *Client) setHeader(key, val string) { // Feed opens an event feed from the event source func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) { - return newEventFeed(c, stream, opts...) + return newEventFeed(c, stream, false, opts...) } // FeedFromQuery opens an event feed from a query @@ -445,5 +445,5 @@ func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, err return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) } - return newEventFeed(c, eventSource, opts...) + return newEventFeed(c, eventSource, true, opts...) } diff --git a/config.go b/config.go index 2f72b74..faa617d 100644 --- a/config.go +++ b/config.go @@ -172,11 +172,13 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { type FeedOptFn func(req *feedRequest) // EventFeedCursor set the cursor for the [fauna.EventFeed] +// cannot be used with [EventFeedStartTime] or in [fauna.Client.FeedFromQuery] func EventFeedCursor(cursor string) FeedOptFn { return func(req *feedRequest) { req.Cursor = cursor } } // EventFeedStartTime set the start time for the [fauna.EventFeed] +// cannot be used with [EventFeedCursor] func EventFeedStartTime(ts int64) FeedOptFn { return func(req *feedRequest) { req.StartTS = ts } } diff --git a/event_feed.go b/event_feed.go index fbf21ea..93886ef 100644 --- a/event_feed.go +++ b/event_feed.go @@ -2,6 +2,7 @@ package fauna import ( "encoding/json" + "fmt" ) // EventFeed represents an event feed subscription. @@ -16,17 +17,29 @@ type EventFeed struct { lastCursor string } -func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*EventFeed, error) { +func newEventFeed(client *Client, source EventSource, fromQuery bool, opts ...FeedOptFn) (*EventFeed, error) { feed := &EventFeed{ client: client, source: source, opts: opts, } + // init a feed request to validate feed options + req, err := feed.newFeedRequest() + if err != nil { + return nil, err + } + if fromQuery && len(req.Cursor) > 0 { + return nil, fmt.Errorf("cannot use EventFeedCursor with FeedFromQuery") + } + if req.StartTS > 0 && len(req.Cursor) > 0 { + return nil, fmt.Errorf("cannot set both EventFeedStartTime and EventFeedCursor") + } + return feed, nil } -func (ef *EventFeed) open(opts ...FeedOptFn) error { +func (ef *EventFeed) newFeedRequest(opts ...FeedOptFn) (*feedRequest, error) { req := feedRequest{ apiRequest: apiRequest{ ef.client.ctx, @@ -44,6 +57,15 @@ func (ef *EventFeed) open(opts ...FeedOptFn) error { optFn(&req) } + return &req, nil +} + +func (ef *EventFeed) open(opts ...FeedOptFn) error { + req, err := ef.newFeedRequest(opts...) + if err != nil { + return err + } + byteStream, err := req.do(ef.client) if err != nil { return err diff --git a/event_feed_test.go b/event_feed_test.go index 4036a5a..1aadd25 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -26,6 +26,29 @@ func TestEventFeed(t *testing.T) { _, feedErr := client.FeedFromQuery(query) require.ErrorContains(t, feedErr, "query should return a fauna.EventSource but got int") }) + + t.Run("should error when attempting to use a cursor with a query", func(t *testing.T) { + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") + + _, feedErr := client.FeedFromQuery(query, fauna.EventFeedCursor("cursor")) + require.ErrorContains(t, feedErr, "cannot use EventFeedCursor with FeedFromQuery") + }) + + t.Run("should error when attempting to use a start time and a cursor", func(t *testing.T) { + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") + + req, reqErr := client.Query(query) + require.NoError(t, reqErr, "failed to execute query") + + var response fauna.EventSource + unmarshalErr := req.Unmarshal(&response) + require.NoError(t, unmarshalErr, "failed to unmarshal EventSource") + + _, feedErr := client.Feed(response, fauna.EventFeedStartTime(time.Now().UnixMicro()), fauna.EventFeedCursor("cursor")) + require.ErrorContains(t, feedErr, "cannot set both EventFeedStartTime and EventFeedCursor") + }) }) t.Run("can use event feeds from a query", func(t *testing.T) { From 153807c890f3657e5dc3126b8fd144696b6cf233 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Fri, 1 Nov 2024 14:01:15 -0400 Subject: [PATCH 24/24] feedOpts --- client.go | 30 ++++++++++++++++++++++++++++-- config.go | 8 ++++---- event_feed.go | 42 ++++++++++++++++++------------------------ event_feed_test.go | 2 +- 4 files changed, 51 insertions(+), 31 deletions(-) diff --git a/client.go b/client.go index 016b73d..db8dc95 100644 --- a/client.go +++ b/client.go @@ -430,11 +430,24 @@ func (c *Client) setHeader(key, val string) { // Feed opens an event feed from the event source func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) { - return newEventFeed(c, stream, false, opts...) + feedOpts, err := parseFeedOptions(opts...) + if err != nil { + return nil, err + } + + return newEventFeed(c, stream, feedOpts) } // FeedFromQuery opens an event feed from a query func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, error) { + feedOpts, err := parseFeedOptions(opts...) + if err != nil { + return nil, err + } + if feedOpts.Cursor != nil { + return nil, fmt.Errorf("cannot use EventFeedCursor with FeedFromQuery") + } + res, err := c.Query(query) if err != nil { return nil, err @@ -445,5 +458,18 @@ func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, err return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) } - return newEventFeed(c, eventSource, true, opts...) + return newEventFeed(c, eventSource, feedOpts) +} + +func parseFeedOptions(opts ...FeedOptFn) (*feedOptions, error) { + feedOpts := feedOptions{} + for _, optFn := range opts { + optFn(&feedOpts) + } + + if feedOpts.StartTS != nil && feedOpts.Cursor != nil { + return nil, fmt.Errorf("cannot use EventFeedStartTime and EventFeedCursor together") + } + + return &feedOpts, nil } diff --git a/config.go b/config.go index faa617d..2764596 100644 --- a/config.go +++ b/config.go @@ -169,21 +169,21 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { } // FeedOptFn function to set options on the [fauna.EventFeed] -type FeedOptFn func(req *feedRequest) +type FeedOptFn func(req *feedOptions) // EventFeedCursor set the cursor for the [fauna.EventFeed] // cannot be used with [EventFeedStartTime] or in [fauna.Client.FeedFromQuery] func EventFeedCursor(cursor string) FeedOptFn { - return func(req *feedRequest) { req.Cursor = cursor } + return func(req *feedOptions) { req.Cursor = &cursor } } // EventFeedStartTime set the start time for the [fauna.EventFeed] // cannot be used with [EventFeedCursor] func EventFeedStartTime(ts int64) FeedOptFn { - return func(req *feedRequest) { req.StartTS = ts } + return func(req *feedOptions) { req.StartTS = &ts } } // EventFeedPageSize set the page size for the [fauna.EventFeed] func EventFeedPageSize(ts int) FeedOptFn { - return func(req *feedRequest) { req.PageSize = ts } + return func(req *feedOptions) { req.PageSize = &ts } } diff --git a/event_feed.go b/event_feed.go index 93886ef..0f17139 100644 --- a/event_feed.go +++ b/event_feed.go @@ -2,7 +2,6 @@ package fauna import ( "encoding/json" - "fmt" ) // EventFeed represents an event feed subscription. @@ -13,33 +12,27 @@ type EventFeed struct { decoder *json.Decoder - opts []FeedOptFn + opts *feedOptions lastCursor string } -func newEventFeed(client *Client, source EventSource, fromQuery bool, opts ...FeedOptFn) (*EventFeed, error) { +type feedOptions struct { + PageSize *int + Cursor *string + StartTS *int64 +} + +func newEventFeed(client *Client, source EventSource, opts *feedOptions) (*EventFeed, error) { feed := &EventFeed{ client: client, source: source, opts: opts, } - // init a feed request to validate feed options - req, err := feed.newFeedRequest() - if err != nil { - return nil, err - } - if fromQuery && len(req.Cursor) > 0 { - return nil, fmt.Errorf("cannot use EventFeedCursor with FeedFromQuery") - } - if req.StartTS > 0 && len(req.Cursor) > 0 { - return nil, fmt.Errorf("cannot set both EventFeedStartTime and EventFeedCursor") - } - return feed, nil } -func (ef *EventFeed) newFeedRequest(opts ...FeedOptFn) (*feedRequest, error) { +func (ef *EventFeed) newFeedRequest() (*feedRequest, error) { req := feedRequest{ apiRequest: apiRequest{ ef.client.ctx, @@ -48,20 +41,21 @@ func (ef *EventFeed) newFeedRequest(opts ...FeedOptFn) (*feedRequest, error) { Source: ef.source, Cursor: ef.lastCursor, } - - if (opts != nil) && (len(opts) > 0) { - ef.opts = append(ef.opts, opts...) + if ef.opts.StartTS != nil { + req.StartTS = *ef.opts.StartTS } - - for _, optFn := range ef.opts { - optFn(&req) + if ef.opts.Cursor != nil { + req.Cursor = *ef.opts.Cursor + } + if ef.opts.PageSize != nil { + req.PageSize = *ef.opts.PageSize } return &req, nil } -func (ef *EventFeed) open(opts ...FeedOptFn) error { - req, err := ef.newFeedRequest(opts...) +func (ef *EventFeed) open() error { + req, err := ef.newFeedRequest() if err != nil { return err } diff --git a/event_feed_test.go b/event_feed_test.go index 1aadd25..ddec9d6 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -47,7 +47,7 @@ func TestEventFeed(t *testing.T) { require.NoError(t, unmarshalErr, "failed to unmarshal EventSource") _, feedErr := client.Feed(response, fauna.EventFeedStartTime(time.Now().UnixMicro()), fauna.EventFeedCursor("cursor")) - require.ErrorContains(t, feedErr, "cannot set both EventFeedStartTime and EventFeedCursor") + require.ErrorContains(t, feedErr, "cannot use EventFeedStartTime and EventFeedCursor together") }) })