From 6533ef2fcc1f53daf83e26370f86e3c280144de5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Alvarez=20Pi=C3=B1eiro?= <95703246+emilioalvap@users.noreply.github.com> Date: Wed, 25 Oct 2023 17:53:16 +0200 Subject: [PATCH] [Heartbeat] Fix monitor duration wrapper (#36900) Fixes #36892. Monitor duration is not being calculated correctly, where start time is initialized after monitor execution and wrapping order is overriding retries event order. (cherry picked from commit 5d6c308da027fd7702e60a8b64cdbe0b40755355) --- CHANGELOG.next.asciidoc | 1 + .../monitors/wrappers/summarizer/plugdrop.go | 4 + .../monitors/wrappers/summarizer/plugerr.go | 6 + .../wrappers/summarizer/plugmondur.go | 15 +- .../wrappers/summarizer/plugstatestat.go | 4 + .../monitors/wrappers/summarizer/plugurl.go | 2 + .../wrappers/summarizer/summarizer.go | 19 +- .../wrappers/summarizer/summarizer_test.go | 183 ++++++++++++++++++ 8 files changed, 227 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f3b36aa384e1..429edcac70ef 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -115,6 +115,7 @@ is collected by it. - Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702] - Fix retries to trigger on a down monitor with no previous state. {pull}36842[36842] +- Fix monitor duration calculation with retries. {pull}36900[36900] *Metricbeat* diff --git a/heartbeat/monitors/wrappers/summarizer/plugdrop.go b/heartbeat/monitors/wrappers/summarizer/plugdrop.go index fff6c143bf02..a4ddc61abe7d 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugdrop.go +++ b/heartbeat/monitors/wrappers/summarizer/plugdrop.go @@ -43,3 +43,7 @@ func (d DropBrowserExtraEvents) BeforeSummary(event *beat.Event) BeforeSummaryAc func (d DropBrowserExtraEvents) BeforeRetry() { // noop } + +func (d DropBrowserExtraEvents) BeforeEachEvent(event *beat.Event) { + // noop +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugerr.go b/heartbeat/monitors/wrappers/summarizer/plugerr.go index 1010370f520c..83ab6de4f5ab 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugerr.go +++ b/heartbeat/monitors/wrappers/summarizer/plugerr.go @@ -46,6 +46,8 @@ func NewBrowserErrPlugin() *BrowserErrPlugin { } } +func (esp *BrowserErrPlugin) BeforeEachEvent(event *beat.Event) {} // noop + func (esp *BrowserErrPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions { // track these to determine if the journey // needs an error injected due to incompleteness @@ -127,6 +129,10 @@ func (esp *LightweightErrPlugin) BeforeRetry() { // noop } +func (esp *LightweightErrPlugin) BeforeEachEvent(event *beat.Event) { + // noop +} + // errToFieldVal reflects on the error and returns either an *ecserr.ECSErr if possible, and a look.Reason otherwise func errToFieldVal(eventErr error) (errVal interface{}) { var asECS *ecserr.ECSErr diff --git a/heartbeat/monitors/wrappers/summarizer/plugmondur.go b/heartbeat/monitors/wrappers/summarizer/plugmondur.go index f677e57693f8..d71cc96ff2c1 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugmondur.go +++ b/heartbeat/monitors/wrappers/summarizer/plugmondur.go @@ -31,12 +31,15 @@ type LightweightDurationPlugin struct { } func (lwdsp *LightweightDurationPlugin) EachEvent(event *beat.Event, _ error) EachEventActions { - // Effectively only runs once, on the first event + return 0 // noop +} + +func (lwdsp *LightweightDurationPlugin) BeforeEachEvent(event *beat.Event) { + // Effectively capture on the first event if lwdsp.startedAt == nil { now := time.Now() lwdsp.startedAt = &now } - return 0 } func (lwdsp *LightweightDurationPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { @@ -44,7 +47,10 @@ func (lwdsp *LightweightDurationPlugin) BeforeSummary(event *beat.Event) BeforeS return 0 } -func (lwdsp *LightweightDurationPlugin) BeforeRetry() {} +func (lwdsp *LightweightDurationPlugin) BeforeRetry() { + // Reset event start time + lwdsp.startedAt = nil +} // BrowserDurationPlugin handles the logic for writing the `monitor.duration.us` field // for browser monitors. @@ -82,4 +88,5 @@ func (bwdsp *BrowserDurationPlugin) BeforeSummary(event *beat.Event) BeforeSumma return 0 } -func (bwdsp *BrowserDurationPlugin) BeforeRetry() {} +func (bwdsp *BrowserDurationPlugin) BeforeRetry() {} +func (bwdsp *BrowserDurationPlugin) BeforeEachEvent(event *beat.Event) {} // noop diff --git a/heartbeat/monitors/wrappers/summarizer/plugstatestat.go b/heartbeat/monitors/wrappers/summarizer/plugstatestat.go index 4acfee4dc361..cf7e90af5f30 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugstatestat.go +++ b/heartbeat/monitors/wrappers/summarizer/plugstatestat.go @@ -74,6 +74,8 @@ func (ssp *BrowserStateStatusPlugin) BeforeRetry() { ssp.cssp.BeforeRetry() } +func (ssp *BrowserStateStatusPlugin) BeforeEachEvent(event *beat.Event) {} //noop + // LightweightStateStatusPlugin encapsulates the writing of the primary fields used by the summary, // those being `state.*`, `status.*` , `event.type`, and `monitor.check_group` type LightweightStateStatusPlugin struct { @@ -113,6 +115,8 @@ func (ssp *LightweightStateStatusPlugin) BeforeRetry() { ssp.cssp.BeforeRetry() } +func (ssp *LightweightStateStatusPlugin) BeforeEachEvent(event *beat.Event) {} // noop + type commonSSP struct { js *jobsummary.JobSummary stateTracker *monitorstate.Tracker diff --git a/heartbeat/monitors/wrappers/summarizer/plugurl.go b/heartbeat/monitors/wrappers/summarizer/plugurl.go index dc4394aa42ad..e47463575a31 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugurl.go +++ b/heartbeat/monitors/wrappers/summarizer/plugurl.go @@ -52,3 +52,5 @@ func (busp *BrowserURLPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActi func (busp *BrowserURLPlugin) BeforeRetry() { busp.urlFields = nil } + +func (busp *BrowserURLPlugin) BeforeEachEvent(event *beat.Event) {} //noop diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer.go b/heartbeat/monitors/wrappers/summarizer/summarizer.go index 9c3f1bd8abdf..ad0902d45af7 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer.go @@ -42,6 +42,12 @@ type Summarizer struct { startedAt time.Time } +func (s Summarizer) beforeEachEvent(event *beat.Event) { + for _, plugin := range s.plugins { + plugin.BeforeEachEvent(event) + } +} + // EachEventActions is a set of options using bitmasks to inform execution after the EachEvent callback type EachEventActions uint8 @@ -58,6 +64,9 @@ const RetryBeforeSummary = 1 // in one location. Prior to this code was strewn about a bit more and following it was // a bit trickier. type SummarizerPlugin interface { + // BeforeEachEvent is called on each event, and allows for the mutation of events + // before monitor execution + BeforeEachEvent(event *beat.Event) // EachEvent is called on each event, and allows for the mutation of events EachEvent(event *beat.Event, err error) EachEventActions // BeforeSummary is run on the final (summary) event for each monitor. @@ -106,6 +115,10 @@ func (s *Summarizer) setupPlugins() { // This adds the state and summary top level fields. func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { + + // call BeforeEachEvent for each plugin before running job + s.beforeEachEvent(event) + conts, eventErr := j(event) s.mtx.Lock() @@ -145,14 +158,14 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { // kibana queries // 2. If the site error is very short 1s gives it a tiny bit of time to recover delayedRootJob := func(event *beat.Event) ([]jobs.Job, error) { + time.Sleep(s.retryDelay) for _, p := range s.plugins { p.BeforeRetry() } - time.Sleep(s.retryDelay) - return s.rootJob(event) + return s.Wrap(s.rootJob)(event) } - conts = []jobs.Job{delayedRootJob} + return []jobs.Job{delayedRootJob}, eventErr } } diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer_test.go b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go index 2a94b3e6f596..e579a649c8ef 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer_test.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go @@ -19,11 +19,13 @@ package summarizer import ( "fmt" + "sync" "testing" "time" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/heartbeat/look" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" @@ -219,3 +221,184 @@ func TestSummarizer(t *testing.T) { }) } } + +// Test wrapper plugin hook order. Guaranteed order for plugins to be called upon determines +// what data can be appended to the event at each stage through retries. With this guarantee, +// plugins just need to ascertain that their invariants apply through hook execution order +func TestSummarizerPluginOrder(t *testing.T) { + t.Parallel() + + // these tests use strings to describe sequences of events + tests := []struct { + name string + maxAttempts int + expectedOrder []string + }{ + { + "one attempt", + 1, + []string{"bee", "job", "ee", "bs"}, + }, + { + "two attempts", + 2, + []string{"bee", "job", "ee", "bs", "br", "bee", "job", "ee", "bs"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + // Monitor setup + tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false) + sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", Type: "http", MaxAttempts: uint16(tt.maxAttempts)} + + // Test locals + calls := make(chan string, 100) + mtx := sync.Mutex{} + appendCall := func(event string) { + mtx.Lock() + defer mtx.Unlock() + + // Append call to global chan + calls <- event + } + + // We simplify these to always down since hook order should not be + // determined by status + job := func(event *beat.Event) (j []jobs.Job, retErr error) { + + calls <- "job" + + event.Fields = mapstr.M{ + "monitor": mapstr.M{ + "id": "test", + "status": string(monitorstate.StatusDown), + }, + } + + return nil, fmt.Errorf("dummyerr") + } + + s := NewSummarizer(job, sf, tracker) + // Shorten retry delay to make tests run faster + s.retryDelay = 2 * time.Millisecond + // Add mock plugin + s.plugins = append(s.plugins, &MockPlugin{ + eachEvent: func(_ *beat.Event, _ error) { + appendCall("ee") + }, + beforeSummary: func(_ *beat.Event) { + appendCall("bs") + }, + beforeRetry: func() { + appendCall("br") + }, + beforeEachEvent: func(_ *beat.Event) { + appendCall("bee") + }, + }) + wrapped := s.Wrap(job) + + _, _ = jobs.ExecJobAndConts(t, wrapped) + + close(calls) + + // gather order + rcvdOrder := []string{} + for c := range calls { + rcvdOrder = append(rcvdOrder, c) + } + + require.Equal(t, tt.expectedOrder, rcvdOrder) + require.Len(t, rcvdOrder, len(tt.expectedOrder)) + }) + } +} + +func TestRetryLightweightMonitorDuration(t *testing.T) { + t.Parallel() + + // Monitor setup + tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false) + sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", Type: "http", MaxAttempts: uint16(2)} + + // We simplify these to always down + job := func(event *beat.Event) (j []jobs.Job, retErr error) { + + // some platforms don't have enough precision to track immediate monitors time + time.Sleep(100 * time.Millisecond) + + event.Fields = mapstr.M{ + "monitor": mapstr.M{ + "id": "test", + "status": string(monitorstate.StatusDown), + }, + } + + return nil, fmt.Errorf("dummyerr") + } + + var retryStart time.Time + + s := NewSummarizer(job, sf, tracker) + // Shorten retry delay to make tests run faster + s.retryDelay = 2 * time.Millisecond + // Add mock plugin + s.plugins = append(s.plugins, &MockPlugin{ + beforeRetry: func() { + retryStart = time.Now() + }, + eachEvent: func(_ *beat.Event, _ error) {}, + beforeSummary: func(_ *beat.Event) {}, + beforeEachEvent: func(_ *beat.Event) {}, + }) + wrapped := s.Wrap(job) + + events, _ := jobs.ExecJobAndConts(t, wrapped) + + retryElapsed := time.Since(retryStart) + require.False(t, retryStart.IsZero()) + var rcvdDuration interface{} + for _, event := range events { + summaryIface, _ := event.GetValue("summary") + summary := summaryIface.(*jobsummary.JobSummary) + + if summary.FinalAttempt { + rcvdDuration, _ = event.GetValue("monitor.duration.us") + } + } + require.Greater(t, rcvdDuration, int64(0)) + // Ensures monitor duration only takes into account the last attempt execution time + // by comparing it to the time spent after last retry started (retryElapsed) + require.GreaterOrEqual(t, look.RTTMS(retryElapsed), rcvdDuration) +} + +type MockPlugin struct { + eachEvent func(e *beat.Event, err error) + beforeSummary func(e *beat.Event) + beforeRetry func() + beforeEachEvent func(e *beat.Event) +} + +func (mp *MockPlugin) EachEvent(e *beat.Event, err error) EachEventActions { + mp.eachEvent(e, err) + + return 0 +} + +func (mp *MockPlugin) BeforeSummary(e *beat.Event) BeforeSummaryActions { + mp.beforeSummary(e) + + return 0 +} + +func (mp *MockPlugin) BeforeRetry() { + mp.beforeRetry() +} + +func (mp *MockPlugin) BeforeEachEvent(e *beat.Event) { + mp.beforeEachEvent(e) +}