-
Notifications
You must be signed in to change notification settings - Fork 0
/
flow.go
143 lines (114 loc) · 3.28 KB
/
flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package dataflow
import (
"fmt"
)
// Labels reserved for the two terminal stages of an execution graph.
const (
	// Input is the label of the entry stage. It is exported for use
	// outside of the package.
	Input = "exec-input"
	// sink is the label of the final stage, assigned by NewFinalStage.
	sink = "exec-sink"
)
/* Execution stages */
// NewStage builds an intermediate execution stage.
//
// label identifies the stage inside the graph, exec is the computation the
// stage performs, and requires lists the labels of the stages whose results
// are passed to exec as arguments, in the given order.
func NewStage(label string, exec StageExecution, requires ...string) Stage {
	var stage Stage
	stage.label = label
	stage.exec = exec
	stage.requires = requires
	return stage
}
// NewFinalStage builds the terminal stage of the network: the one whose
// result becomes the result of the whole execution.
//
// It always carries the reserved sink label; exec and requires have the same
// meaning as for NewStage.
func NewFinalStage(exec StageExecution, requires ...string) Stage {
	var final Stage
	final.label = sink
	final.exec = exec
	final.requires = requires
	return final
}
// inputStage returns the entry stage, labelled Input, which distributes the
// initial argument through the network by handing its first argument on
// unchanged.
func inputStage() Stage {
	// pass-through: the external argument arrives as the only input
	forward := func(args ...interface{}) (interface{}, error) {
		return args[0], nil
	}
	return Stage{label: Input, exec: forward}
}
/* Execution graph */
// NewExecutionGraph assembles an execution graph out of the final stage and
// any number of intermediate stages.
//
// The reserved labels Input and sink are registered internally: Input maps to
// the built-in input stage and sink maps to final. An error is returned when
//   - an intermediate stage uses one of the reserved labels,
//   - two intermediate stages share the same label (previously the later one
//     silently replaced the earlier one), or
//   - analyze rejects the resulting stage map; its error is returned as is.
func NewExecutionGraph(final Stage, stages ...Stage) (*ExecutionGraph, error) {
	stageMap := make(map[string]Stage, len(stages)+2)
	stageMap[Input] = inputStage()
	stageMap[sink] = final

	for _, stage := range stages {
		if stage.label == Input || stage.label == sink {
			return nil, fmt.Errorf("label '%s' interfering with internal stage", stage.label)
		}
		// Labels are the only stage identity inside the graph, so a second
		// stage with the same label would make the first one disappear.
		if _, exists := stageMap[stage.label]; exists {
			return nil, fmt.Errorf("duplicate stage label '%s'", stage.label)
		}
		stageMap[stage.label] = stage
	}

	if err := analyze(stageMap); err != nil {
		return nil, err
	}

	return &ExecutionGraph{stages: stageMap}, nil
}
// Run spawns a new, independent instance of the flow network: one goroutine
// per stage, connected by channels that mirror the "requires" relations.
//
// It returns two functions. The first one feeds a single argument into the
// Input stage, waits for the sink stage to answer and returns that answer's
// value and error. The second one closes the entry channel; runStage reacts
// to a closed input by closing its own outputs and returning, so the whole
// instance winds down.
//
// NOTE(review): answers are matched to calls only by their order on one
// shared channel, so calling the returned execution function concurrently on
// the same instance looks unsafe — confirm with callers. Invoking the
// destructor twice, or executing after it, would operate on a closed channel.
func (g ExecutionGraph) Run() (TotalExecution, Destructor) {
	entry := make(chan either, 1)
	exit := make(chan either, 1)
	nodes := make(map[string]*node)

	// First pass: create a node for every stage, so that the wiring below
	// can look up any label regardless of map iteration order.
	for label, stage := range g.stages {
		nodes[label] = &node{label: label, exec: stage.exec}
	}

	// Second pass: one dedicated pipe per dependency edge, registered as an
	// input of the consumer and as an output of the producer.
	for label, stage := range g.stages {
		consumer := nodes[label]
		for _, dependency := range stage.requires {
			producer := nodes[dependency]
			edge := make(chan either, 1)
			consumer.in = append(consumer.in, edge)
			producer.out = append(producer.out, edge)
		}
	}

	// The terminal stages talk to the outside world instead of to peers.
	nodes[Input].in = []<-chan either{entry}
	nodes[sink].out = []chan<- either{exit}

	// Start every stage in its own goroutine.
	for _, n := range nodes {
		go runStage(n)
	}

	execute := TotalExecution(func(arg interface{}) (interface{}, error) {
		entry <- either{Value: arg}
		answer := <-exit
		return answer.Value, answer.Err
	})
	destroy := func() { close(entry) }

	return execute, destroy
}
// runStage is the processing loop of a single network node. Each round it
// collects one message from every input channel, then either runs the stage
// computation on the collected values or, if any input carried an error,
// forwards that error instead; the outcome is sent to every output channel.
//
// The loop ends when one of the inputs is found closed: the node then closes
// all of its own outputs, which makes its successors terminate the same way.
func runStage(stage *node) {
	for {
		// A fresh slice every round: exec receives it as-is through args...,
		// so a single reused buffer would let the next round overwrite
		// arguments that exec returned or kept from this round.
		args := make([]interface{}, len(stage.in))
		var executionErr error

		// wait until all arguments become available
		for i, input := range stage.in {
			arg, ok := <-input
			if !ok {
				// propagate network collapsing
				for _, successor := range stage.out {
					close(successor)
				}
				return
			}
			args[i] = arg.Value
			if arg.Err != nil {
				// Keep reading the remaining inputs so they are drained for
				// this round; only the last error seen is reported.
				executionErr = arg.Err
			}
		}

		// If an error emerged somewhere upstream, skip the computation and
		// just pass the error on; it was already labelled by the stage that
		// produced it.
		result := either{Err: executionErr}
		if executionErr == nil {
			val, err := stage.exec(args...)
			if err != nil {
				err = fmt.Errorf("stage %s: %w", stage.label, err)
			}
			result = either{Value: val, Err: err}
		}

		// fan the outcome out to all successors
		for _, successor := range stage.out {
			successor <- result
		}
	}
}