-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.go
195 lines (163 loc) · 4.32 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package impact
import (
"encoding/json"
"errors"
"io/ioutil"
"math"
"regexp"
"time"
)
// decode channel names
const (
ACCELERATION string = `^[A-Z0-9\_]+_[A-Z]N[A-Z0-9]$`
VELOCITY string = `^[A-Z0-9\_]+_[A-Z]H[A-Z0-9]$`
)
// running stream state information
type Stream struct {
Name string // station name
Latitude float32 // station latitude
Longitude float32 // station longitude
Rate float64 // stream sampling rate
Gain float64 // stream gain
Q float64 // high-pass filter coeff
h *HighPass // high-pass filter
i *Integrator // intergrator
mmi int32 // the last intesity sent
flush time.Time // previous flush
last time.Time // previous packet
level int32 // the noise threshold level
probation time.Duration // the noise probation period
jailed bool // it's been too noisy
good time.Time // the last good data time
bad time.Time // the last bad data time
}
// pull in public stream information from a json config file
func LoadStreams(config string) (map[string]*Stream, error) {
f, err := ioutil.ReadFile(config)
if err != nil {
return nil, err
}
var s map[string]*Stream
err = json.Unmarshal(f, &s)
if err != nil {
return nil, err
}
return s, nil
}
// initialize a stream, setting type of input and filters
func (s *Stream) Init(srcname string, probation time.Duration, level int32) (bool, error) {
s.probation = probation
s.level = level
s.h = nil
s.i = nil
// update structure and filters
if regexp.MustCompile(VELOCITY).MatchString(srcname) {
if s.Q > 0.0 {
s.h = NewHighPass(s.Gain, s.Q)
}
} else if regexp.MustCompile(ACCELERATION).MatchString(srcname) {
if s.Q > 0.0 {
s.h = NewHighPass(s.Gain, s.Q)
s.i = NewIntegrator(1.0, 1.0/s.Rate, s.Q)
}
} else {
return false, errors.New("unable to match srcname for velocity or acceleration")
}
return true, nil
}
// time to send a message, either timeout or different value
func (s *Stream) Flush(d time.Duration, mmi int32) bool {
// same intensity?
if s.mmi == mmi {
// ignore times
if d == 0 {
return false
}
// too soon?
if time.Since(s.flush).Seconds() < d.Seconds() {
return false
}
}
// keep state
s.flush = time.Now()
s.mmi = mmi
// a noisy stream
if s.mmi > s.level {
// should be jailed ...
if s.last.Sub(s.good) > s.probation {
s.jailed = true
}
s.bad = s.last
} else {
if s.last.Sub(s.bad) > s.probation {
s.jailed = false
}
s.good = s.last
}
// skip as noisy
if s.jailed {
return false
}
return true
}
// given an array of samples .. pass them through a block at a time
func (s *Stream) ProcessSamples(source string, srcname string, starttime time.Time, samples []int32) (Message, error) {
// resulting possible message
m := Message{Source: source, Quality: "measured", Latitude: s.Latitude, Longitude: s.Longitude, Comment: s.Name}
// need a sampling rate
if !(s.Rate > 0.0) {
return m, errors.New("invalid sampling rate")
}
// check we have samples
if !(len(samples) > 0) {
return m, errors.New("no samples given")
}
// need high pass filter at least
if s.i != nil && s.h == nil {
return m, errors.New("filter not fully initialised")
}
// has there been a break?
if math.Abs(starttime.Sub(s.last).Seconds()-1.0/s.Rate) > (0.5 / s.Rate) {
// reset filters
if s.h != nil {
s.h.Reset()
}
if s.i != nil {
s.i.Reset()
}
// first run it backwards (a pre-conditioning strategy)
for i := range samples {
if s.i != nil {
s.h.Sample(s.i.Sample((float64)(samples[len(samples)-i-1])))
} else if s.h != nil {
s.h.Sample((float64)(samples[len(samples)-i-1]))
}
}
// reset the noise times
s.bad = time.Unix(0, 0)
s.good = time.Unix(0, 0)
}
// reset time
m.Time = starttime
m.MMI = Intensity(0)
// find max velocity
var max float64 = 0.0
for i := range samples {
var f float64
if s.i != nil {
f = s.h.Sample(s.i.Sample((float64)(samples[i])))
} else if s.h != nil {
f = s.h.Sample((float64)(samples[i]))
} else {
f = (float64)(samples[i]) / s.Gain
}
if math.Abs(f) > max {
max = math.Abs(f)
m.Time = starttime.Add((time.Duration)((float64)(time.Second) * (float64)(i) / s.Rate))
m.MMI = Intensity(max)
}
}
// get ready for next packet
s.last = starttime.Add((time.Duration)((float64)(time.Second) * (float64)(len(samples)-1) / s.Rate))
return m, nil
}