-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfeed.go
219 lines (184 loc) · 7.07 KB
/
feed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package sofa
import (
	"bufio"
	"encoding/json"
	"io"
	"net/http"
	"net/url"

	"github.com/google/go-querystring/query"
)
// changesFeedType is the string value sent as the "feed" query parameter to
// select which kind of changes feed the server should provide.
type changesFeedType string

// Feed types understood by this package.
const (
	// FeedPolling selects the "normal" CouchDB feed, which is polled
	// regularly for updates.
	FeedPolling changesFeedType = "normal"

	// FeedLongPolling selects the "longpoll" feed, which uses long-polling to
	// reduce polling frequency and therefore the number of requests made to
	// the server.
	FeedLongPolling changesFeedType = "longpoll"

	// FeedContinuous selects the "continuous" feed, which holds a connection
	// to the server open and receives a stream of events.
	FeedContinuous changesFeedType = "continuous"

	// FeedEventSource selects the CouchDB "eventsource" feed. This type is not
	// yet implemented in this library.
	FeedEventSource changesFeedType = "eventsource"
)
// ChangesFeed describes a changes feed that is advanced one update at a time.
// Each call to Next is given the parameters for that request and yields the
// resulting ChangesFeedUpdate.
//
// PollingChangesFeed provides a Next method with this signature.
// ContinuousChangesFeed.Next takes no arguments and returns a single
// ChangesFeedChange, so it does not satisfy this interface.
type ChangesFeed interface {
	Next(params ChangesFeedParams) (ChangesFeedUpdate, error)
}
// ChangesFeedUpdate is a single update from a changes feed. It is decoded from
// a JSON object with the keys "last_seq", "pending" and "results"; Results
// holds the individual document changes carried by the update.
type ChangesFeedUpdate struct {
	LastSeq AlwaysString        `json:"last_seq"`
	Pending int64               `json:"pending"`
	Results []ChangesFeedChange `json:"results"`
}
// ChangesFeedChange is a single change to a document in the database. One or
// more of these will be included in updates where any changes were actually
// made.
//
// It is decoded from a JSON object with the keys "changes" (a list of objects
// each carrying a "rev"), "deleted", "id" and "seq".
type ChangesFeedChange struct {
	Changes []struct {
		Rev string `json:"rev"`
	} `json:"changes"`
	Deleted bool         `json:"deleted"`
	ID      string       `json:"id"`
	Seq     AlwaysString `json:"seq"`
}
// ChangesFeedParams is the set of operations a changes feed needs from a
// parameter value: the feed can record which feed type it wants, and can then
// obtain the parameters as URL query values.
//
// In this file *ChangesFeedParams1 and *ChangesFeedParams2 implement it.
type ChangesFeedParams interface {
	// Values returns the parameters encoded as URL query values.
	Values() (url.Values, error)

	// SetFeedType records ftype as the feed type to request.
	SetFeedType(ftype string)
}
// ChangesFeedParams1 includes all parameters which can be provided to control
// the output of a changes feed from a database on a version 1 server.
//
// Each field is encoded under the query parameter named in its `url` tag and
// is left out when it holds its empty value. Since is an integer here; the
// version 2/3 counterpart, ChangesFeedParams2, declares it as a string.
type ChangesFeedParams1 struct {
	DocumentIDs            []string         `url:"doc_ids,omitempty"`
	Conflicts              BooleanParameter `url:"conflicts,omitempty"`
	Descending             BooleanParameter `url:"descending,omitempty"`
	Feed                   string           `url:"feed,omitempty"`
	Filter                 string           `url:"filter,omitempty"`
	Heartbeat              int64            `url:"heartbeat,omitempty"`
	IncludeDocs            BooleanParameter `url:"include_docs,omitempty"`
	Attachments            BooleanParameter `url:"attachments,omitempty"`
	AttachmentEncodingInfo BooleanParameter `url:"att_encoding_info,omitempty"`
	LastEventID            int64            `url:"last-event-id,omitempty"`
	Limit                  int64            `url:"limit,omitempty"`
	Since                  int64            `url:"since,omitempty"`
	Style                  string           `url:"style,omitempty"`
	Timeout                int64            `url:"timeout,omitempty"`
	View                   string           `url:"view,omitempty"`
}
// Values encodes the parameters as url.Values. The struct is first encoded
// according to its `url` tags; then, if DocumentIDs is non-nil, the "doc_ids"
// entry is overwritten with the JSON encoding of the whole slice, so that the
// IDs are passed to CouchDB as a single JSON array value.
//
// An error from either the query encoding or the JSON encoding is returned
// unchanged together with nil values.
func (params *ChangesFeedParams1) Values() (url.Values, error) {
	values, err := query.Values(params)
	if err != nil {
		return nil, err
	}

	// Nothing needs special handling when no document IDs were supplied.
	if params.DocumentIDs == nil {
		return values, nil
	}

	encodedIDs, err := json.Marshal(params.DocumentIDs)
	if err != nil {
		return nil, err
	}
	values.Set("doc_ids", string(encodedIDs))

	return values, nil
}
// SetFeedType stores ftype in the Feed field, which is encoded as the "feed"
// query parameter. Any previously set value is replaced.
func (params *ChangesFeedParams1) SetFeedType(ftype string) {
	params.Feed = ftype
}
// ChangesFeedParams2 includes all parameters which can be provided to control
// the output of a changes feed from a database on a version 2/3 server.
//
// Each field is encoded under the query parameter named in its `url` tag and
// is left out when it holds its empty value. Since is a string here; the
// version 1 counterpart, ChangesFeedParams1, declares it as an integer.
type ChangesFeedParams2 struct {
	DocumentIDs            []string         `url:"doc_ids,omitempty"`
	Conflicts              BooleanParameter `url:"conflicts,omitempty"`
	Descending             BooleanParameter `url:"descending,omitempty"`
	Feed                   string           `url:"feed,omitempty"`
	Filter                 string           `url:"filter,omitempty"`
	Heartbeat              int64            `url:"heartbeat,omitempty"`
	IncludeDocs            BooleanParameter `url:"include_docs,omitempty"`
	Attachments            BooleanParameter `url:"attachments,omitempty"`
	AttachmentEncodingInfo BooleanParameter `url:"att_encoding_info,omitempty"`
	LastEventID            int64            `url:"last-event-id,omitempty"`
	Limit                  int64            `url:"limit,omitempty"`
	Since                  string           `url:"since,omitempty"`
	Style                  string           `url:"style,omitempty"`
	Timeout                int64            `url:"timeout,omitempty"`
	View                   string           `url:"view,omitempty"`
}
// ChangesFeedParams3 is an alias for ChangesFeedParams2: both names denote the
// same type, with the same fields and methods.
type ChangesFeedParams3 = ChangesFeedParams2
// Values encodes the parameters as url.Values. The struct is first encoded
// according to its `url` tags; then, if DocumentIDs is non-nil, the "doc_ids"
// entry is overwritten with the JSON encoding of the whole slice, so that the
// IDs are passed to CouchDB as a single JSON array value.
//
// An error from either the query encoding or the JSON encoding is returned
// unchanged together with nil values.
func (params *ChangesFeedParams2) Values() (url.Values, error) {
	values, err := query.Values(params)
	if err != nil {
		return nil, err
	}

	// Nothing needs special handling when no document IDs were supplied.
	if params.DocumentIDs == nil {
		return values, nil
	}

	encodedIDs, err := json.Marshal(params.DocumentIDs)
	if err != nil {
		return nil, err
	}
	values.Set("doc_ids", string(encodedIDs))

	return values, nil
}
// SetFeedType stores ftype in the Feed field, which is encoded as the "feed"
// query parameter. Any previously set value is replaced.
func (params *ChangesFeedParams2) SetFeedType(ftype string) {
	params.Feed = ftype
}
// PollingChangesFeed can be used for either type of changes feed which polls
// the database for information ("normal" and "longpoll").
//
// db is the database whose changes are requested. feedType is the feed type
// that Next applies to the caller's parameters before every request.
type PollingChangesFeed struct {
	db       *Database
	feedType string
}
// Next polls for the next update from the database. This may block until a
// timeout is reached if there are no updates available.
//
// The feed's own feed type is written into params (overwriting whatever the
// caller set) before the parameters are encoded. If encoding fails, a zero
// ChangesFeedUpdate and the encoding error are returned. Otherwise a GET is
// made against the database's "_changes" path and the response is decoded into
// the returned update; an error from that request is returned alongside
// whatever was decoded.
func (f PollingChangesFeed) Next(params ChangesFeedParams) (ChangesFeedUpdate, error) {
	var update ChangesFeedUpdate

	params.SetFeedType(f.feedType)
	values, err := params.Values()
	if err != nil {
		return ChangesFeedUpdate{}, err
	}

	_, reqErr := f.db.con.unmarshalRequest("GET", f.db.ViewPath("_changes"), values, nil, &update)
	return update, reqErr
}
// ContinuousChangesFeed maintains a connection to the database and receives
// continuous updates as they arrive.
//
// db and params identify the database and the request parameters. resp and
// scanner start out nil; the first call to Next performs the request, stores
// the response in resp and wraps its body in scanner so that later calls keep
// reading from the same connection.
type ContinuousChangesFeed struct {
	db      *Database
	params  ChangesFeedParams
	resp    *http.Response
	scanner *bufio.Scanner
}
// Next gets the next available item from the changes feed. This will block
// until an item becomes available.
//
// On the first call the feed type of the stored parameters is set to
// FeedContinuous, the request to the database's "_changes" path is issued and
// the response body is wrapped in a line scanner. Every call then reads lines
// from that same response until it finds a non-empty one, which is decoded as
// JSON into a ChangesFeedChange.
//
// Any number of consecutive blank lines is skipped (NOTE(review): presumably
// these are the server's heartbeat newlines — confirm against the CouchDB
// documentation).
//
// Errors:
//   - an error from encoding the parameters or from issuing the request is
//     returned unchanged;
//   - a JSON decoding error for a line is returned unchanged; the stream stays
//     open so the caller may call Next again;
//   - when the stream ends, the response body is closed and the scanner's
//     error is returned, or io.EOF if the stream ended cleanly. Further calls
//     keep returning the same result.
func (f *ContinuousChangesFeed) Next() (ChangesFeedChange, error) {
	if f.resp == nil {
		f.params.SetFeedType(string(FeedContinuous))
		v, err := f.params.Values()
		if err != nil {
			return ChangesFeedChange{}, err
		}

		resp, err := f.db.con.urlRequest("GET", f.db.con.URL(f.db.ViewPath("_changes")), v, nil, false)
		if err != nil {
			return ChangesFeedChange{}, err
		}

		f.resp = resp
		f.scanner = bufio.NewScanner(f.resp.Body)
	}

	for f.scanner.Scan() {
		line := f.scanner.Bytes()

		// Skip every blank line, not just the first one: an empty slice is not
		// valid JSON and would otherwise surface as a decoding error.
		if len(line) == 0 {
			continue
		}

		var change ChangesFeedChange
		if err := json.Unmarshal(line, &change); err != nil {
			return ChangesFeedChange{}, err
		}
		return change, nil
	}

	// The scanner is exhausted, so nothing more can be read from this
	// response. Release the connection; a Close error is deliberately ignored
	// because the scanner's result is the more useful one to report.
	_ = f.resp.Body.Close()

	if err := f.scanner.Err(); err != nil {
		return ChangesFeedChange{}, err
	}

	// Scan stopped without an error: the server ended the stream. Report that
	// explicitly rather than handing back a zero change with a nil error.
	return ChangesFeedChange{}, io.EOF
}