-
Notifications
You must be signed in to change notification settings - Fork 1
/
router_context.go
125 lines (105 loc) · 2.12 KB
/
router_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package go_data_routing
import (
"context"
"fmt"
"sync"
"time"
)
// RouterContext owns a set of named routes and drives their lifecycle:
// routes are registered with Route, started with Run, and asked to stop
// when ctx is cancelled.
type RouterContext struct {
// routesWg is incremented by Route and decremented by onRunnerStop when
// the last node of a route stops; Run blocks on it.
routesWg sync.WaitGroup
// ctx is the context whose cancellation triggers the stop sequence in Run.
ctx context.Context
// routes maps a route name to the route registered under it.
routes map[string]*Route
// routesOrder holds route names in registration order; Run uses it to
// stop routes and Print uses it to list them.
routesOrder []string
}
// NewRouterContext returns a RouterContext bound to ctx with an empty,
// ready-to-use route registry.
func NewRouterContext(ctx context.Context) *RouterContext {
	return &RouterContext{
		routes: make(map[string]*Route),
		ctx:    ctx,
	}
}
// Route creates a new route via NewRouteBuilder, registers it under name and
// returns it.
//
// Registering a name that is already in use replaces the previous route
// (last registration wins). The name is counted in routesWg and recorded in
// routesOrder only the first time it is seen: Run produces exactly one
// routesWg.Done per entry in the routes map, so counting a duplicate name
// twice would make Run block forever and would make the stop sequence and
// Print visit the same route twice.
func (c *RouterContext) Route(name string) *Route {
	rb := NewRouteBuilder(c)
	if _, registered := c.routes[name]; !registered {
		c.routesWg.Add(1)
		c.routesOrder = append(c.routesOrder, name)
	}
	c.routes[name] = rb
	return rb
}
// lookupRoute returns the route registered under s, or nil when no route
// with that name exists.
func (c *RouterContext) lookupRoute(s string) *Route {
	if rt, found := c.routes[s]; found {
		return rt
	}
	return nil
}
// Run wires up every registered route, starts one goroutine per node and
// blocks until the last node of every route has stopped.
//
// While the routes are running a monitor goroutine prints the route status
// every five seconds. When the context is cancelled the monitor walks the
// routes in registration order, waits for each one via waitZeroReferences
// and then sends a Stop exchange to its first node unless that node has
// already stopped.
//
// The monitor is also told when Run is about to return, so it no longer
// keeps printing status forever when the routes finish on their own and the
// context is never cancelled.
func (c *RouterContext) Run() {
	fmt.Println("RouterContext > Run")

	// link nodes by channels
	for _, rt := range c.routes {
		var prev *Node
		for i, n := range rt.nodes {
			// setup runs before linking, so anything it prepares on the
			// node is in place when the node's Input is handed upstream.
			if n.setup != nil {
				n.setup()
			}
			if prev != nil {
				prev.Output = n.Input
			}
			prev = n
			if i == len(rt.nodes)-1 {
				n.isLast = true
			}
		}
	}

	// launch runners
	for _, rt := range c.routes {
		for _, n := range rt.nodes {
			go func(n *Node) {
				n.runner()
				c.onRunnerStop(n)
			}(n)
		}
	}

	// done is closed once every route has stopped; it releases the monitor.
	done := make(chan struct{})
	go func() {
		//c.Print()
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-c.ctx.Done():
				fmt.Println("Stopping..")
				for _, rn := range c.routesOrder {
					r := c.routes[rn]
					// NOTE(review): presumably blocks until no other route
					// still refers to this one - confirm against Route.
					r.waitZeroReferences()
					first := r.nodes.getFirstNode()
					if !first.getStopped() {
						first.Input <- Exchange{Type: Stop}
					}
				}
				return
			case <-ticker.C:
				c.Print()
			}
		}
	}()

	c.routesWg.Wait() // wait for all routes to stop
	close(done)
}
// Print writes the status of every route to standard output, in
// registration order. Each route gets a "Route: <name>" line, a column
// header, and one line per node showing the node type, the in value from
// its state, whether it has stopped (Y/N) and the node's address.
func (c *RouterContext) Print() {
	//fmt.Print("\033[H\033[2J") // clear screen
	for _, name := range c.routesOrder {
		fmt.Printf("Route: %s\n", name)
		fmt.Println("type in Stop")
		for _, node := range c.routes[name].nodes {
			st := node.getState()
			fmt.Printf("└%-12s %-6d %v @ %p\n", node.typ.String(), st.in, boolToYN(st.stopped), node)
		}
	}
}
// onRunnerStop is called after a node's runner has returned. It runs the
// node's onStop hook and then either marks the whole route as finished (when
// n is the last node of its route) or sends a Stop exchange through n.Send.
func (c *RouterContext) onRunnerStop(n *Node) {
	n.onStop()

	if n.isLast {
		// last node in a route
		c.routesWg.Done()
		return
	}

	n.Send(Exchange{Type: Stop})
}
// boolToYN renders b as "Y" for true and "N" for false.
func boolToYN(b bool) string {
	label := "N"
	if b {
		label = "Y"
	}
	return label
}