-
Notifications
You must be signed in to change notification settings - Fork 1
/
produced_message.go
41 lines (35 loc) · 1.07 KB
/
produced_message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package kafkamock
import (
"bytes"
"encoding/json"
"github.com/confluentinc/confluent-kafka-go/kafka"
"testing"
)
type ProducerMessage struct {
Message *kafka.Message
}
func (m * ProducerMessage) AssertValueEquals(t *testing.T, v interface{}) {
value, _ := json.Marshal(v)
isEqual := bytes.Compare(m.Message.Value, value) == 0
if !isEqual {
t.Errorf("Produced message (%s) is not equal as expected (%s)", m.Message.Value, value)
}
}
func (m * ProducerMessage) AssertTopic(t *testing.T, topic string) {
tc := m.Message.TopicPartition.Topic
if *tc != topic {
t.Errorf("Produced topic (%s) is not equal as expected (%s)", *tc, topic)
}
}
func (m * ProducerMessage) AssertPartition(t *testing.T, partition int32) {
p := m.Message.TopicPartition.Partition
if p != partition {
t.Errorf("Produced partition (%d) is not equal as expected (%d)", p, partition)
}
}
func (m * ProducerMessage) AssertOffset(t *testing.T, offset kafka.Offset) {
o := m.Message.TopicPartition.Offset
if o != offset {
t.Errorf("Produced offset (%d) is not equal as expected (%d)", o, offset)
}
}