-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflow.go
237 lines (200 loc) · 8.09 KB
/
flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
// Package flow provides support for very basic FBP (flow-based programming) pipelines. Each handler
// represents an async stage consuming data from its input channel and publishing results to its output channel.
// Each handler runs in its own goroutine(s).
//
// The user implements Handler functions and adds them to the Flow. Each handler usually creates an output channel,
// reads from the input channel, processes data and sends results to the output channel. The processing sequence is
// defined by the order in which those handlers are added.
//
// Any Handler can run in multiple concurrent goroutines (workers) by using the Parallel decorator.
//
// FanOut allows passing multiple handlers in broadcast mode, i.e., each handler gets every input record.
// Outputs from these handlers are merged into a single output channel.
//
// A processing error is detected as a non-nil error returned from the user's handler func. Such an error
// gracefully interrupts all other running handlers, without leaving goroutines running/leaking (provided
// handlers honor context cancellation).
//
// Each Flow object can be executed only once.
//
// Each Handler has to treat context cancellation as a termination signal.
package flow
import (
"context"
"log"
"sync"
"golang.org/x/sync/errgroup"
)
// Flow holds the list of registered runnable functions together with the common
// errgroup they run in and the context shared by all handlers. Build it with New;
// a Flow is activated (its runnables started) at most once.
type Flow struct {
	group        *errgroup.Group  // all handlers run in this errgroup
	ctx          context.Context  // errgroup-derived context used for cancellation; New also stores the Metrics in it
	lastCh       chan interface{} // output channel of the most recently added handler (nil before the first Add)
	funcs        []func() error   // all registered runnable functions, started by Go
	fanoutBuffer int              // buffer size of each forked input channel created by FanOut
	activateOnce sync.Once        // prevents multiple activations of the flow
}
// Handler defines the function type used for flow stages; implementations are provided by the user.
// A Handler is called at registration time (from Add, or from the Parallel/FanOut wrappers) with the
// flow context and its input channel, and returns the new out(put) channel and a runnable fn.
//
// fn is executed in a separate goroutine once the flow is activated by Go. It lives for the whole
// flow lifetime and usually implements a read->process->write cycle. fn may be nil when the stage has
// nothing to run asynchronously. Parallel calls the Handler once per worker, so state created inside
// a single Handler call is private to that worker's fn.
// The Parallel and FanOut wrappers read each out channel until it is closed, so fn is expected to
// close out when it finishes.
//
// If fn returns a non-nil error, it indicates a critical failure: the shared context is canceled,
// which stops all handlers in the flow.
type Handler func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error)
// New creates a Flow backed by a common errgroup. The errgroup schedules all handlers,
// and its derived context, which every handler receives, is canceled on the first failure.
// A fresh Metrics object is attached to that context.
// By default the base context is context.Background() and the fanout buffer is 1; the
// given options may override these before the errgroup is created.
func New(options ...Option) *Flow {
	f := &Flow{
		ctx:          context.Background(),
		fanoutBuffer: 1,
	}
	for _, apply := range options {
		apply(f)
	}

	group, groupCtx := errgroup.WithContext(f.ctx)
	f.group = group
	f.ctx = context.WithValue(groupCtx, MetricsContextKey, NewMetrics())
	return f
}
// Add registers one or more handlers. Each handler receives the output channel of the
// previously added handler as its input, so the order of handlers defines the sequence
// of stages in the flow. Add may be called multiple times and returns f for chaining.
// A handler returning a nil output channel terminates the process via log.Fatalf.
func (f *Flow) Add(handlers ...Handler) *Flow {
	for i := range handlers {
		out, run := handlers[i](f.ctx, f.lastCh)
		if out == nil {
			log.Fatalf("can't register flow handler with nil channel!")
		}
		f.funcs = append(f.funcs, run) // started later by Go
		f.lastCh = out
	}
	return f
}
// Parallel is a decorator that turns a single handler into a concurrently executed one.
// The returned Handler calls handler `concurrent` times, each call with a context carrying
// the worker index (see CID) and all sharing the same input channel. It registers every
// worker's runnable with the flow and fans the workers' outputs into a single channel.
// For concurrent <= 1 the handler is returned unchanged.
func (f *Flow) Parallel(concurrent int, handler Handler) Handler {
	if concurrent <= 1 {
		return handler // nothing to parallelize
	}
	return func(ctx context.Context, in chan interface{}) (chan interface{}, func() error) {
		outs := make([]chan interface{}, 0, concurrent)
		for id := 0; id < concurrent; id++ {
			workerCtx := context.WithValue(ctx, CidContextKey, id)
			out, run := handler(workerCtx, in) // workers compete for records from the shared input
			f.funcs = append(f.funcs, run)
			outs = append(outs, out)
		}
		// the merge runnable goes back to the caller (e.g. Add), which registers it like any other stage
		return f.merge(ctx, outs)
	}
}
// FanOut runs all handlers against a common input: every record read from the input channel
// is broadcast to each handler, which may process it in its own way, and the handlers'
// outputs are merged into a single output channel.
//
// The parameters are split into a head handler and tail handlers so that at least one handler
// is always passed; with no tail handlers the head handler is returned unchanged.
//
// Each handler reads from its own forked input channel, buffered with the flow's fanout buffer
// size, and gets its index in the context (see CID). The returned Handler may be invoked more
// than once (e.g. when wrapped by Parallel); each invocation creates its own forked channels.
// The broadcaster stops and closes the forked inputs when the input channel is closed or the
// context is canceled.
func (f *Flow) FanOut(handler Handler, handlers ...Handler) Handler {
	if len(handlers) == 0 {
		return handler
	}
	// Build the full handler list once, outside the returned closure, so invoking that
	// closure repeatedly never re-prepends the head handler and grows the list.
	all := make([]Handler, 0, len(handlers)+1)
	all = append(all, handler)
	all = append(all, handlers...)

	return func(ctx context.Context, ch chan interface{}) (chan interface{}, func() error) {
		inChs := make([]chan interface{}, len(all))  // forked inputs fed from ch
		outChs := make([]chan interface{}, len(all)) // handler outputs to merge
		for i, h := range all {
			inChs[i] = make(chan interface{}, f.fanoutBuffer) // buffered to allow async readers
			ctxWithID := context.WithValue(ctx, CidContextKey, i)
			out, fn := h(ctxWithID, inChs[i])
			f.funcs = append(f.funcs, fn) // register runnable with flow executor
			outChs[i] = out
		}

		// broadcast every record from ch to all forked channels
		go func() {
			// closing the forked inputs lets every handler finish its read loop
			defer func() {
				for _, in := range inChs {
					close(in)
				}
			}()
			for {
				// receive inside a select so a canceled flow stops this goroutine even when
				// the upstream channel is never closed (or is nil)
				var e interface{}
				select {
				case <-ctx.Done():
					return
				case v, ok := <-ch:
					if !ok {
						return
					}
					e = v
				}
				for _, in := range inChs {
					if err := Send(ctx, in, e); err != nil {
						return
					}
				}
			}
		}()

		// returns merged chan and mergeFn; registered by the caller like any other stage
		return f.merge(ctx, outChs)
	}
}
// Go activates the flow by starting every registered runnable in the flow's errgroup.
// It should be called once, after all handlers are added; subsequent calls do nothing.
// Nil runnables are skipped, because a handler may return a nil fn when it has nothing
// to run asynchronously. Go returns f for chaining.
func (f *Flow) Go() *Flow {
	start := func() {
		runnables := f.funcs
		for _, run := range runnables {
			if run == nil {
				continue
			}
			f.group.Go(run)
		}
	}
	f.activateOnce.Do(start)
	return f
}
// Wait blocks until all started handlers have returned and reports the first non-nil
// error returned by any of them, or nil if all of them succeeded.
func (f *Flow) Wait() error {
	err := f.group.Wait()
	return err
}
// Channel returns the last (final) channel in the flow, i.e. the output channel of the most
// recently added handler, or nil if no handler has been added yet. Consumers usually don't
// need it, but it can be used to collect final result(s).
func (f *Flow) Channel() chan interface{} {
	last := f.lastCh
	return last
}
// Metrics returns the Metrics object (all user-defined counters) that New stored in the
// flow context. It panics if the flow was not created by New.
func (f *Flow) Metrics() *Metrics {
	stored := f.ctx.Value(MetricsContextKey)
	return stored.(*Metrics)
}
// merge fans in multiple channels into a single output channel (mergeCh).
// Nothing is forwarded until the returned mergeFn runs; callers register it as a flow runnable.
// mergeFn starts one forwarding goroutine per input channel. Each goroutine copies records to
// mergeCh until its input is closed. mergeCh is closed once all forwarders have returned.
// A forwarder stops early if a send is interrupted by cancellation of ctx, or by another
// forwarder's failure. mergeFn returns the first such error.
func (f *Flow) merge(ctx context.Context, chs []chan interface{}) (mergeCh chan interface{}, mergeFn func() error) {
	out := make(chan interface{})
	forward := func() error {
		defer close(out)
		g, gCtx := errgroup.WithContext(ctx)
		for _, src := range chs {
			src := src // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
			g.Go(func() error {
				for rec := range src {
					if err := Send(gCtx, out, rec); err != nil {
						return err
					}
				}
				return nil
			})
		}
		return g.Wait()
	}
	return out, forward
}
// CID returns the concurrent ID stored in the context by the Parallel (worker index) and
// FanOut (handler index) wrappers. Its only use is for some indication/logging.
// Contexts without an ID, such as those of non-parallel handlers, yield 0.
func CID(ctx context.Context) int {
	if id, found := ctx.Value(CidContextKey).(int); found {
		return id
	}
	return 0
}
// Send delivers e to ch, or returns ctx.Err() if the context is canceled first.
// It is a shortcut for the send-or-fail-on-cancel step most handlers implement.
func Send(ctx context.Context, ch chan interface{}, e interface{}) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case ch <- e:
	}
	return nil
}
// Recv reads a single entry from ch, or returns ctx.Err() if the context is canceled first.
// It is a shortcut for the read-or-fail-on-cancel step most handlers implement.
// Note: receiving from a closed channel yields (nil, nil), which is indistinguishable
// from a nil entry.
func Recv(ctx context.Context, ch chan interface{}) (interface{}, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case v := <-ch:
		return v, nil
	}
}