-
Notifications
You must be signed in to change notification settings - Fork 1
/
node.go
102 lines (81 loc) · 1.41 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package go_data_routing
import (
"sync"
)
// nodeType identifies the kind of a Node; it is the type of the Node.typ
// field and of the argument taken by newNode.
type nodeType int
//go:generate stringer -type=nodeType
// Known node kinds. Numbering starts at 1 (iota + 1), so the zero value of
// nodeType does not correspond to any of the named kinds.
// NOTE(review): what each kind does is decided by code that is not visible
// here; the names are the only documentation available at this point.
const (
source nodeType = iota + 1
sink
filter
processor
to
wiretap
consumer
// The two names below are commented out and therefore not declared
// (presumably planned node kinds; confirm before relying on them).
//aggregator
//enricher
)
// exchangeType is the kind of an Exchange; it is the type of Exchange.Type.
type exchangeType int
// Exchange kinds. Numbering starts at 1 (iota + 1), so the zero value of
// exchangeType is none of the named kinds.
const (
// Request is the first exchange kind (value 1). Node.Send never replies to
// it: at the last node a Request exchange is dropped.
Request exchangeType = iota + 1
// RequestReply is the kind Node.Send looks for at the last node when
// deciding whether to write the exchange back on its ReturnAddress.
RequestReply
// Stop is not interpreted anywhere in this file.
// NOTE(review): presumably tells a node to shut down; confirm in the
// code that consumes exchanges.
Stop
)
// Exchange is the value carried by the Node.Input and Node.Output channels
// (both are declared as chan Exchange) and passed to Node.Send.
type Exchange struct {
// Type is the kind of exchange: Request, RequestReply or Stop.
Type exchangeType
// ReturnAddress is the channel Node.Send writes the exchange to when a
// RequestReply exchange reaches the last node.
ReturnAddress chan Exchange
// Initiator must be non-nil for Node.Send to reply on ReturnAddress.
// NOTE(review): presumably the node that issued the request; confirm
// against the code that builds exchanges.
Initiator *Node
// Header is an arbitrary caller-defined value; nothing in this file reads it.
Header interface{}
Msg Job // payload; holds both the input and the result
}
// NodeState is the mutable bookkeeping of a Node. It is embedded in Node,
// whose onStop, getStopped, getState and incrIn methods access it while
// holding the Node's mutex. getState returns it by value.
type NodeState struct {
// stopped is set to true by Node.onStop and read by Node.getStopped.
stopped bool
// in is a counter advanced by one on every Node.incrIn call.
// NOTE(review): presumably the number of exchanges received; confirm at
// the call sites of incrIn.
in int
}
// Node is one element of a route. It owns an Input channel, may forward
// exchanges to an Output channel via Send, and embeds a mutex together with
// the NodeState that the mutex protects.
type Node struct {
// typ is the node kind given to newNode.
typ nodeType
// isLast switches Send from forwarding on Output to the end-of-route
// behavior (reply on the exchange's ReturnAddress, or drop).
isLast bool
// Input is created as an unbuffered channel by newNode.
Input chan Exchange
// Output is the channel Send writes to when isLast is false. newNode
// leaves it nil.
// NOTE(review): presumably assigned when the node is linked to the next
// one; that code is not visible here.
Output chan Exchange
// The embedded mutex is held by onStop, getStopped, getState and incrIn
// around every access to NodeState. Because it is embedded, Lock and
// Unlock are promoted to Node, and a Node must not be copied after first
// use (see sync.Mutex).
sync.Mutex
NodeState
// setup and runner are function hooks; nothing in this file sets or
// calls them.
setup func()
runner func()
}
// newNode allocates a Node of kind t and gives it an unbuffered Input
// channel. Every other field, including Output, is left at its zero value.
func newNode(t nodeType) *Node {
	return &Node{
		typ:   t,
		Input: make(chan Exchange),
	}
}
// onStop records that the node has stopped. The flag is written while the
// node's mutex is held.
func (n *Node) onStop() {
	n.Mutex.Lock()
	n.NodeState.stopped = true
	n.Mutex.Unlock()
}
// getStopped reports whether onStop has been called on this node. The flag
// is read while the node's mutex is held.
func (n *Node) getStopped() bool {
	n.Mutex.Lock()
	isStopped := n.NodeState.stopped
	n.Mutex.Unlock()
	return isStopped
}
// getState returns a copy of the node's NodeState, taken while the node's
// mutex is held. Later changes to the node do not affect the returned value.
func (n *Node) getState() NodeState {
	n.Mutex.Lock()
	snapshot := n.NodeState
	n.Mutex.Unlock()
	return snapshot
}
// incrIn advances the node's "in" counter by one while the node's mutex is
// held.
func (n *Node) incrIn() {
	n.Mutex.Lock()
	n.NodeState.in++
	n.Mutex.Unlock()
}
// Send passes the exchange e on from this node.
//
// If the node is not the last one, e is written to n.Output. The call blocks
// until a receiver takes the value, and blocks forever if Output is nil.
//
// If the node is the last one, nothing is forwarded. A RequestReply exchange
// that has both a non-nil Initiator and a non-nil ReturnAddress is written
// back on e.ReturnAddress (blocking until it is received); every other
// exchange is dropped.
func (n *Node) Send(e Exchange) {
	if !n.isLast {
		n.Output <- e
		return
	}

	// A send on a nil channel never completes, so a reply that has no return
	// address is dropped like any other exchange the last node cannot
	// deliver, instead of blocking the calling goroutine forever.
	if e.Type == RequestReply && e.Initiator != nil && e.ReturnAddress != nil {
		e.ReturnAddress <- e
	}
}