forked from brimdata/super
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mux.go
155 lines (139 loc) · 3.6 KB
/
mux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package op
import (
"context"
"sync"
"github.com/brimdata/zed"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zcode"
)
// labeled wraps a batch returned by Mux.Pull together with the index of the
// parallel path (parent) that produced it. The embedded zbuf.Batch supplies
// the Batch methods, so a *labeled can be used wherever a zbuf.Batch is
// expected; Unwrap recovers the label.
type labeled struct {
zbuf.Batch
// label is the index of the originating parent in the slice given to NewMux.
label int
}
// Unwrap splits a batch returned by Mux.Pull into the underlying batch and
// the label (index of the parallel path) that produced it. Batches that were
// not labeled by a Mux, such as *EndOfChannel markers or batches from Single,
// are returned unchanged with label 0.
func Unwrap(batch zbuf.Batch) (zbuf.Batch, int) {
	if inner, ok := batch.(*labeled); ok {
		// Return the embedded batch, not the wrapper, so callers can
		// type-assert the result against its concrete batch type.
		return inner.Batch, inner.label
	}
	return batch, 0
}
// Mux implements the muxing of a set of parallel paths at the output of
// a flowgraph. It also implements the double-EOS algorithm with proc.Latch
// to detect the end of each parallel stream. When one upstream leg finishes,
// Pull returns an *EndOfChannel carrying that leg's label. Its output protocol
// is a single EOS (a nil batch) once all of the upstream legs are done, at
// which time it cancels the flowgraph.
// Each data batch returned by the mux is wrapped in a labeled batch, which can
// be unwrapped with Unwrap to extract the integer index of the output (in
// left-to-right DFS traversal order of the flowgraph). This proc requires more
// than one parent; use proc.Latcher for a single-output flowgraph.
// NOTE(review): Single in this file also handles the single-output case;
// confirm the proc.Latch/proc.Latcher references are still current.
type Mux struct {
// pctx provides the context watched by the puller goroutines and the Cancel
// invoked at final EOS or on error.
pctx *Context
// once ensures the puller goroutines are started only on the first Pull.
once sync.Once
// ch is shared by all pullers; each sends its results here.
ch <-chan result
parents []*puller
// nparents counts parents that have not yet reported end of stream.
nparents int
}
// result is one message from a puller goroutine to Mux.Pull. It holds a batch
// (nil at end of stream) or an error, tagged with the sending puller's label.
type result struct {
batch zbuf.Batch
label int
err error
}
// puller drives one parent of a Mux from its own goroutine (see run). It
// sends each result, tagged with label, on the channel shared with the Mux.
type puller struct {
zbuf.Puller
// ch is the send side of the Mux's shared result channel.
ch chan<- result
// label is this parent's index in the slice given to NewMux.
label int
}
// run repeatedly pulls from the wrapped Puller and forwards each outcome,
// tagged with this puller's label, on p.ch. It exits after forwarding an end
// of stream (nil batch) or an error. It also exits as soon as ctx is done
// while it is waiting to send.
func (p *puller) run(ctx context.Context) {
	for {
		b, err := p.Pull(false)
		msg := result{batch: b, label: p.label, err: err}
		select {
		case <-ctx.Done():
			return
		case p.ch <- msg:
		}
		if err != nil || b == nil {
			return
		}
	}
}
// NewMux returns a Mux that merges the outputs of parents, which must hold at
// least two pullers. Otherwise NewMux panics, because that is a programming
// error in flowgraph construction. Each parent is wrapped with NewCatcher and
// labeled with its index in parents. The goroutines that drive the parents are
// not started until the first call to Pull.
func NewMux(pctx *Context, parents []zbuf.Puller) *Mux {
	if len(parents) < 2 {
		panic("op.NewMux() must be called with two or more parents")
	}
	// All pullers send on this one unbuffered channel; Mux.Pull receives from it.
	ch := make(chan result)
	pullers := make([]*puller, 0, len(parents))
	for label, parent := range parents {
		pullers = append(pullers, &puller{NewCatcher(parent), ch, label})
	}
	return &Mux{
		pctx:     pctx,
		ch:       ch,
		parents:  pullers,
		nparents: len(parents),
	}
}
// Pull implements the merge logic for returning data from the upstreams.
// Batches are returned in the order they arrive from any parent.
//
// Data batches are wrapped so that Unwrap reports which parent produced
// them. When a parent reaches end of stream, Pull returns an *EndOfChannel
// holding that parent's label. Once every parent has done so, the next call
// cancels the flowgraph's context and returns (nil, nil), as do all later
// calls. If any parent reports an error, Pull cancels the context and returns
// that error. If the context is done while Pull is waiting, it returns the
// context's error.
func (m *Mux) Pull(bool) (zbuf.Batch, error) {
	if m.nparents == 0 {
		// When we get to EOS, we make sure all the flowgraph
		// goroutines terminate by canceling the proc context.
		m.pctx.Cancel()
		return nil, nil
	}
	m.once.Do(func() {
		for _, puller := range m.parents {
			go puller.run(m.pctx.Context)
		}
	})
	// Every case returns, so no enclosing loop is needed.
	select {
	case res := <-m.ch:
		if res.err != nil {
			m.pctx.Cancel()
			return nil, res.err
		}
		if res.batch == nil {
			// This parent is finished; report it downstream with a marker.
			m.nparents--
			eoc := EndOfChannel(res.label)
			return &eoc, nil
		}
		return &labeled{res.batch, res.label}, nil
	case <-m.pctx.Context.Done():
		return nil, m.pctx.Context.Err()
	}
}
// Single adapts a single-parent flowgraph to the output protocol used by Mux.
// It passes the parent's batches through unlabeled. When the parent reaches
// end of stream, it returns one *EndOfChannel with label 0, and after that
// it returns (nil, nil).
type Single struct {
zbuf.Puller
// eos is set once the EndOfChannel marker has been returned.
eos bool
}
// NewSingle returns a Single that reads from parent and has not yet reached
// end of stream.
func NewSingle(parent zbuf.Puller) *Single {
	s := new(Single)
	s.Puller = parent
	return s
}
// Pull returns the next batch from the parent. When the parent reaches end of
// stream, Pull returns an *EndOfChannel with label 0 once, and (nil, nil) on
// every later call. If the parent returns an error, Pull returns (nil, err),
// as Mux.Pull does. It does not pair the error with an end-of-channel marker
// or latch end of stream, so the error is not later masked as a clean EOS.
func (s *Single) Pull(bool) (zbuf.Batch, error) {
	if s.eos {
		return nil, nil
	}
	batch, err := s.Puller.Pull(false)
	if err != nil {
		return nil, err
	}
	if batch == nil {
		s.eos = true
		eoc := EndOfChannel(0)
		return &eoc, nil
	}
	return batch, nil
}
// EndOfChannel is a value-less batch that marks the end of one output path of
// a muxed flowgraph. Its integer value is the label of the path that finished.
// Because it carries no values, a consumer that only reads Values sees nothing
// from it. Code that needs to know when a path ends must test for
// *EndOfChannel explicitly.
type EndOfChannel int

// Compile-time assertion that *EndOfChannel satisfies zbuf.Batch.
var _ zbuf.Batch = (*EndOfChannel)(nil)

// Ref is a no-op.
func (*EndOfChannel) Ref() {}

// Unref is a no-op.
func (*EndOfChannel) Unref() {}

// Values always returns nil; an EndOfChannel holds no values.
func (*EndOfChannel) Values() []zed.Value {
	return nil
}

// Vars always returns nil.
func (*EndOfChannel) Vars() []zed.Value {
	return nil
}

// CopyValue ignores its argument and returns nil.
func (*EndOfChannel) CopyValue(zed.Value) *zed.Value {
	return nil
}

// NewValue ignores its arguments and returns nil.
func (*EndOfChannel) NewValue(zed.Type, zcode.Bytes) *zed.Value {
	return nil
}