Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Fix monitor duration wrapper #36900

Merged
merged 12 commits into from
Oct 25, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ is collected by it.

- Fix panics when dereferencing an invalid parsed URL. {pull}34702[34702]
- Fix retries to trigger on a down monitor with no previous state. {pull}36842[36842]
- Fix monitor duration calculation with retries. {pull}36900[36900]

*Metricbeat*

Expand Down
4 changes: 4 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/plugdrop.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ func (d DropBrowserExtraEvents) BeforeSummary(event *beat.Event) BeforeSummaryAc
// BeforeRetry is a no-op for DropBrowserExtraEvents.
func (d DropBrowserExtraEvents) BeforeRetry() {
// noop
}

// BeforeEachEvent is a no-op for DropBrowserExtraEvents; the event is left untouched.
func (d DropBrowserExtraEvents) BeforeEachEvent(event *beat.Event) {
// noop
}
6 changes: 6 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/plugerr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func NewBrowserErrPlugin() *BrowserErrPlugin {
}
}

// BeforeEachEvent is a no-op for BrowserErrPlugin; the event is left untouched.
func (esp *BrowserErrPlugin) BeforeEachEvent(event *beat.Event) {} // noop

func (esp *BrowserErrPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions {
// track these to determine if the journey
// needs an error injected due to incompleteness
Expand Down Expand Up @@ -127,6 +129,10 @@ func (esp *LightweightErrPlugin) BeforeRetry() {
// noop
}

// BeforeEachEvent is a no-op for LightweightErrPlugin; the event is left untouched.
func (esp *LightweightErrPlugin) BeforeEachEvent(event *beat.Event) {
// noop
}

// errToFieldVal reflects on the error and returns either an *ecserr.ECSErr if possible, and a look.Reason otherwise
func errToFieldVal(eventErr error) (errVal interface{}) {
var asECS *ecserr.ECSErr
Expand Down
15 changes: 11 additions & 4 deletions heartbeat/monitors/wrappers/summarizer/plugmondur.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,26 @@ type LightweightDurationPlugin struct {
}

// EachEvent is a no-op for LightweightDurationPlugin: it does not touch the
// event and always returns 0.
func (lwdsp *LightweightDurationPlugin) EachEvent(event *beat.Event, _ error) EachEventActions {
return 0 // noop
}

func (lwdsp *LightweightDurationPlugin) BeforeEachEvent(event *beat.Event) {
// Effectively capture on the first event, on the first event
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
if lwdsp.startedAt == nil {
now := time.Now()
lwdsp.startedAt = &now
}
return 0
}

// BeforeSummary writes the time elapsed since startedAt into the event's
// `monitor.duration.us` field, converted with look.RTTMS. It always returns 0.
func (lwdsp *LightweightDurationPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions {
	elapsed := time.Since(*lwdsp.startedAt)
	duration := look.RTTMS(elapsed)

	// The result of PutValue is deliberately discarded, as in the original code.
	_, _ = event.PutValue("monitor.duration.us", duration)

	return 0
}

// BeforeRetry clears the stored start time so that the next BeforeEachEvent
// call captures a fresh one.
func (lwdsp *LightweightDurationPlugin) BeforeRetry() {
	// Reset event start time
	lwdsp.startedAt = nil
}

// BrowserDurationPlugin handles the logic for writing the `monitor.duration.us` field
// for browser monitors.
Expand Down Expand Up @@ -82,4 +88,5 @@ func (bwdsp *BrowserDurationPlugin) BeforeSummary(event *beat.Event) BeforeSumma
return 0
}

// BeforeRetry is a no-op for BrowserDurationPlugin.
func (bwdsp *BrowserDurationPlugin) BeforeRetry() {}

// BeforeEachEvent is a no-op for BrowserDurationPlugin; the event is left untouched.
func (bwdsp *BrowserDurationPlugin) BeforeEachEvent(event *beat.Event) {} // noop
4 changes: 4 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/plugstatestat.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (ssp *BrowserStateStatusPlugin) BeforeRetry() {
ssp.cssp.BeforeRetry()
}

// BeforeEachEvent is a no-op for BrowserStateStatusPlugin; the event is left untouched.
func (ssp *BrowserStateStatusPlugin) BeforeEachEvent(event *beat.Event) {} // noop

// LightweightStateStatusPlugin encapsulates the writing of the primary fields used by the summary,
// those being `state.*`, `status.*` , `event.type`, and `monitor.check_group`
type LightweightStateStatusPlugin struct {
Expand Down Expand Up @@ -113,6 +115,8 @@ func (ssp *LightweightStateStatusPlugin) BeforeRetry() {
ssp.cssp.BeforeRetry()
}

// BeforeEachEvent is a no-op for LightweightStateStatusPlugin; the event is left untouched.
func (ssp *LightweightStateStatusPlugin) BeforeEachEvent(event *beat.Event) {} // noop

type commonSSP struct {
js *jobsummary.JobSummary
stateTracker *monitorstate.Tracker
Expand Down
2 changes: 2 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/plugurl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,5 @@ func (busp *BrowserURLPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActi
// BeforeRetry discards the stored urlFields by setting them to nil.
func (busp *BrowserURLPlugin) BeforeRetry() {
busp.urlFields = nil
}

// BeforeEachEvent is a no-op for BrowserURLPlugin; the event is left untouched.
func (busp *BrowserURLPlugin) BeforeEachEvent(event *beat.Event) {} // noop
19 changes: 16 additions & 3 deletions heartbeat/monitors/wrappers/summarizer/summarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ type Summarizer struct {
startedAt time.Time
}

// beforeEachEvent invokes the BeforeEachEvent hook of every registered plugin,
// in slice order, passing each one the same event.
//
// It takes a pointer receiver, like the other Summarizer methods, so the
// Summarizer struct is not copied on every call.
func (s *Summarizer) beforeEachEvent(event *beat.Event) {
	for _, plugin := range s.plugins {
		plugin.BeforeEachEvent(event)
	}
}

// EachEventActions is a set of options using bitmasks to inform execution after the EachEvent callback.
// It is the value returned by SummarizerPlugin.EachEvent.
type EachEventActions uint8

Expand All @@ -58,6 +64,9 @@ const RetryBeforeSummary = 1
// in one location. Prior to this code was strewn about a bit more and following it was
// a bit trickier.
type SummarizerPlugin interface {
// BeforeEachEvent is called on each event, and allows for the mutation of events
// before monitor execution
BeforeEachEvent(event *beat.Event)
// EachEvent is called on each event, and allows for the mutation of events
EachEvent(event *beat.Event, err error) EachEventActions
// BeforeSummary is run on the final (summary) event for each monitor.
Expand Down Expand Up @@ -106,6 +115,10 @@ func (s *Summarizer) setupPlugins() {
// This adds the state and summary top level fields.
func (s *Summarizer) Wrap(j jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {

// call BeforeEachEvent for each plugin before running job
s.beforeEachEvent(event)

conts, eventErr := j(event)

s.mtx.Lock()
Expand Down Expand Up @@ -145,14 +158,14 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job {
// kibana queries
// 2. If the site error is very short 1s gives it a tiny bit of time to recover
delayedRootJob := func(event *beat.Event) ([]jobs.Job, error) {
time.Sleep(s.retryDelay)
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
for _, p := range s.plugins {
p.BeforeRetry()
}
time.Sleep(s.retryDelay)
return s.rootJob(event)
return s.Wrap(s.rootJob)(event)
}

conts = []jobs.Job{delayedRootJob}
return []jobs.Job{delayedRootJob}, eventErr
}
}

Expand Down
180 changes: 180 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/summarizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package summarizer

import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"
Expand Down Expand Up @@ -219,3 +221,181 @@ func TestSummarizer(t *testing.T) {
})
}
}

// Test wrapper plugin hook order. Guaranteed order for plugins to be called upon determines
// what data can be appended to the event at each stage through retries. With this guarantee,
// plugins just need to ascertain that their invariants apply through hook execution order
func TestSummarizerPluginOrder(t *testing.T) {
t.Parallel()

// these tests use strings to describe sequences of events
tests := []struct {
name string
maxAttempts int
// the attempt number of the given event
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
expectedOrder []string
}{
{
"one attempt",
1,
[]string{"bee", "job", "ee", "bs"},
},
{
"two attempts",
2,
[]string{"bee", "job", "ee", "bs", "br", "bee", "job", "ee", "bs"},
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

// Monitor setup
tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false)
sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", Type: "http", MaxAttempts: uint16(tt.maxAttempts)}

// Test locals
calls := make(chan string, 100)
mtx := sync.Mutex{}
appendCall := func(event string) {
mtx.Lock()
defer mtx.Unlock()

// Append call to global chan
calls <- event
}

// We simplify these to always down since hook order should not be
// determined by status
job := func(event *beat.Event) (j []jobs.Job, retErr error) {

calls <- "job"

event.Fields = mapstr.M{
"monitor": mapstr.M{
"id": "test",
"status": string(monitorstate.StatusDown),
},
}

return nil, fmt.Errorf("dummyerr")
}

s := NewSummarizer(job, sf, tracker)
// Shorten retry delay to make tests run faster
s.retryDelay = 2 * time.Millisecond
// Add mock plugin
s.plugins = append(s.plugins, &MockPlugin{
eachEvent: func(_ *beat.Event, _ error) {
appendCall("ee")
},
beforeSummary: func(_ *beat.Event) {
appendCall("bs")
},
beforeRetry: func() {
appendCall("br")
},
beforeEachEvent: func(_ *beat.Event) {
appendCall("bee")
},
})
wrapped := s.Wrap(job)

_, _ = jobs.ExecJobAndConts(t, wrapped)

close(calls)

// gather order
rcvdOrder := []string{}
for c := range calls {
rcvdOrder = append(rcvdOrder, c)
}

require.Equal(t, tt.expectedOrder, rcvdOrder)
require.Len(t, rcvdOrder, len(tt.expectedOrder))
})
}
}

func TestRetryLightweightMonitorDuration(t *testing.T) {
t.Parallel()

// Monitor setup
tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false)
sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", Type: "http", MaxAttempts: uint16(2)}

// We simplify these to always down
job := func(event *beat.Event) (j []jobs.Job, retErr error) {
time.Sleep(1 * time.Second)
vigneshshanmugam marked this conversation as resolved.
Show resolved Hide resolved

event.Fields = mapstr.M{
"monitor": mapstr.M{
"id": "test",
"status": string(monitorstate.StatusDown),
},
}

return nil, fmt.Errorf("dummyerr")
}

var retryStart time.Time

s := NewSummarizer(job, sf, tracker)
// Shorten retry delay to make tests run faster
s.retryDelay = 2 * time.Millisecond
// Add mock plugin
s.plugins = append(s.plugins, &MockPlugin{
beforeRetry: func() {
retryStart = time.Now()
},
eachEvent: func(_ *beat.Event, _ error) {},
beforeSummary: func(_ *beat.Event) {},
beforeEachEvent: func(_ *beat.Event) {},
})
wrapped := s.Wrap(job)

events, _ := jobs.ExecJobAndConts(t, wrapped)

retryElapsed := time.Since(retryStart)
require.False(t, retryStart.IsZero())
var rcvdDuration interface{}
for _, event := range events {
summaryIface, _ := event.GetValue("summary")
summary := summaryIface.(*jobsummary.JobSummary)

if summary.FinalAttempt {
rcvdDuration, _ = event.GetValue("monitor.duration.us")
}
}
require.Greater(t, rcvdDuration, int64(0))
require.GreaterOrEqual(t, look.RTTMS(retryElapsed), rcvdDuration)
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
}

// MockPlugin is a test double for a summarizer plugin. Each hook delegates to the
// matching function field; a nil field makes that hook a no-op, so tests only need
// to set the hooks they care about.
type MockPlugin struct {
	eachEvent       func(e *beat.Event, err error)
	beforeSummary   func(e *beat.Event)
	beforeRetry     func()
	beforeEachEvent func(e *beat.Event)
}

// EachEvent calls the eachEvent callback, if set, and always returns 0.
func (mp *MockPlugin) EachEvent(e *beat.Event, err error) EachEventActions {
	if mp.eachEvent != nil {
		mp.eachEvent(e, err)
	}

	return 0
}

// BeforeSummary calls the beforeSummary callback, if set, and always returns 0.
func (mp *MockPlugin) BeforeSummary(e *beat.Event) BeforeSummaryActions {
	if mp.beforeSummary != nil {
		mp.beforeSummary(e)
	}

	return 0
}

// BeforeRetry calls the beforeRetry callback, if set.
func (mp *MockPlugin) BeforeRetry() {
	if mp.beforeRetry != nil {
		mp.beforeRetry()
	}
}

// BeforeEachEvent calls the beforeEachEvent callback, if set.
func (mp *MockPlugin) BeforeEachEvent(e *beat.Event) {
	if mp.beforeEachEvent != nil {
		mp.beforeEachEvent(e)
	}
}