-
Notifications
You must be signed in to change notification settings - Fork 0
/
analysis.go
131 lines (106 loc) · 2.3 KB
/
analysis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package dataflow
import (
"errors"
"fmt"
)
// analyze validates a set of stages before execution.
//
// It checks, in order, that the Input stage and the sink stage are both
// present in stages, that the successor graph built by convertStages has no
// loops, and finally that the graph passes consistencyCheck. The first failed
// check is returned as an error; nil means every check passed.
func analyze(stages map[string]Stage) error {
	_, hasInput := stages[Input]
	if !hasInput {
		return errors.New("data input stage not found")
	}

	_, hasSink := stages[sink]
	if !hasSink {
		return errors.New("final execution stage not found")
	}

	successors := convertStages(stages)
	if looped := containsLoop(successors); looped {
		return errors.New("execution graph contains loops")
	}

	return consistencyCheck(successors)
}
// convertStages inverts the "requires" relation of the stages into an
// adjacency list keyed by predecessor: for every stage and every name listed
// in its requires field, the stage's own name is appended to that
// predecessor's successor list.
//
// Only names that occur in some requires list become keys of the result; a
// stage that no other stage requires does not get an entry of its own.
func convertStages(stages map[string]Stage) map[string][]string {
	successors := make(map[string][]string, len(stages))

	for name, st := range stages {
		for _, dep := range st.requires {
			outgoing := successors[dep]
			successors[dep] = append(outgoing, name)
		}
	}

	return successors
}
// containsLoop reports whether the directed graph, given as a map from a node
// to its direct successors, contains a cycle.
//
// It runs a depth-first search that tracks three states per node: unvisited,
// in progress (on the current DFS path) and done. Meeting an in-progress node
// again means a back edge, i.e. a loop.
func containsLoop(graph map[string][]string) bool {
	type state uint8
	const (
		unvisited state = iota // zero value: node has not been entered yet
		inProgress
		done
	)

	marks := make(map[string]state, len(graph))

	var visit func(node string) bool
	visit = func(node string) bool {
		marks[node] = inProgress
		for _, next := range graph[node] {
			if marks[next] == inProgress {
				// back edge to a node on the current path
				return true
			}
			if marks[next] == unvisited && visit(next) {
				return true
			}
		}
		marks[node] = done
		return false
	}

	// Start a search from every node that an earlier search did not cover.
	for node := range graph {
		if marks[node] != unvisited {
			continue
		}
		if visit(node) {
			return true
		}
	}
	return false
}
// consistencyCheck verifies that the execution graph is well-formed when
// walked from the Input stage.
//
// graph maps a stage name to the names of its direct successors. The check
// fails when:
//   - a stage reached from Input has no successors and is not the sink
//     ("intermediate stage with no outputs"), or
//   - a stage mentioned in graph, either as a key or as a successor, is never
//     reached from Input ("unreachable stage").
//
// Stages that appear neither as a key nor as a successor are not inspected.
// When several stages are unreachable, the lexicographically smallest name is
// reported so the error does not depend on map iteration order. The function
// returns nil when neither problem is found.
func consistencyCheck(graph map[string][]string) error {
	// Register every stage mentioned in the graph as "not yet reached".
	visited := make(map[string]bool, len(graph))
	for node, successors := range graph {
		visited[node] = false
		for _, succ := range successors {
			visited[succ] = false
		}
	}

	var dfs func(n string) error
	dfs = func(n string) error {
		// A stage reached before has already had its whole sub-graph checked
		// (or is still being checked further up the call stack). Skipping it
		// keeps the walk linear on graphs with shared successors and prevents
		// endless recursion should the graph contain a cycle.
		if visited[n] {
			return nil
		}
		visited[n] = true

		successors := graph[n]
		if len(successors) == 0 && n != sink {
			// dangling execution
			return fmt.Errorf("intermediate stage with no outputs: %s", n)
		}
		for _, succ := range successors {
			if err := dfs(succ); err != nil {
				return err
			}
		}
		return nil
	}

	if err := dfs(Input); err != nil {
		return err
	}

	// Pick the smallest unreachable name so the result is deterministic.
	var (
		unreachable string
		found       bool
	)
	for stage, seen := range visited {
		if seen {
			continue
		}
		if !found || stage < unreachable {
			unreachable, found = stage, true
		}
	}
	if found {
		return fmt.Errorf("unreachable stage: %s", unreachable)
	}
	return nil
}