forked from stoewer/go-nakadi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
streams_integration_test.go
62 lines (50 loc) · 1.85 KB
/
streams_integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// +build integration
package nakadi
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestIntegrationStreamAPI(t *testing.T) {
events := []DataChangeEvent{}
helperLoadTestData(t, "events-data-create-stream.json", &events)
eventType := &EventType{}
helperLoadTestData(t, "event-type-create.json", eventType)
newSub := &Subscription{OwningApplication: "test-app", EventTypes: []string{eventType.Name}, ReadFrom: "begin"}
subscription := helperCreateSubscriptions(t, eventType, newSub)[0]
defer helperDeleteSubscriptions(t, eventType, subscription)
client := New(defaultNakadiURL, &ClientOptions{ConnectionTimeout: time.Second})
publishAPI := NewPublishAPI(client, eventType.Name, &PublishOptions{Retry: true})
// publish events
for _, e := range events {
err := publishAPI.PublishDataChangeEvent([]DataChangeEvent{e})
require.NoError(t, err)
}
// stream events
streamAPI := NewStream(client, subscription.ID, &StreamOptions{BatchLimit: 4, MaxUncommittedEvents: 2, CommitRetry: true})
received := []DataChangeEvent{}
for len(received) < len(events) {
cursor, rawEvents, err := streamAPI.NextEvents()
require.NoError(t, err)
temp := []DataChangeEvent{}
err = json.Unmarshal(rawEvents, &temp)
require.NoError(t, err)
assert.Len(t, temp, 2)
received = append(received, temp...)
err = streamAPI.CommitCursor(cursor)
require.NoError(t, err)
}
// compare events
require.Equal(t, len(events), len(received))
for i, e := range events {
assert.Equal(t, e.Metadata.EID, received[i].Metadata.EID)
assert.Equal(t, e.Metadata.EventType, received[i].Metadata.EventType)
assert.Equal(t, e.DataType, received[i].DataType)
assert.Equal(t, e.DataOP, received[i].DataOP)
assert.Equal(t, e.Data, received[i].Data)
}
err := streamAPI.Close()
assert.NoError(t, err)
}