// scheduler.go
//
// Note: the upstream repository was archived by its owner on Jun 25, 2019
// and is now read-only.

package sdm630
import (
"log"
"time"
. "github.com/gonium/gosdm630/internal/meters"
)
// MeterScheduler sends query snips for a set of meters on a query channel and
// updates each meter's availability state from the control snips it receives.
type MeterScheduler struct {
// out is the channel query snips are sent on.
out QuerySnipChannel
// control is the channel Run receives control snips from.
control ControlSnipChannel
// meters holds the scheduled meters; Run looks entries up by a control
// snip's DeviceId.
meters map[uint8]*Meter
// mc is an optional measurement cache. NewMeterScheduler leaves it unset;
// it is assigned via SetCache. When non-nil, Run calls mc.Purge for a device
// that changes from AVAILABLE to UNAVAILABLE.
mc *MeasurementCache
}
// NewMeterScheduler returns a MeterScheduler that sends query snips on out,
// receives control snips on control, and uses devices as its meter map.
// The measurement cache is left unset; attach one with SetCache if needed.
func NewMeterScheduler(
	out QuerySnipChannel,
	control ControlSnipChannel,
	devices map[uint8]*Meter,
) *MeterScheduler {
	scheduler := new(MeterScheduler)
	scheduler.out = out
	scheduler.control = control
	scheduler.meters = devices
	return scheduler
}
// SetupScheduler creates a MeterScheduler for meters and wires it to the
// query engine qe through three freshly created unbuffered channels.
//
// qe.Transform is started in its own goroutine with the scheduler's query
// channel as input, the control channel as error channel, and a third channel
// as output. The scheduler and that output channel are returned.
func SetupScheduler(meters map[uint8]*Meter, qe *ModbusEngine) (*MeterScheduler, QuerySnipChannel) {
	// Channels linking the goroutines.
	var (
		queries  = make(QuerySnipChannel)   // scheduler -> query engine
		feedback = make(ControlSnipChannel) // query engine -> scheduler
		results  = make(QuerySnipChannel)   // query engine -> tee (handed to the caller)
	)

	go qe.Transform(queries, feedback, results)

	return NewMeterScheduler(queries, feedback, meters), results
}
// SetCache attaches the measurement cache mc to the scheduler. Run calls
// Purge on it for a device that changes from AVAILABLE to UNAVAILABLE.
func (q *MeterScheduler) SetCache(mc *MeasurementCache) {
	q.mc = mc
}
// produceSnips loops forever over all meters, asks each meter's Producer for
// its operations and sends one query snip per operation on q.out, skipping
// operations while the meter is UNAVAILABLE. It never returns.
//
// NOTE(review): the out parameter is never used; snips are sent on q.out
// directly. Confirm whether that is intentional before routing them through
// out instead.
// NOTE(review): nothing in this loop blocks except the channel send, so it
// spins when there are no meters or every meter is UNAVAILABLE.
func (q *MeterScheduler) produceSnips(out QuerySnipChannel) {
	for {
		for _, m := range q.meters {
			for _, op := range m.Producer.Produce() {
				// Re-check the state for every operation: the meter may have
				// been marked UNAVAILABLE since the previous send.
				if m.GetState() == UNAVAILABLE {
					continue
				}
				q.out <- NewQuerySnip(m.DeviceId, op)
			}
		}
	}
}
// supervisor probes meters that are marked UNAVAILABLE.
//
// On each pass it walks all meters and, for every one whose state is
// UNAVAILABLE, logs the attempt and sends a probe snip (built from the meter's
// Producer.Probe operation) on q.out. It then sleeps for one minute before the
// next pass. It never returns.
func (q *MeterScheduler) supervisor() {
	for {
		for _, m := range q.meters {
			if m.GetState() != UNAVAILABLE {
				continue
			}
			log.Printf("Attempting to ping unavailable device %d", m.DeviceId)
			// Only the probe is injected here; the meter is switched back to
			// AVAILABLE in Run() when a CONTROLSNIP_OK arrives for it.
			probe := m.Producer.Probe()
			q.out <- NewQuerySnip(m.DeviceId, probe)
		}
		time.Sleep(time.Minute)
	}
}
// Run starts the supervisor and snip-producer goroutines and then loops
// forever, handling two kinds of events:
//
//   - a snip received on the internal source channel is forwarded to q.out;
//   - a control snip received on q.control updates the state of the meter it
//     refers to: CONTROLSNIP_ERROR marks it UNAVAILABLE (and calls Purge on the
//     cache, if one is set and the meter was AVAILABLE before),
//     CONTROLSNIP_OK marks it AVAILABLE again.
//
// A control snip with an unknown type, or one whose DeviceId is not in
// q.meters, terminates the program via log.Fatal. Run never returns.
//
// NOTE(review): in this file produceSnips sends on q.out rather than on the
// channel it is handed, so the source case below does not appear to receive
// anything from it — confirm before relying on that path.
func (q *MeterScheduler) Run() {
	source := make(QuerySnipChannel)
	go q.supervisor()
	go q.produceSnips(source)

	for {
		select {
		case snip := <-source:
			q.out <- snip

		case ctrl := <-q.control:
			switch ctrl.Type {
			case CONTROLSNIP_ERROR:
				// The device failed: look it up and deactivate it.
				log.Printf("Device %d failed terminally due to: %s",
					ctrl.DeviceId, ctrl.Message)
				meter, known := q.meters[ctrl.DeviceId]
				if !known {
					log.Fatal("Internal device id mismatch - this should not happen!")
				}
				previous := meter.GetState()
				meter.UpdateState(UNAVAILABLE)
				// Purge only on the AVAILABLE -> UNAVAILABLE transition, and
				// only when a cache has been attached.
				if q.mc != nil && previous == AVAILABLE {
					q.mc.Purge(meter.DeviceId)
				}

			case CONTROLSNIP_OK:
				// The device answered: look it up and reactivate it if needed.
				meter, known := q.meters[ctrl.DeviceId]
				if !known {
					log.Fatal("Internal device id mismatch - this should not happen!")
				}
				if meter.GetState() == AVAILABLE {
					continue
				}
				log.Printf("Reactivating device %d", ctrl.DeviceId)
				meter.UpdateState(AVAILABLE)

			default:
				log.Fatal("Received unknown control snip - something weird happened.")
			}
		}
	}
}