Skip to content

Commit

Permalink
[Heartbeat] Fix monitor duration wrapper (#36900)
Browse files Browse the repository at this point in the history
Fixes #36892.

Monitor duration is not being calculated correctly, where start time is initialized after monitor execution and wrapping order is overriding retries event order.

(cherry picked from commit 5d6c308)
  • Loading branch information
emilioalvap authored and mergify[bot] committed Oct 25, 2023
1 parent fd0446f commit 6533ef2
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ is collected by it.

- Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702]
- Fix retries to trigger on a down monitor with no previous state. {pull}36842[36842]
- Fix monitor duration calculation with retries. {pull}36900[36900]

*Metricbeat*

Expand Down
4 changes: 4 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/plugdrop.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ func (d DropBrowserExtraEvents) BeforeSummary(event *beat.Event) BeforeSummaryAc
func (d DropBrowserExtraEvents) BeforeRetry() {
// noop
}

func (d DropBrowserExtraEvents) BeforeEachEvent(event *beat.Event) {
// noop
}
6 changes: 6 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/plugerr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func NewBrowserErrPlugin() *BrowserErrPlugin {
}
}

func (esp *BrowserErrPlugin) BeforeEachEvent(event *beat.Event) {} // noop

func (esp *BrowserErrPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions {
// track these to determine if the journey
// needs an error injected due to incompleteness
Expand Down Expand Up @@ -127,6 +129,10 @@ func (esp *LightweightErrPlugin) BeforeRetry() {
// noop
}

func (esp *LightweightErrPlugin) BeforeEachEvent(event *beat.Event) {
// noop
}

// errToFieldVal reflects on the error and returns either an *ecserr.ECSErr if possible, and a look.Reason otherwise
func errToFieldVal(eventErr error) (errVal interface{}) {
var asECS *ecserr.ECSErr
Expand Down
15 changes: 11 additions & 4 deletions heartbeat/monitors/wrappers/summarizer/plugmondur.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,26 @@ type LightweightDurationPlugin struct {
}

func (lwdsp *LightweightDurationPlugin) EachEvent(event *beat.Event, _ error) EachEventActions {
// Effectively only runs once, on the first event
return 0 // noop
}

func (lwdsp *LightweightDurationPlugin) BeforeEachEvent(event *beat.Event) {
// Effectively capture on the first event
if lwdsp.startedAt == nil {
now := time.Now()
lwdsp.startedAt = &now
}
return 0
}

func (lwdsp *LightweightDurationPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions {
_, _ = event.PutValue("monitor.duration.us", look.RTTMS(time.Since(*lwdsp.startedAt)))
return 0
}

func (lwdsp *LightweightDurationPlugin) BeforeRetry() {}
func (lwdsp *LightweightDurationPlugin) BeforeRetry() {
// Reset event start time
lwdsp.startedAt = nil
}

// BrowserDurationPlugin handles the logic for writing the `monitor.duration.us` field
// for browser monitors.
Expand Down Expand Up @@ -82,4 +88,5 @@ func (bwdsp *BrowserDurationPlugin) BeforeSummary(event *beat.Event) BeforeSumma
return 0
}

func (bwdsp *BrowserDurationPlugin) BeforeRetry() {}
func (bwdsp *BrowserDurationPlugin) BeforeRetry() {}
func (bwdsp *BrowserDurationPlugin) BeforeEachEvent(event *beat.Event) {} // noop
4 changes: 4 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/plugstatestat.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (ssp *BrowserStateStatusPlugin) BeforeRetry() {
ssp.cssp.BeforeRetry()
}

func (ssp *BrowserStateStatusPlugin) BeforeEachEvent(event *beat.Event) {} //noop

// LightweightStateStatusPlugin encapsulates the writing of the primary fields used by the summary,
// those being `state.*`, `status.*` , `event.type`, and `monitor.check_group`
type LightweightStateStatusPlugin struct {
Expand Down Expand Up @@ -113,6 +115,8 @@ func (ssp *LightweightStateStatusPlugin) BeforeRetry() {
ssp.cssp.BeforeRetry()
}

func (ssp *LightweightStateStatusPlugin) BeforeEachEvent(event *beat.Event) {} // noop

type commonSSP struct {
js *jobsummary.JobSummary
stateTracker *monitorstate.Tracker
Expand Down
2 changes: 2 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/plugurl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,5 @@ func (busp *BrowserURLPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActi
func (busp *BrowserURLPlugin) BeforeRetry() {
busp.urlFields = nil
}

func (busp *BrowserURLPlugin) BeforeEachEvent(event *beat.Event) {} //noop
19 changes: 16 additions & 3 deletions heartbeat/monitors/wrappers/summarizer/summarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ type Summarizer struct {
startedAt time.Time
}

func (s Summarizer) beforeEachEvent(event *beat.Event) {
for _, plugin := range s.plugins {
plugin.BeforeEachEvent(event)
}
}

// EachEventActions is a set of options using bitmasks to inform execution after the EachEvent callback
type EachEventActions uint8

Expand All @@ -58,6 +64,9 @@ const RetryBeforeSummary = 1
// in one location. Prior to this code was strewn about a bit more and following it was
// a bit trickier.
type SummarizerPlugin interface {
// BeforeEachEvent is called on each event, and allows for the mutation of events
// before monitor execution
BeforeEachEvent(event *beat.Event)
// EachEvent is called on each event, and allows for the mutation of events
EachEvent(event *beat.Event, err error) EachEventActions
// BeforeSummary is run on the final (summary) event for each monitor.
Expand Down Expand Up @@ -106,6 +115,10 @@ func (s *Summarizer) setupPlugins() {
// This adds the state and summary top level fields.
func (s *Summarizer) Wrap(j jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {

// call BeforeEachEvent for each plugin before running job
s.beforeEachEvent(event)

conts, eventErr := j(event)

s.mtx.Lock()
Expand Down Expand Up @@ -145,14 +158,14 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job {
// kibana queries
// 2. If the site error is very short 1s gives it a tiny bit of time to recover
delayedRootJob := func(event *beat.Event) ([]jobs.Job, error) {
time.Sleep(s.retryDelay)
for _, p := range s.plugins {
p.BeforeRetry()
}
time.Sleep(s.retryDelay)
return s.rootJob(event)
return s.Wrap(s.rootJob)(event)
}

conts = []jobs.Job{delayedRootJob}
return []jobs.Job{delayedRootJob}, eventErr
}
}

Expand Down
183 changes: 183 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/summarizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package summarizer

import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"
Expand Down Expand Up @@ -219,3 +221,184 @@ func TestSummarizer(t *testing.T) {
})
}
}

// Test wrapper plugin hook order. Guaranteed order for plugins to be called upon determines
// what data can be appended to the event at each stage through retries. With this guarantee,
// plugins just need to ascertain that their invariants apply through hook execution order
func TestSummarizerPluginOrder(t *testing.T) {
t.Parallel()

// these tests use strings to describe sequences of events
tests := []struct {
name string
maxAttempts int
expectedOrder []string
}{
{
"one attempt",
1,
[]string{"bee", "job", "ee", "bs"},
},
{
"two attempts",
2,
[]string{"bee", "job", "ee", "bs", "br", "bee", "job", "ee", "bs"},
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

// Monitor setup
tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false)
sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", Type: "http", MaxAttempts: uint16(tt.maxAttempts)}

// Test locals
calls := make(chan string, 100)
mtx := sync.Mutex{}
appendCall := func(event string) {
mtx.Lock()
defer mtx.Unlock()

// Append call to global chan
calls <- event
}

// We simplify these to always down since hook order should not be
// determined by status
job := func(event *beat.Event) (j []jobs.Job, retErr error) {

calls <- "job"

event.Fields = mapstr.M{
"monitor": mapstr.M{
"id": "test",
"status": string(monitorstate.StatusDown),
},
}

return nil, fmt.Errorf("dummyerr")
}

s := NewSummarizer(job, sf, tracker)
// Shorten retry delay to make tests run faster
s.retryDelay = 2 * time.Millisecond
// Add mock plugin
s.plugins = append(s.plugins, &MockPlugin{
eachEvent: func(_ *beat.Event, _ error) {
appendCall("ee")
},
beforeSummary: func(_ *beat.Event) {
appendCall("bs")
},
beforeRetry: func() {
appendCall("br")
},
beforeEachEvent: func(_ *beat.Event) {
appendCall("bee")
},
})
wrapped := s.Wrap(job)

_, _ = jobs.ExecJobAndConts(t, wrapped)

close(calls)

// gather order
rcvdOrder := []string{}
for c := range calls {
rcvdOrder = append(rcvdOrder, c)
}

require.Equal(t, tt.expectedOrder, rcvdOrder)
require.Len(t, rcvdOrder, len(tt.expectedOrder))
})
}
}

func TestRetryLightweightMonitorDuration(t *testing.T) {
t.Parallel()

// Monitor setup
tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false)
sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", Type: "http", MaxAttempts: uint16(2)}

// We simplify these to always down
job := func(event *beat.Event) (j []jobs.Job, retErr error) {

// some platforms don't have enough precision to track immediate monitors time
time.Sleep(100 * time.Millisecond)

event.Fields = mapstr.M{
"monitor": mapstr.M{
"id": "test",
"status": string(monitorstate.StatusDown),
},
}

return nil, fmt.Errorf("dummyerr")
}

var retryStart time.Time

s := NewSummarizer(job, sf, tracker)
// Shorten retry delay to make tests run faster
s.retryDelay = 2 * time.Millisecond
// Add mock plugin
s.plugins = append(s.plugins, &MockPlugin{
beforeRetry: func() {
retryStart = time.Now()
},
eachEvent: func(_ *beat.Event, _ error) {},
beforeSummary: func(_ *beat.Event) {},
beforeEachEvent: func(_ *beat.Event) {},
})
wrapped := s.Wrap(job)

events, _ := jobs.ExecJobAndConts(t, wrapped)

retryElapsed := time.Since(retryStart)
require.False(t, retryStart.IsZero())
var rcvdDuration interface{}
for _, event := range events {
summaryIface, _ := event.GetValue("summary")
summary := summaryIface.(*jobsummary.JobSummary)

if summary.FinalAttempt {
rcvdDuration, _ = event.GetValue("monitor.duration.us")
}
}
require.Greater(t, rcvdDuration, int64(0))
// Ensures monitor duration only takes into account the last attempt execution time
// by comparing it to the time spent after last retry started (retryElapsed)
require.GreaterOrEqual(t, look.RTTMS(retryElapsed), rcvdDuration)
}

type MockPlugin struct {
eachEvent func(e *beat.Event, err error)
beforeSummary func(e *beat.Event)
beforeRetry func()
beforeEachEvent func(e *beat.Event)
}

func (mp *MockPlugin) EachEvent(e *beat.Event, err error) EachEventActions {
mp.eachEvent(e, err)

return 0
}

func (mp *MockPlugin) BeforeSummary(e *beat.Event) BeforeSummaryActions {
mp.beforeSummary(e)

return 0
}

func (mp *MockPlugin) BeforeRetry() {
mp.beforeRetry()
}

func (mp *MockPlugin) BeforeEachEvent(e *beat.Event) {
mp.beforeEachEvent(e)
}

0 comments on commit 6533ef2

Please sign in to comment.