-
Notifications
You must be signed in to change notification settings - Fork 3
/
mock_retry_manager_test.go
92 lines (78 loc) · 2.28 KB
/
mock_retry_manager_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package consumer
import (
"context"
"errors"
"time"
failuremodel "github.com/inviqa/kafka-consumer-go/data/failure/model"
"github.com/inviqa/kafka-consumer-go/data/retry/model"
)
type mockRetryManager struct {
// indexed by topic name
recvdFailures map[string][]failuremodel.Failure
willErrorOnPublishFailure bool
willErrorOnGetBatch bool
retryErrored bool
retrySuccessful bool
runMaintenanceCallCount int
}
// GetBatch will return in-memory received failures as retries
func (mr *mockRetryManager) GetBatch(ctx context.Context, topic string, sequence uint8, interval time.Duration) ([]model.Retry, error) {
if mr.willErrorOnGetBatch {
return nil, errors.New("oops")
}
failures, ok := mr.recvdFailures[topic]
if !ok {
return []model.Retry{}, nil
}
var rts []model.Retry
for _, failure := range failures {
rts = append(rts, model.Retry{
PayloadJSON: failure.Message,
PayloadHeaders: failure.MessageHeaders,
PayloadKey: failure.MessageKey,
Topic: failure.Topic,
KafkaPartition: failure.KafkaPartition,
KafkaOffset: failure.KafkaOffset,
})
}
return rts, nil
}
func (mr *mockRetryManager) MarkSuccessful(ctx context.Context, retry model.Retry) error {
mr.retrySuccessful = true
return nil
}
func (mr *mockRetryManager) MarkErrored(ctx context.Context, retry model.Retry, err error) error {
mr.retryErrored = true
return nil
}
func (mr *mockRetryManager) PublishFailure(ctx context.Context, f failuremodel.Failure) error {
if mr.willErrorOnPublishFailure {
return errors.New("oops")
}
mr.recvdFailures[f.Topic] = append(mr.recvdFailures[f.Topic], f)
return nil
}
func (mr *mockRetryManager) RunMaintenance(ctx context.Context) error {
mr.runMaintenanceCallCount++
return nil
}
func newMockRetryManager(willError bool) *mockRetryManager {
return &mockRetryManager{
recvdFailures: map[string][]failuremodel.Failure{},
willErrorOnPublishFailure: willError,
}
}
func (mr *mockRetryManager) getPublishedFailureCountByTopic(topic string) int {
f, ok := mr.recvdFailures[topic]
if !ok {
return 0
}
return len(f)
}
func (mr *mockRetryManager) getFirstPublishedFailureByTopic(topic string) *failuremodel.Failure {
f, ok := mr.recvdFailures[topic]
if !ok {
return nil
}
return &f[0]
}