From 274c87788b5abcfb021dc726c51e7a83253f1bed Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Tue, 29 Oct 2024 11:17:36 -0400 Subject: [PATCH] PR feedback --- client.go | 18 ++---------- config.go | 6 +--- event_feed.go | 14 +++++----- event_feed_test.go | 70 +++++++++++++++++++++++++++------------------- request.go | 2 +- serializer.go | 2 +- serializer_test.go | 2 +- 7 files changed, 56 insertions(+), 58 deletions(-) diff --git a/client.go b/client.go index 1190fb7..c974f34 100644 --- a/client.go +++ b/client.go @@ -197,10 +197,6 @@ func (c *Client) parseQueryURL() (*url.URL, error) { } } - if c.queryURL == nil { - return nil, fmt.Errorf("query url is not set") - } - return c.queryURL, nil } @@ -213,10 +209,6 @@ func (c *Client) parseStreamURL() (*url.URL, error) { } } - if c.streamURL == nil { - return nil, fmt.Errorf("stream url is not set") - } - return c.streamURL, nil } @@ -229,10 +221,6 @@ func (c *Client) parseFeedURL() (*url.URL, error) { } } - if c.feedURL == nil { - return nil, fmt.Errorf("feed url is not set") - } - return c.feedURL, nil } @@ -455,8 +443,8 @@ func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, erro return newEventFeed(c, token) } -// FeedFromQueryWithOptions initiates an event from the event source returned by the [fauna.Query] with custom options -func (c *Client) FeedFromQueryWithStart(fql *Query, feedStart FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) { +// FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] from the given start time +func (c *Client) FeedFromQueryWithStartTime(fql *Query, startTime time.Time, opts ...QueryOptFn) (*EventFeed, error) { res, err := c.Query(fql, opts...) if err != nil { return nil, err @@ -467,7 +455,7 @@ func (c *Client) FeedFromQueryWithStart(fql *Query, feedStart FeedStartFn, opts return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) } - return newEventFeed(c, token, feedOpts...) + return newEventFeed(c, token, FeedStartFn(startTime.UnixMicro())) } // Feed opens an event feed from the event source diff --git a/config.go b/config.go index 93f97cc..74fca53 100644 --- a/config.go +++ b/config.go @@ -171,10 +171,6 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { // FeedOptFn function to set options on the [Client.NewEventFeed] type FeedOptFn func(req *feedRequest) -func EventFeedCursor(cursor string) FeedOptFn { - return func(req *feedRequest) { req.Cursor = cursor } -} - -func EventFeedStartTime(ts int64) FeedOptFn { +func FeedStartFn(ts int64) FeedOptFn { return func(req *feedRequest) { req.StartTS = ts } } diff --git a/event_feed.go b/event_feed.go index 49df0c3..bb8169c 100644 --- a/event_feed.go +++ b/event_feed.go @@ -8,7 +8,7 @@ import ( type EventFeed struct { client *Client - stream EventSource + source EventSource opts []FeedOptFn decoder *json.Decoder @@ -16,27 +16,27 @@ type EventFeed struct { lastCursor string } -func newEventFeed(client *Client, token EventSource, opts ...FeedOptFn) (*EventFeed, error) { +func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*EventFeed, error) { feed := &EventFeed{ client: client, - stream: token, + source: source, opts: opts, } - if err := feed.reconnect(opts...); err != nil { + if err := feed.open(opts...); err != nil { return nil, err } return feed, nil } -func (ef *EventFeed) reconnect(opts ...FeedOptFn) error { +func (ef *EventFeed) open(opts ...FeedOptFn) error { req := feedRequest{ apiRequest: apiRequest{ ef.client.ctx, ef.client.headers, }, - Stream: ef.stream, + Source: ef.source, Cursor: ef.lastCursor, } @@ -69,7 +69,7 @@ type FeedResponse struct { // Events return the next FeedResponse from the EventFeed func (ef *EventFeed) Events() (*FeedResponse, error) { var response FeedResponse - if err := ef.reconnect(); err != nil { + if err := ef.open(); err != nil { return nil, err } diff --git a/event_feed_test.go b/event_feed_test.go index 1dd3408..ed80f71 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -19,7 +19,7 @@ func TestEventFeed(t *testing.T) { resetCollection(t, client) t.Run("returns errors correctly", func(t *testing.T) { - t.Run("should error when query doesn't return a stream token", func(t *testing.T) { + t.Run("should error when query doesn't return an event source", func(t *testing.T) { query, queryErr := fauna.FQL(`42`, nil) require.NoError(t, queryErr) @@ -29,30 +29,48 @@ func TestEventFeed(t *testing.T) { }) t.Run("can use event feeds from a query", func(t *testing.T) { - query, queryErr := fauna.FQL(`EventFeedTest.all().toStream()`, nil) - require.NoError(t, queryErr, "failed to create a query for stream token") + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") feed, feedErr := client.FeedFromQuery(query) require.NoError(t, feedErr, "failed to init events feed") + var ( + start = 5 + end = 20 + ) + createOne(t, client, feed) - createMultipleDocs(t, client, feed, 5, 20) + createMultipleDocs(t, client, start, end) + + eventsRes, eventsErr := feed.Events() + require.NoError(t, eventsErr, "failed to get events from EventSource") + require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events") }) t.Run("can get events from EventSource", func(t *testing.T) { t.Run("can get an EventSource", func(t *testing.T) { - streamToken := getEventSource(t, client) - require.NotNil(t, streamToken, "failed to get stream token") + eventSource := getEventSource(t, client) + require.NotNil(t, eventSource, "failed to get an EventSource") }) t.Run("get events from an EventSource", func(t *testing.T) { - streamToken := getEventSource(t, client) + eventSource := getEventSource(t, client) - feed, feedErr := client.Feed(streamToken) + feed, feedErr := client.Feed(eventSource) require.NoError(t, feedErr, "failed to init events feed") + var ( + start = 5 + end = 20 + ) + createOne(t, client, feed) - createMultipleDocs(t, client, feed, 5, 20) + createMultipleDocs(t, client, start, end) + + eventsRes, eventsErr := feed.Events() + require.NoError(t, eventsErr, "failed to get events from EventSource") + require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events") }) }) @@ -61,20 +79,20 @@ func TestEventFeed(t *testing.T) { createOne(t, client, nil) - streamToken := getEventSource(t, client) - require.NotNil(t, streamToken, "failed to get stream token") + eventSource := getEventSource(t, client) + require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr := client.Feed(streamToken) + feed, feedErr := client.Feed(eventSource) require.NoError(t, feedErr, "failed to init events feed") eventsRes, eventsErr := feed.Events() require.NoError(t, eventsErr, "failed to get events") require.Equal(t, 0, len(eventsRes.Events), "unexpected number of events") - streamToken = getEventSource(t, client) - require.NotNil(t, streamToken, "failed to get stream token") + eventSource = getEventSource(t, client) + require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr = client.Feed(streamToken, fauna.EventFeedStartTime(time.Now().Add(-time.Minute*10).UnixMicro())) + feed, feedErr = client.Feed(eventSource, fauna.FeedStartFn(time.Now().Add(-time.Minute*10).UnixMicro())) require.NoError(t, feedErr, "failed to init events feed") feedRes, eventsErr := feed.Events() @@ -97,17 +115,17 @@ Collection.create({ name: "EventFeedTest" })`, nil) func getEventSource(t *testing.T, client *fauna.Client) fauna.EventSource { t.Helper() - query, queryErr := fauna.FQL(`EventFeedTest.all().toStream()`, nil) - require.NoError(t, queryErr, "failed to create a query for stream token") + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") - streamRes, streamResErr := client.Query(query) - require.NoError(t, streamResErr, "failed to init events feed") + feedRes, feedResErr := client.Query(query) + require.NoError(t, feedResErr, "failed to init events feed") var eventSource fauna.EventSource - unmarshalErr := streamRes.Unmarshal(&eventSource) - require.NoError(t, unmarshalErr, "failed to unmarshal stream token") - require.NotNil(t, eventSource, "stream token is nil") - require.NotEmpty(t, eventSource, "stream token is empty") + unmarshalErr := feedRes.Unmarshal(&eventSource) + require.NoError(t, unmarshalErr, "failed to unmarshal EventSource") + require.NotNil(t, eventSource, "event source is nil") + require.NotEmpty(t, eventSource, "event source is empty") return eventSource } @@ -132,7 +150,7 @@ func createOne(t *testing.T, client *fauna.Client, feed *fauna.EventFeed) { assert.Equal(t, 1, len(eventsRes.Events), "unexpected number of events") } -func createMultipleDocs(t *testing.T, client *fauna.Client, feed *fauna.EventFeed, start int, end int) { +func createMultipleDocs(t *testing.T, client *fauna.Client, start int, end int) { t.Helper() query, queryErr := fauna.FQL(`Set.sequence(${start}, ${end}).forEach(n => EventFeedTest.create({ n: n }))`, map[string]any{ @@ -143,8 +161,4 @@ func createMultipleDocs(t *testing.T, client *fauna.Client, feed *fauna.EventFee _, err := client.Query(query) require.NoError(t, err) - - eventsRes, eventsErr := feed.Events() - require.NoError(t, eventsErr, "failed to get events from EventSource") - require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events") } diff --git a/request.go b/request.go index f212a14..97d7f12 100644 --- a/request.go +++ b/request.go @@ -182,7 +182,7 @@ func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) type feedRequest struct { apiRequest - Stream EventSource + Source EventSource StartTS int64 Cursor string } diff --git a/serializer.go b/serializer.go index 38ffaa9..49e22b0 100644 --- a/serializer.go +++ b/serializer.go @@ -524,7 +524,7 @@ func encode(v any, hint string) (any, error) { return out, nil case feedRequest: - out := map[string]any{"token": string(vt.Stream)} + out := map[string]any{"token": string(vt.Source)} if vt.StartTS > 0 { out["start_ts"] = vt.StartTS } diff --git a/serializer_test.go b/serializer_test.go index 56f0987..f14beec 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -655,7 +655,7 @@ func TestMarshalEventSourceStructs(t *testing.T) { Context: context.Background(), Headers: map[string]string{}, }, - Stream: "", + Source: "", StartTS: 0, Cursor: "", })