-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathstream.go
119 lines (103 loc) · 2.77 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package nevent
import (
"fmt"
"strings"
"github.com/nats-io/nats.go"
)
// Stream is a thin wrapper around a JetStream context that is used to create
// and update streams.
type Stream struct {
	// jet is the JetStream context all stream management calls go through.
	jet nats.JetStreamContext
}
// NewStream obtains a JetStream context from nc and wraps it in a Stream.
// Any error reported by nc.JetStream is returned unchanged.
func NewStream(nc *nats.Conn) (*Stream, error) {
	js, err := nc.JetStream()
	if err != nil {
		return nil, err
	}

	stream := new(Stream)
	stream.jet = js
	return stream, nil
}
// EnsureStream makes sure a stream dedicated to a single subject exists,
// creating it or updating an existing one.
//
// The subject is produced by passing s through the configured subject
// transformer; the stream name is produced by passing that subject through the
// configured subject normalizer. The configured nats.StreamConfig is copied
// before Name and Subjects are set, so the caller's config value is not
// modified.
//
// Flow:
//   - AddStream is attempted first.
//   - If it fails because the stream name is already in use, UpdateStream is
//     attempted with the same config.
//   - If the update is rejected because it would change an immutable setting,
//     the stream is deleted and re-added only when StreamForceUpdate was
//     supplied (this discards the stream's data); otherwise an error is
//     returned.
//
// It returns the resulting stream info, or an error. Errors from the
// update/delete/re-add steps are wrapped with %w.
func (it *Stream) EnsureStream(s string, opts ...StreamOption) (*nats.StreamInfo, error) {
	o := it.GetOptions(opts...)
	subject := o.subjectTransformer(s)
	name := o.subjectNormalizer(subject)

	// Work on a copy so the config supplied through StreamConfig stays untouched.
	cfg := *o.config
	cfg.Name = name
	cfg.Subjects = []string{subject}

	si, err := it.jet.AddStream(&cfg)
	// Match on a substring instead of exact equality so that the check still
	// works when the message carries a prefix or a trailing qualifier.
	// NOTE(review): newer client/server versions appear to report this as
	// "nats: stream name already in use[ with a different configuration]" —
	// confirm against the versions in use.
	if err == nil || !strings.Contains(err.Error(), "stream name already in use") {
		return si, err
	}

	// The stream already exists: try to update it in place.
	si, err = it.jet.UpdateStream(&cfg)
	if err == nil {
		return si, nil
	}
	// Substring match for the same reason as above (possible message prefix).
	if !strings.Contains(err.Error(), "stream configuration update can not change") {
		return nil, fmt.Errorf("update stream error: %w", err)
	}
	if !o.force {
		return nil, fmt.Errorf("try use force options(data loss) or do stream migration: %w", err)
	}

	// Forced update: drop the existing stream and create it again with the
	// new configuration.
	if err = it.jet.DeleteStream(cfg.Name); err != nil {
		return nil, fmt.Errorf("delete stream error when force update stream: %w", err)
	}
	si, err = it.jet.AddStream(&cfg)
	if err != nil {
		return nil, fmt.Errorf("add stream error when force update stream: %w", err)
	}
	return si, nil
}
// GetOptions applies opts in order to a fresh streamOptions value and then
// fills in defaults for anything left unset: an empty nats.StreamConfig, the
// transformer returned by DefaultSubjectTransformer, and
// DefaultSubjectNormalize as the normalizer.
func (it *Stream) GetOptions(opts ...StreamOption) *streamOptions {
	resolved := new(streamOptions)
	for i := range opts {
		opts[i].apply(resolved)
	}

	if resolved.config == nil {
		resolved.config = &nats.StreamConfig{}
	}
	if resolved.subjectTransformer == nil {
		resolved.subjectTransformer = DefaultSubjectTransformer()
	}
	if resolved.subjectNormalizer == nil {
		resolved.subjectNormalizer = DefaultSubjectNormalize
	}
	return resolved
}
// StreamConfig returns an option that stores config as the stream
// configuration to use. The pointer itself is stored, not a copy.
func StreamConfig(config *nats.StreamConfig) StreamOption {
	setConfig := func(target *streamOptions) {
		target.config = config
	}
	return newFuncStreamOption(setConfig)
}
// StreamForceUpdate returns an option that sets the force flag.
//
// With this flag set, EnsureStream deletes and re-creates a stream whose
// configuration cannot be updated in place, so the data held in that stream
// is lost.
func StreamForceUpdate() StreamOption {
	enableForce := func(target *streamOptions) {
		target.force = true
	}
	return newFuncStreamOption(enableForce)
}
// StreamSubjectTransformer returns an option that replaces the subject
// transformer with subjectTransformer.
func StreamSubjectTransformer(subjectTransformer SubjectTransformer) StreamOption {
	setTransformer := func(target *streamOptions) {
		target.subjectTransformer = subjectTransformer
	}
	return newFuncStreamOption(setTransformer)
}
// StreamSubjectNormalizer returns an option that replaces the subject
// normalizer with subjectNormalizer.
func StreamSubjectNormalizer(subjectNormalizer SubjectNormalizer) StreamOption {
	setNormalizer := func(target *streamOptions) {
		target.subjectNormalizer = subjectNormalizer
	}
	return newFuncStreamOption(setNormalizer)
}
// streamOptions collects the settings that StreamOption values write to and
// that GetOptions fills with defaults.
type streamOptions struct {
	// config is the stream configuration; GetOptions substitutes an empty
	// nats.StreamConfig when it is nil.
	config *nats.StreamConfig
	// subjectTransformer is applied to the subject string given to
	// EnsureStream.
	subjectTransformer SubjectTransformer
	// subjectNormalizer turns the transformed subject into the stream name.
	subjectNormalizer SubjectNormalizer
	// force is set by StreamForceUpdate.
	force bool
}
// StreamOption is a single setting that can be applied to a streamOptions
// value. Its only method is unexported, so implementations live in this
// package.
type StreamOption interface {
	// apply writes this option's setting into the given streamOptions.
	apply(*streamOptions)
}
// funcStreamOption holds a function that mutates a streamOptions value.
type funcStreamOption struct {
	// f is the mutation to run against the options.
	f func(*streamOptions)
}
// apply invokes the wrapped function on target.
func (it *funcStreamOption) apply(target *streamOptions) {
	mutate := it.f
	mutate(target)
}
// newFuncStreamOption wraps f in a *funcStreamOption and returns it as a
// StreamOption.
func newFuncStreamOption(f func(*streamOptions)) StreamOption {
	opt := new(funcStreamOption)
	opt.f = f
	return opt
}