forked from stoewer/go-nakadi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor_integration_test.go
62 lines (51 loc) · 1.74 KB
/
processor_integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// +build integration
package nakadi
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestIntegrationProcessor(t *testing.T) {
events := []DataChangeEvent{}
helperLoadTestData(t, "events-data-create-stream.json", &events)
eventType := &EventType{}
helperLoadTestData(t, "event-type-create.json", eventType)
newSub := &Subscription{OwningApplication: "test-app", EventTypes: []string{eventType.Name}, ReadFrom: "begin"}
subscription := helperCreateSubscriptions(t, eventType, newSub)[0]
defer helperDeleteSubscriptions(t, eventType, subscription)
client := New(defaultNakadiURL, &ClientOptions{ConnectionTimeout: time.Second})
publishAPI := NewPublishAPI(client, eventType.Name, &PublishOptions{Retry: true})
// publish events
for _, e := range events {
err := publishAPI.PublishDataChangeEvent([]DataChangeEvent{e})
require.NoError(t, err)
}
// process events
eventCh := make(chan DataChangeEvent, 1)
processor := NewProcessor(client, subscription.ID, &ProcessorOptions{BatchLimit: 2, EventsPerMinute: 60})
processor.Start(func(i int, id string, rawEvents []byte) error {
events := []DataChangeEvent{}
err := json.Unmarshal(rawEvents, &events)
assert.NoError(t, err)
if err != nil {
return err
}
for _, e := range events {
eventCh <- e
}
return nil
})
// compare events
for i := 0; i < len(events); i++ {
e := <-eventCh
assert.Equal(t, events[i].Metadata.EID, e.Metadata.EID)
assert.Equal(t, events[i].Metadata.EventType, e.Metadata.EventType)
assert.Equal(t, events[i].DataType, e.DataType)
assert.Equal(t, events[i].DataOP, e.DataOP)
assert.Equal(t, events[i].Data, e.Data)
}
err := processor.Stop()
assert.NoError(t, err)
}