-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipelines.go
93 lines (86 loc) · 1.93 KB
/
pipelines.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package pipelines
import "sync"
func New(input chan any, options ...option) Listener {
config := new(config)
config.apply(options...)
return &listener{
input: input,
logger: config.logger,
groups: config.groups,
}
}
type listener struct {
logger Logger
groups []*group
input chan any
}
func (this *listener) Listen() {
input := this.input
for _, group := range this.groups {
output := make(chan any, group.bufferCapacity)
go group.run(input, output)
input = output
}
for v := range input {
this.logger.Printf("value at end of pipeline: %v", v)
}
}
type group struct {
bufferCapacity int
sendViaSelectCallback func(any)
stations []Station
}
func (this *group) run(input, output chan any) {
if len(this.stations) > 1 {
this.runFannedOutStation(input, output)
} else {
this.runStation(this.stations[0], input, output)
}
}
func (this *group) runFannedOutStation(input, final chan any) {
defer close(final)
var outs []chan any
for _, station := range this.stations {
out := make(chan any)
outs = append(outs, out)
go this.runStation(station, input, out)
}
var waiter sync.WaitGroup
waiter.Add(len(outs))
defer waiter.Wait()
for _, out := range outs {
go func(out chan any) {
defer waiter.Done()
for item := range out {
final <- item
}
}(out)
}
}
func (this *group) runStation(station Station, input, output chan any) {
defer close(output)
var out func(v any)
if this.sendViaSelectCallback != nil {
out = sendViaSelect(output, this.sendViaSelectCallback)
} else {
out = blockingSend(output)
}
if finalizer, ok := station.(Finalizer); ok {
defer finalizer.Finalize(out)
}
for input := range input {
station.Do(input, out)
}
}
func sendViaSelect(output chan any, callback func(any)) func(any) {
return func(v any) {
select {
case output <- v:
default:
callback(v)
}
}
}
func blockingSend(output chan any) func(any) {
return func(v any) { output <- v }
}