Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
cynicaljoy committed Oct 29, 2024
1 parent d4af0ad commit 274c877
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 58 deletions.
18 changes: 3 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,6 @@ func (c *Client) parseQueryURL() (*url.URL, error) {
}
}

if c.queryURL == nil {
return nil, fmt.Errorf("query url is not set")
}

return c.queryURL, nil
}

Expand All @@ -213,10 +209,6 @@ func (c *Client) parseStreamURL() (*url.URL, error) {
}
}

if c.streamURL == nil {
return nil, fmt.Errorf("stream url is not set")
}

return c.streamURL, nil
}

Expand All @@ -229,10 +221,6 @@ func (c *Client) parseFeedURL() (*url.URL, error) {
}
}

if c.feedURL == nil {
return nil, fmt.Errorf("feed url is not set")
}

return c.feedURL, nil
}

Expand Down Expand Up @@ -455,8 +443,8 @@ func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, erro
return newEventFeed(c, token)
}

// FeedFromQueryWithOptions initiates an event from the event source returned by the [fauna.Query] with custom options
func (c *Client) FeedFromQueryWithStart(fql *Query, feedStart FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) {
// FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] from the given start time
func (c *Client) FeedFromQueryWithStartTime(fql *Query, startTime time.Time, opts ...QueryOptFn) (*EventFeed, error) {
res, err := c.Query(fql, opts...)
if err != nil {
return nil, err
Expand All @@ -467,7 +455,7 @@ func (c *Client) FeedFromQueryWithStart(fql *Query, feedStart FeedStartFn, opts
return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data)
}

return newEventFeed(c, token, feedOpts...)
return newEventFeed(c, token, FeedStartFn(startTime.UnixMicro()))
}

// Feed opens an event feed from the event source
Expand Down
6 changes: 1 addition & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,6 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string {
// FeedOptFn function to set options on the [Client.NewEventFeed]
type FeedOptFn func(req *feedRequest)

func EventFeedCursor(cursor string) FeedOptFn {
return func(req *feedRequest) { req.Cursor = cursor }
}

func EventFeedStartTime(ts int64) FeedOptFn {
func FeedStartFn(ts int64) FeedOptFn {
return func(req *feedRequest) { req.StartTS = ts }
}
14 changes: 7 additions & 7 deletions event_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,35 @@ import (
type EventFeed struct {
client *Client

stream EventSource
source EventSource
opts []FeedOptFn

decoder *json.Decoder

lastCursor string
}

func newEventFeed(client *Client, token EventSource, opts ...FeedOptFn) (*EventFeed, error) {
func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*EventFeed, error) {
feed := &EventFeed{
client: client,
stream: token,
source: source,
opts: opts,
}

if err := feed.reconnect(opts...); err != nil {
if err := feed.open(opts...); err != nil {
return nil, err
}

return feed, nil
}

func (ef *EventFeed) reconnect(opts ...FeedOptFn) error {
func (ef *EventFeed) open(opts ...FeedOptFn) error {
req := feedRequest{
apiRequest: apiRequest{
ef.client.ctx,
ef.client.headers,
},
Stream: ef.stream,
Source: ef.source,
Cursor: ef.lastCursor,
}

Expand Down Expand Up @@ -69,7 +69,7 @@ type FeedResponse struct {
// Events return the next FeedResponse from the EventFeed
func (ef *EventFeed) Events() (*FeedResponse, error) {
var response FeedResponse
if err := ef.reconnect(); err != nil {
if err := ef.open(); err != nil {
return nil, err
}

Expand Down
70 changes: 42 additions & 28 deletions event_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestEventFeed(t *testing.T) {
resetCollection(t, client)

t.Run("returns errors correctly", func(t *testing.T) {
t.Run("should error when query doesn't return a stream token", func(t *testing.T) {
t.Run("should error when query doesn't return an event source", func(t *testing.T) {
query, queryErr := fauna.FQL(`42`, nil)
require.NoError(t, queryErr)

Expand All @@ -29,30 +29,48 @@ func TestEventFeed(t *testing.T) {
})

t.Run("can use event feeds from a query", func(t *testing.T) {
query, queryErr := fauna.FQL(`EventFeedTest.all().toStream()`, nil)
require.NoError(t, queryErr, "failed to create a query for stream token")
query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil)
require.NoError(t, queryErr, "failed to create a query for EventSource")

feed, feedErr := client.FeedFromQuery(query)
require.NoError(t, feedErr, "failed to init events feed")

var (
start = 5
end = 20
)

createOne(t, client, feed)
createMultipleDocs(t, client, feed, 5, 20)
createMultipleDocs(t, client, start, end)

eventsRes, eventsErr := feed.Events()
require.NoError(t, eventsErr, "failed to get events from EventSource")
require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events")
})

t.Run("can get events from EventSource", func(t *testing.T) {
t.Run("can get an EventSource", func(t *testing.T) {
streamToken := getEventSource(t, client)
require.NotNil(t, streamToken, "failed to get stream token")
eventSource := getEventSource(t, client)
require.NotNil(t, eventSource, "failed to get an EventSource")
})

t.Run("get events from an EventSource", func(t *testing.T) {
streamToken := getEventSource(t, client)
eventSource := getEventSource(t, client)

feed, feedErr := client.Feed(streamToken)
feed, feedErr := client.Feed(eventSource)
require.NoError(t, feedErr, "failed to init events feed")

var (
start = 5
end = 20
)

createOne(t, client, feed)
createMultipleDocs(t, client, feed, 5, 20)
createMultipleDocs(t, client, start, end)

eventsRes, eventsErr := feed.Events()
require.NoError(t, eventsErr, "failed to get events from EventSource")
require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events")
})
})

Expand All @@ -61,20 +79,20 @@ func TestEventFeed(t *testing.T) {

createOne(t, client, nil)

streamToken := getEventSource(t, client)
require.NotNil(t, streamToken, "failed to get stream token")
eventSource := getEventSource(t, client)
require.NotNil(t, eventSource, "failed to get an EventSource")

feed, feedErr := client.Feed(streamToken)
feed, feedErr := client.Feed(eventSource)
require.NoError(t, feedErr, "failed to init events feed")

eventsRes, eventsErr := feed.Events()
require.NoError(t, eventsErr, "failed to get events")
require.Equal(t, 0, len(eventsRes.Events), "unexpected number of events")

streamToken = getEventSource(t, client)
require.NotNil(t, streamToken, "failed to get stream token")
eventSource = getEventSource(t, client)
require.NotNil(t, eventSource, "failed to get an EventSource")

feed, feedErr = client.Feed(streamToken, fauna.EventFeedStartTime(time.Now().Add(-time.Minute*10).UnixMicro()))
feed, feedErr = client.Feed(eventSource, fauna.FeedStartFn(time.Now().Add(-time.Minute*10).UnixMicro()))
require.NoError(t, feedErr, "failed to init events feed")

feedRes, eventsErr := feed.Events()
Expand All @@ -97,17 +115,17 @@ Collection.create({ name: "EventFeedTest" })`, nil)
func getEventSource(t *testing.T, client *fauna.Client) fauna.EventSource {
t.Helper()

query, queryErr := fauna.FQL(`EventFeedTest.all().toStream()`, nil)
require.NoError(t, queryErr, "failed to create a query for stream token")
query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil)
require.NoError(t, queryErr, "failed to create a query for EventSource")

streamRes, streamResErr := client.Query(query)
require.NoError(t, streamResErr, "failed to init events feed")
feedRes, feedResErr := client.Query(query)
require.NoError(t, feedResErr, "failed to init events feed")

var eventSource fauna.EventSource
unmarshalErr := streamRes.Unmarshal(&eventSource)
require.NoError(t, unmarshalErr, "failed to unmarshal stream token")
require.NotNil(t, eventSource, "stream token is nil")
require.NotEmpty(t, eventSource, "stream token is empty")
unmarshalErr := feedRes.Unmarshal(&eventSource)
require.NoError(t, unmarshalErr, "failed to unmarshal EventSource")
require.NotNil(t, eventSource, "event source is nil")
require.NotEmpty(t, eventSource, "event source is empty")

return eventSource
}
Expand All @@ -132,7 +150,7 @@ func createOne(t *testing.T, client *fauna.Client, feed *fauna.EventFeed) {
assert.Equal(t, 1, len(eventsRes.Events), "unexpected number of events")
}

func createMultipleDocs(t *testing.T, client *fauna.Client, feed *fauna.EventFeed, start int, end int) {
func createMultipleDocs(t *testing.T, client *fauna.Client, start int, end int) {
t.Helper()

query, queryErr := fauna.FQL(`Set.sequence(${start}, ${end}).forEach(n => EventFeedTest.create({ n: n }))`, map[string]any{
Expand All @@ -143,8 +161,4 @@ func createMultipleDocs(t *testing.T, client *fauna.Client, feed *fauna.EventFee

_, err := client.Query(query)
require.NoError(t, err)

eventsRes, eventsErr := feed.Events()
require.NoError(t, eventsErr, "failed to get events from EventSource")
require.Equal(t, end-start, len(eventsRes.Events), "unexpected number of events")
}
2 changes: 1 addition & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error)

type feedRequest struct {
apiRequest
Stream EventSource
Source EventSource
StartTS int64
Cursor string
}
Expand Down
2 changes: 1 addition & 1 deletion serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func encode(v any, hint string) (any, error) {
return out, nil

case feedRequest:
out := map[string]any{"token": string(vt.Stream)}
out := map[string]any{"token": string(vt.Source)}
if vt.StartTS > 0 {
out["start_ts"] = vt.StartTS
}
Expand Down
2 changes: 1 addition & 1 deletion serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func TestMarshalEventSourceStructs(t *testing.T) {
Context: context.Background(),
Headers: map[string]string{},
},
Stream: "",
Source: "",
StartTS: 0,
Cursor: "",
})
Expand Down

0 comments on commit 274c877

Please sign in to comment.