-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor.go
98 lines (78 loc) · 2.11 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main
import (
"bufio"
"time"
)
// Monitor runs the monitoring loop for the session's log file.
//
// On every polling tick it captures the change in the file, records the number
// of new lines in a traffic frame, hands the captured lines to the session and,
// depending on cfg, emits tick and alert messages on msgChan. On every
// reporting tick it emits the session's flushed report if cfg.SendReports is set.
//
// The loop ends when a value is received from doneChan. Monitor itself sends a
// value on doneChan once the number of polls equals cfg.MaxPolls.
//
// Monitor panics if the session file cannot be stat'ed or seeked.
func Monitor(cfg *Config, s *Session, doneChan chan struct{}, msgChan chan<- msg) {
	frame := NewFrame(cfg.MTF, cfg.PollInt)
	reader := bufio.NewReader(s.File)

	info, err := s.File.Stat()
	if err != nil {
		panic(err)
	}

	// Reading starts at the current end of the file: the offset is the file
	// size, measured from the start of the file (whence 0).
	offset := info.Size()
	if _, err := s.File.Seek(offset, 0); err != nil {
		panic(err)
	}

	// Both intervals are configured in seconds.
	pollTicker := time.NewTicker(time.Duration(cfg.PollInt) * time.Second)
	reportTicker := time.NewTicker(time.Duration(cfg.ReportInt) * time.Second)

	pollCount := 0
	for {
		select {
		case <-doneChan:
			// Completion signal: release the tickers and leave the loop.
			pollTicker.Stop()
			reportTicker.Stop()
			return

		case tick := <-pollTicker.C:
			pollCount++

			// Capture point data; the same initial offset is passed on every poll.
			point := NewPoint(s.File, reader, offset)
			if err := point.GetChange(); err != nil {
				// The error is reported, and processing of the point continues.
				msgChan <- msgErr(err)
			}

			// Record the quantity of log entries seen since the last poll.
			frame.Rec(point.linesQty)

			// Hand the captured entries over to the session storage.
			if err := s.ConsumeLines(point.lines); err != nil {
				msgChan <- msgErr(err)
			}

			// Emit the current point data.
			if cfg.SendTicks {
				msgChan <- msgPoint(frame.AvgTraffic, s.AlertThreshold)
			}

			// Check the alert threshold in both directions.
			if cfg.SendAlerts {
				if s.ShouldEscalate(frame.AvgTraffic) {
					msgChan <- msgAlertEsc(frame.AvgTraffic, tick)
				}
				if s.ShouldDeescalate(frame.AvgTraffic) {
					msgChan <- msgAlertDeesc(frame.AvgTraffic, tick)
				}
			}

			// Signal completion once the configured number of polls is reached.
			if pollCount == cfg.MaxPolls {
				doneChan <- struct{}{}
			}

		case tick := <-reportTicker.C:
			if cfg.SendReports {
				// Send the report produced by flushing the session's report data.
				msgChan <- msgReport(s.FlushReport(cfg, &tick))
			}
		}
	}
}