Skip to content

Commit

Permalink
feedOpts
Browse files Browse the repository at this point in the history
  • Loading branch information
cynicaljoy committed Nov 1, 2024
1 parent dec72aa commit 153807c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 31 deletions.
30 changes: 28 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,24 @@ func (c *Client) setHeader(key, val string) {

// Feed opens an event feed from the event source
func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) {
return newEventFeed(c, stream, false, opts...)
feedOpts, err := parseFeedOptions(opts...)
if err != nil {
return nil, err
}

return newEventFeed(c, stream, feedOpts)
}

// FeedFromQuery opens an event feed from a query
func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, error) {
feedOpts, err := parseFeedOptions(opts...)
if err != nil {
return nil, err
}
if feedOpts.Cursor != nil {
return nil, fmt.Errorf("cannot use EventFeedCursor with FeedFromQuery")
}

res, err := c.Query(query)
if err != nil {
return nil, err
Expand All @@ -445,5 +458,18 @@ func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, err
return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data)
}

return newEventFeed(c, eventSource, true, opts...)
return newEventFeed(c, eventSource, feedOpts)
}

func parseFeedOptions(opts ...FeedOptFn) (*feedOptions, error) {
feedOpts := feedOptions{}
for _, optFn := range opts {
optFn(&feedOpts)
}

if feedOpts.StartTS != nil && feedOpts.Cursor != nil {
return nil, fmt.Errorf("cannot use EventFeedStartTime and EventFeedCursor together")
}

return &feedOpts, nil
}
8 changes: 4 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,21 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string {
}

// FeedOptFn function to set options on the [fauna.EventFeed]
type FeedOptFn func(req *feedRequest)
type FeedOptFn func(req *feedOptions)

// EventFeedCursor set the cursor for the [fauna.EventFeed]
// cannot be used with [EventFeedStartTime] or in [fauna.Client.FeedFromQuery]
func EventFeedCursor(cursor string) FeedOptFn {
return func(req *feedRequest) { req.Cursor = cursor }
return func(req *feedOptions) { req.Cursor = &cursor }
}

// EventFeedStartTime set the start time for the [fauna.EventFeed]
// cannot be used with [EventFeedCursor]
func EventFeedStartTime(ts int64) FeedOptFn {
return func(req *feedRequest) { req.StartTS = ts }
return func(req *feedOptions) { req.StartTS = &ts }
}

// EventFeedPageSize set the page size for the [fauna.EventFeed]
func EventFeedPageSize(ts int) FeedOptFn {
return func(req *feedRequest) { req.PageSize = ts }
return func(req *feedOptions) { req.PageSize = &ts }
}
42 changes: 18 additions & 24 deletions event_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fauna

import (
"encoding/json"
"fmt"
)

// EventFeed represents an event feed subscription.
Expand All @@ -13,33 +12,27 @@ type EventFeed struct {

decoder *json.Decoder

opts []FeedOptFn
opts *feedOptions
lastCursor string
}

func newEventFeed(client *Client, source EventSource, fromQuery bool, opts ...FeedOptFn) (*EventFeed, error) {
type feedOptions struct {
PageSize *int
Cursor *string
StartTS *int64
}

func newEventFeed(client *Client, source EventSource, opts *feedOptions) (*EventFeed, error) {
feed := &EventFeed{
client: client,
source: source,
opts: opts,
}

// init a feed request to validate feed options
req, err := feed.newFeedRequest()
if err != nil {
return nil, err
}
if fromQuery && len(req.Cursor) > 0 {
return nil, fmt.Errorf("cannot use EventFeedCursor with FeedFromQuery")
}
if req.StartTS > 0 && len(req.Cursor) > 0 {
return nil, fmt.Errorf("cannot set both EventFeedStartTime and EventFeedCursor")
}

return feed, nil
}

func (ef *EventFeed) newFeedRequest(opts ...FeedOptFn) (*feedRequest, error) {
func (ef *EventFeed) newFeedRequest() (*feedRequest, error) {
req := feedRequest{
apiRequest: apiRequest{
ef.client.ctx,
Expand All @@ -48,20 +41,21 @@ func (ef *EventFeed) newFeedRequest(opts ...FeedOptFn) (*feedRequest, error) {
Source: ef.source,
Cursor: ef.lastCursor,
}

if (opts != nil) && (len(opts) > 0) {
ef.opts = append(ef.opts, opts...)
if ef.opts.StartTS != nil {
req.StartTS = *ef.opts.StartTS
}

for _, optFn := range ef.opts {
optFn(&req)
if ef.opts.Cursor != nil {
req.Cursor = *ef.opts.Cursor
}
if ef.opts.PageSize != nil {
req.PageSize = *ef.opts.PageSize
}

return &req, nil
}

func (ef *EventFeed) open(opts ...FeedOptFn) error {
req, err := ef.newFeedRequest(opts...)
func (ef *EventFeed) open() error {
req, err := ef.newFeedRequest()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion event_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestEventFeed(t *testing.T) {
require.NoError(t, unmarshalErr, "failed to unmarshal EventSource")

_, feedErr := client.Feed(response, fauna.EventFeedStartTime(time.Now().UnixMicro()), fauna.EventFeedCursor("cursor"))
require.ErrorContains(t, feedErr, "cannot set both EventFeedStartTime and EventFeedCursor")
require.ErrorContains(t, feedErr, "cannot use EventFeedStartTime and EventFeedCursor together")
})
})

Expand Down

0 comments on commit 153807c

Please sign in to comment.