From 91cfadb2295e83372b5310b4647239364363c8c0 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Fri, 24 May 2024 23:28:17 -0400 Subject: [PATCH 1/3] chore: refactored monitoring Handler entirely The monitoring routine is now working with an operator pattern like the dashboard, which enable us to control the basic functions of monitoring inside other packages such as server To my knowledge it didn't introduce any race-cond as the operator is mutexed and locked for every read/write operation on non-mutexed field Also nil pointer deref should happen as the pointers susceptible of being nil are checked --- dashboard/dashboard.go | 11 +- monitoring/monitoring.go | 197 +++++++++++++++++++++-------------- monitoring/timeserie.go | 6 +- monitoring/timeserie_trim.go | 8 +- server/cmd.go | 7 +- server/server.go | 6 +- server/server_test.go | 5 +- 7 files changed, 143 insertions(+), 97 deletions(-) diff --git a/dashboard/dashboard.go b/dashboard/dashboard.go index 80ac9c3..bcd6916 100644 --- a/dashboard/dashboard.go +++ b/dashboard/dashboard.go @@ -2,6 +2,7 @@ package dashboard import ( "sync" + "time" ) type Probe struct { @@ -15,14 +16,16 @@ type Data struct { type Operator struct { sync.Mutex - workers map[int]chan *Data - In chan *Data + workers map[int]chan *Data + PollingFreq time.Duration + In chan *Data } func Handle() *Operator { operator := &Operator{ - workers: make(map[int]chan *Data), - In: make(chan *Data), + workers: make(map[int]chan *Data), + PollingFreq: 5 * time.Second, + In: make(chan *Data), } go func(operator *Operator) { for { diff --git a/monitoring/monitoring.go b/monitoring/monitoring.go index 92870be..6d0e699 100644 --- a/monitoring/monitoring.go +++ b/monitoring/monitoring.go @@ -41,102 +41,140 @@ type Payload struct { Machine string `json:"-"` } -type probeObject struct { +type probeWorker struct { sync.Mutex - name string - data chan *Payload - stop chan bool - status probeStatus - counter int - lastNormal time.Time - timeSerie *probeTimeSerie + name string + data chan *Payload + stop chan bool + status probeStatus + counter int + lastNormal time.Time + timeSerie *probeTimeSerie + headOperator *Operator +} + +type Operator struct { + sync.Mutex + In chan *Payload + probeMap sync.Map + probeList []string + dashboardOperator *dashboard.Operator } // Handle function handles the payload from the API server -func Handle(channel chan *Payload, dashboardOperator *dashboard.Operator) { +func Handle(dashboardOperator *dashboard.Operator) *Operator { log.Debug("Starting monitoring.Handle") - var probeMap sync.Map - var probeList = make([]string, 0) - var timer = time.NewTimer(5 * time.Second) + operator := &Operator{ + In: make(chan *Payload), + probeMap: sync.Map{}, + probeList: make([]string, 0), + dashboardOperator: dashboardOperator, + } + go func(operator *Operator, dashboardOperator *dashboard.Operator) { + var timer = time.NewTimer(0) + if dashboardOperator != nil { + timer = time.NewTimer(operator.dashboardOperator.PollingFreq) + } + for { + select { + case <-timer.C: + if dashboardOperator == nil { + continue + } - for { - select { - case <-timer.C: - if dashboardOperator == nil { - continue - } + operator.dashboardOperator.Lock() + operator.InformDashboard() + timer.Reset(operator.dashboardOperator.PollingFreq) + operator.dashboardOperator.Unlock() - dashboardPayload := &dashboard.Data{ - Probes: make([]*dashboard.Probe, 0), - } + case payload := <-operator.In: + if loaded, ok := operator.probeMap.Load(payload.Machine); ok { + probe, ok := loaded.(*probeWorker) + if !ok { + log.WithFields(log.Fields{ + "machine": payload.Machine, + }).Fatal("Failed to load probe") + } + + probe.informWorker(payload) + } else { + // Create a new probe + newProbe := makeProbe(payload, operator) - for _, probe := range probeList { - if loaded, ok := probeMap.Load(probe); ok { - probe := loaded.(*probeObject) - probe.Lock() - dashboardProbe := &dashboard.Probe{ - Name: strings.Clone(probe.name), - Status: strings.Clone(probe.status.String()), + value, loaded := operator.probeMap.LoadOrStore(payload.Machine, newProbe) + if loaded { + log.WithFields(log.Fields{ + "machine": payload.Machine, + }).Fatal("Machine already exists") } - probe.Unlock() - dashboardPayload.Probes = append(dashboardPayload.Probes, dashboardProbe) - } - } - dashboardOperator.In <- dashboardPayload - timer.Reset(5 * time.Second) - continue - case payload := <-channel: - if loaded, ok := probeMap.Load(payload.Machine); ok { - probe := loaded.(*probeObject) - if payload.MachineStatus == "delete" { - // Delete the probe - probe.Lock() - for i, name := range probeList { - if name == payload.Machine { - probeList = append(probeList[:i], probeList[i+1:]...) - break - } + probe, ok := value.(*probeWorker) + if !ok { + log.WithFields(log.Fields{ + "machine": payload.Machine, + }).Fatal("Failed to load probe") } - probe.delete() - probeMap.Delete(payload.Machine) - } else { - // Send the payload to the probe - probe.data <- payload - } - } else { - // Create a new probe - newProbe := makeProbe(payload) - value, loaded := probeMap.LoadOrStore(payload.Machine, newProbe) - if loaded { log.WithFields(log.Fields{ + "probe": probe.name, "machine": payload.Machine, - }).Fatal("Machine already exists") - } + "status": probe.status, + }).Info("Starting probe thread") - probe, ok := value.(*probeObject) - if !ok { - log.WithFields(log.Fields{ - "machine": payload.Machine, - }).Fatal("Failed to load probe") + operator.Lock() + operator.probeList = append(operator.probeList, payload.Machine) + operator.Unlock() + + go probe.work() + + probe.informWorker(payload) } + } + } + }(operator, dashboardOperator) + return operator +} - log.WithFields(log.Fields{ - "probe": probe.name, - "machine": payload.Machine, - "status": probe.status, - }).Info("Starting probe thread") +func (o *Operator) InformDashboard() { + dashboardPayload := &dashboard.Data{ + Probes: make([]*dashboard.Probe, 0), + } + + for _, probe := range o.probeList { + if loaded, ok := o.probeMap.Load(probe); ok { + probe := loaded.(*probeWorker) + probe.Lock() + dashboardProbe := &dashboard.Probe{ + Name: strings.Clone(probe.name), + Status: strings.Clone(probe.status.String()), + } + probe.Unlock() + dashboardPayload.Probes = append(dashboardPayload.Probes, dashboardProbe) + } + } + + o.dashboardOperator.In <- dashboardPayload +} - probeList = append(probeList, payload.Machine) - go probe.work() - probe.data <- payload +func (p *probeWorker) informWorker(payload *Payload) { + if payload.MachineStatus == "delete" { + p.Lock() + p.headOperator.Lock() + for i, name := range p.headOperator.probeList { + if name == payload.Machine { + p.headOperator.probeList = append(p.headOperator.probeList[:i], p.headOperator.probeList[i+1:]...) + break } } + p.delete() + p.headOperator.probeMap.Delete(payload.Machine) + p.headOperator.Unlock() + } else { + p.data <- payload } } -func (p *probeObject) work() { +func (p *probeWorker) work() { inactivityDelay, err := time.ParseDuration(config.Server.ProbeInactivityDelay) if err != nil { log.WithError(err).Fatal("Failed to parse inactivity delay") @@ -172,7 +210,7 @@ func (p *probeObject) work() { } } -func (p *probeObject) timerIncrement() { +func (p *probeWorker) timerIncrement() { switch p.status { case normal: p.updateStatus() @@ -204,7 +242,7 @@ func (p *probeObject) timerIncrement() { } } -func (p *probeObject) reset() { +func (p *probeWorker) reset() { if p.status > normal { log.Infof("Machine %s is back in normal state\n", p.name) } @@ -213,7 +251,7 @@ func (p *probeObject) reset() { p.lastNormal = time.Now() } -func (p *probeObject) updateStatus() { +func (p *probeWorker) updateStatus() { p.status++ p.counter = 0 if p.status > normal { @@ -224,7 +262,7 @@ func (p *probeObject) updateStatus() { log.Infof("Machine %s is now in %s state\n", p.name, p.status.String()) } -func (p *probeObject) delete() { +func (p *probeWorker) delete() { log.WithFields(log.Fields{ "probe": p.name, }).Info("Deleting probe") @@ -233,8 +271,8 @@ func (p *probeObject) delete() { close(p.stop) } -func makeProbe(originPayload *Payload) *probeObject { - return &probeObject{ +func makeProbe(originPayload *Payload, operator *Operator) *probeWorker { + return &probeWorker{ name: originPayload.Machine, data: make(chan *Payload, 1), stop: make(chan bool), @@ -249,5 +287,6 @@ func makeProbe(originPayload *Payload) *probeObject { }, size: 1, }, + headOperator: operator, } } diff --git a/monitoring/timeserie.go b/monitoring/timeserie.go index cd38502..e60d777 100644 --- a/monitoring/timeserie.go +++ b/monitoring/timeserie.go @@ -48,7 +48,7 @@ type probeTimeSerie struct { size int } -func (p *probeObject) workServices(payload *Payload) { +func (p *probeWorker) workServices(payload *Payload) { p.timeSerie.Lock() p.storePayload(payload) if p.timeSerie.size > trimTimeSeriesThreshold { @@ -58,7 +58,7 @@ func (p *probeObject) workServices(payload *Payload) { p.timeSerie.Unlock() } -func (p *probeObject) storePayload(payload *Payload) { +func (p *probeWorker) storePayload(payload *Payload) { tempServiceStatus := make(map[string]*serviceStatus) tempCount := 0 @@ -112,7 +112,7 @@ func (p *probeObject) storePayload(payload *Payload) { }).Trace("Payload stored in timeserie") } -func (p *probeObject) checkAlert() { +func (p *probeWorker) checkAlert() { if p.timeSerie.head == nil { return } diff --git a/monitoring/timeserie_trim.go b/monitoring/timeserie_trim.go index 1ee0d27..1a3aff4 100644 --- a/monitoring/timeserie_trim.go +++ b/monitoring/timeserie_trim.go @@ -4,7 +4,7 @@ import log "github.com/sirupsen/logrus" const trimTimeSeriesThreshold = 100 -func (p *probeObject) trimTimeSerie() { +func (p *probeWorker) trimTimeSerie() { p.timeSerie.Lock() defer p.timeSerie.Unlock() @@ -55,7 +55,7 @@ func (p *probeObject) trimTimeSerie() { } } -func (p *probeObject) trimToLastNode() { +func (p *probeWorker) trimToLastNode() { p.timeSerie.head.previous = nil p.timeSerie.size = 1 log.WithFields(log.Fields{ @@ -65,7 +65,7 @@ func (p *probeObject) trimToLastNode() { return } -func (p *probeObject) trimToNode(count int) { +func (p *probeWorker) trimToNode(count int) { currentNode := p.timeSerie.head for i := 0; i < count-1; i++ { currentNode = currentNode.previous @@ -79,7 +79,7 @@ func (p *probeObject) trimToNode(count int) { return } -func (p *probeObject) trimToGivenNode(currentNode *timeSerieNode, count int) { +func (p *probeWorker) trimToGivenNode(currentNode *timeSerieNode, count int) { currentNode.previous = nil p.timeSerie.size = count log.WithFields(log.Fields{ diff --git a/server/cmd.go b/server/cmd.go index e88c922..810077a 100644 --- a/server/cmd.go +++ b/server/cmd.go @@ -16,7 +16,6 @@ import ( func Cmd(rootCmd *cobra.Command) { var noAlerting bool var noDash bool - payloadChannel := make(chan *monitoring.Payload) serverCmd := &cobra.Command{ Use: "run", @@ -35,12 +34,14 @@ func Cmd(rootCmd *cobra.Command) { if !noDash { dashboardOperator = dashboard.Handle() } else { + dashboardOperator = nil log.Warn("Dashboard disabled") } - go monitoring.Handle(payloadChannel, dashboardOperator) + + monitoringOperator := monitoring.Handle(dashboardOperator) addr := fmt.Sprintf("%s:%d", config.Server.ListeningAddress, config.Server.Port) - newServer(payloadChannel, dashboardOperator).Listen(addr) + newServer(monitoringOperator, dashboardOperator).Listen(addr) // Start panicwatch to catch panics err := panicwatch.Start(panicwatch.Config{ OnPanic: func(p panicwatch.Panic) { diff --git a/server/server.go b/server/server.go index 1d3af37..93bef4f 100644 --- a/server/server.go +++ b/server/server.go @@ -18,7 +18,7 @@ import ( //go:embed static/* var dashboardStatic embed.FS -func newServer(payloadChannel chan *monitoring.Payload, dashboardOperator *dashboard.Operator) *fiber.App { +func newServer(monitoringOperator *monitoring.Operator, dashboardOperator *dashboard.Operator) *fiber.App { app := fiber.New(fiber.Config{ AppName: "DeepSentinel API", }) @@ -28,11 +28,11 @@ func newServer(payloadChannel chan *monitoring.Payload, dashboardOperator *dashb app.Get("/health", getHealthHandler) app.Post("/probe/:machine/report", func(c *fiber.Ctx) error { - return postProbeReportHandler(c, payloadChannel) + return postProbeReportHandler(c, monitoringOperator.In) }) app.Delete("/probe/:machine", func(c *fiber.Ctx) error { - return deleteProbeHandler(c, payloadChannel) + return deleteProbeHandler(c, monitoringOperator.In) }) if dashboardOperator != nil { diff --git a/server/server_test.go b/server/server_test.go index 9b347d3..c9765f0 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -37,7 +37,10 @@ func TestNewServer(t *testing.T) { _ = payload } }() - s := newServer(payloadTestChan, dashboardOperator) + + monitoringOperator := monitoring.Handle(dashboardOperator) + + s := newServer(monitoringOperator, dashboardOperator) // Test if the server is created assert.NotNil(t, s, "newServer() returned nil") From 0e7d87d8faf021dabcb6cef0bce6621f1d5ea582 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Fri, 24 May 2024 23:39:17 -0400 Subject: [PATCH 2/3] chore: comments for new exported components --- monitoring/monitoring.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/monitoring/monitoring.go b/monitoring/monitoring.go index 6d0e699..15b81b5 100644 --- a/monitoring/monitoring.go +++ b/monitoring/monitoring.go @@ -53,6 +53,11 @@ type probeWorker struct { headOperator *Operator } +// Operator defines the handler for the monitoring +// It contains the channel to receive the payload from the API server +// It contains the map of the probes and the list of the probes +// It contains the dashboard operator to inform the dashboard of the status of the probes +// Everything that defines the monitoring event loop is here type Operator struct { sync.Mutex In chan *Payload @@ -83,7 +88,7 @@ func Handle(dashboardOperator *dashboard.Operator) *Operator { } operator.dashboardOperator.Lock() - operator.InformDashboard() + operator.informDashboard() timer.Reset(operator.dashboardOperator.PollingFreq) operator.dashboardOperator.Unlock() @@ -135,7 +140,7 @@ func Handle(dashboardOperator *dashboard.Operator) *Operator { return operator } -func (o *Operator) InformDashboard() { +func (o *Operator) informDashboard() { dashboardPayload := &dashboard.Data{ Probes: make([]*dashboard.Probe, 0), } From 126191d8eb4efb79f73fb3bb71b3cfb2d7561aa9 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 25 May 2024 00:35:15 -0400 Subject: [PATCH 3/3] WIP - saving state --- monitoring/decision.go | 1 + monitoring/timeserie.go | 42 ++++++++++++++++++++++++++--------------- 2 files changed, 28 insertions(+), 15 deletions(-) create mode 100644 monitoring/decision.go diff --git a/monitoring/decision.go b/monitoring/decision.go new file mode 100644 index 0000000..2f2912a --- /dev/null +++ b/monitoring/decision.go @@ -0,0 +1 @@ +package monitoring diff --git a/monitoring/timeserie.go b/monitoring/timeserie.go index e60d777..6ac2fc9 100644 --- a/monitoring/timeserie.go +++ b/monitoring/timeserie.go @@ -31,17 +31,32 @@ func stringtoStatusType(str string) (statusType, error) { return fail, errors.New("invalid status string") } +func (s statusType) String() string { + statusStr := map[statusType]string{ + pass: "pass", + warn: "warn", + fail: "fail", + } + return statusStr[s] +} + type serviceStatus struct { status statusType count int } +type servicesStatus map[string]*serviceStatus + type timeSerieNode struct { timestamp time.Time - services map[string]*serviceStatus + services servicesStatus previous *timeSerieNode } +// probeTimeSerie is a mutexed linked list of timeSerieNode +// It is used to store the status of the services +// The head of the list is the latest status +// It goes like : previous <- ... <- head type probeTimeSerie struct { sync.Mutex head *timeSerieNode @@ -59,12 +74,11 @@ func (p *probeWorker) workServices(payload *Payload) { } func (p *probeWorker) storePayload(payload *Payload) { - tempServiceStatus := make(map[string]*serviceStatus) - tempCount := 0 + tempServiceStatus := make(servicesStatus) for service, state := range payload.Services { + tempCount := 0 parsedStatus, err := stringtoStatusType(state) - if err != nil { log.WithFields(log.Fields{ "probe": p.name, @@ -90,7 +104,7 @@ func (p *probeWorker) storePayload(payload *Payload) { "probe": p.name, "machine": payload.Machine, "service": service, - "status": parsedStatus, + "status": parsedStatus.String(), "count": tempCount, }).Trace("Service status stored in timeserie") } @@ -119,24 +133,22 @@ func (p *probeWorker) checkAlert() { for service, status := range p.timeSerie.head.services { var alertingStatus string - alert := false + toAlert := false lowThreshhold := config.Server.FailedToAlertedLowThreshold highThreshhold := config.Server.FailedToAlertedLowThreshold + config.Server.AlertedLowToAlertedHighThreshold - if status.status == fail && status.count >= lowThreshhold && status.count < highThreshhold { + if status.status == fail && status.count == lowThreshhold && status.count < highThreshhold { alertingStatus = "low" - alert = true + toAlert = true } else if status.status == fail && status.count == highThreshhold { alertingStatus = "high" - alert = true - } else if status.status == fail && status.count > highThreshhold { - alert = false - } else { - continue + toAlert = true + } else if status.status == fail { + toAlert = false } - if alert { + if toAlert { log.WithFields(log.Fields{ "probe": p.name, "machine": p.name, @@ -145,7 +157,7 @@ func (p *probeWorker) checkAlert() { }).Warnf("Service in fail status. Alerting %s", alertingStatus) service = p.name + "-" + service alerting.ServerAlert("service", service, alertingStatus) - } else if !alert && status.count%10 == 0 { + } else if status.status == fail && status.count > lowThreshhold && status.count%10 == 0 { log.WithFields(log.Fields{ "probe": p.name, "machine": p.name,