diff --git a/.go-version b/.go-version index 8909929f6e75..95393fc7d4de 100644 --- a/.go-version +++ b/.go-version @@ -1 +1 @@ -1.20.7 +1.20.8 diff --git a/.golangci.yml b/.golangci.yml index 834881d49d72..9c480e4a5253 100755 --- a/.golangci.yml +++ b/.golangci.yml @@ -113,7 +113,7 @@ linters-settings: gosimple: # Select the Go version to target. The default is '1.13'. - go: "1.20.7" + go: "1.20.8" nakedret: # make an issue if func has more lines of code than this setting and it has naked returns; default is 30 @@ -131,19 +131,19 @@ linters-settings: staticcheck: # Select the Go version to target. The default is '1.13'. - go: "1.20.7" + go: "1.20.8" checks: ["all"] stylecheck: # Select the Go version to target. The default is '1.13'. - go: "1.20.7" + go: "1.20.8" # Disabled: # ST1005: error strings should not be capitalized checks: ["all", "-ST1005"] unused: # Select the Go version to target. The default is '1.13'. - go: "1.20.7" + go: "1.20.8" gosec: excludes: diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c286d73fef6e..19a7f4db7c97 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -3,6 +3,18 @@ :issue: https://github.com/elastic/beats/issues/ :pull: https://github.com/elastic/beats/pull/ +[[release-notes-8.10.1]] +=== Beats version 8.10.1 +https://github.com/elastic/beats/compare/v8.10.0\...v8.10.1[View commits] + +==== Bugfixes + +*Filebeat* + +- Revert error introduced in {pull}35734[35734] when symlinks can't be resolved in filestream. {pull}36557[36557] +- Fix ignoring external input configuration in `take_over: true` mode {issue}36378[36378] {pull}36395[36395] + + [[release-notes-8.10.0]] === Beats version 8.10.0 https://github.com/elastic/beats/compare/v8.9.2\...v8.10.0[View commits] diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 002e0d2243cd..56a4ff583fc6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -9,10 +9,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] ==== Breaking changes *Affecting all Beats* -- Fix status reporting to Elastic-Agent when output configuration is invalid running under Elastic-Agent {pull}35719[35719] -- Upgrade Go to 1.20.7 {pull}36241[36241] -- [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506] - Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor *Auditbeat* @@ -63,7 +59,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Support build of projects outside of beats directory {pull}36126[36126] - Add default cgroup regex for add_process_metadata processor {pull}36484[36484] {issue}32961[32961] - Fix environment capture by `add_process_metadata` processor. {issue}36469[36469] {pull}36471[36471] -- Support fattened `data_stream` object when running under Elastic-Agent {pr}36516[36516] *Auditbeat* @@ -106,6 +101,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added a fix for Crowdstrike pipeline handling process arrays {pull}36496[36496] - Ensure winlog input retains metric collection when handling recoverable errors. {issue}36479[36479] {pull}36483[36483] - Revert error introduced in {pull}35734[35734] when symlinks can't be resolved in filestream. {pull}36557[36557] +- Fix ignoring external input configuration in `take_over: true` mode {issue}36378[36378] {pull}36395[36395] *Heartbeat* @@ -138,6 +134,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Packetbeat* +- Prevent panic when more than one interface is configured in fleet. {issue}36574[36574] {pull}36575[36575] *Winlogbeat* @@ -149,6 +146,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Affecting all Beats* +- Upgrade Go to 1.20.8 {pull}36597[36597] - Added append Processor which will append concrete values or values from a field to target. {issue}29934[29934] {pull}33364[33364] - When running under Elastic-Agent the status is now reported per Unit instead of the whole Beat {issue}35874[35874] {pull}36183[36183] - Add warning message to SysV init scripts for RPM-based systems that lack `/etc/rc.d/init.d/functions`. {issue}35708[35708] {pull}36188[36188] @@ -156,6 +154,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - dns processor: Add support for forward lookups (`A`, `AAAA`, and `TXT`). {issue}11416[11416] {pull}36394[36394] - Mark `syslog` processor as GA, improve docs about how processor handles syslog messages. {issue}36416[36416] {pull}36417[36417] - Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322] +- [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506] + Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor *Auditbeat* @@ -301,3 +301,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] + + + diff --git a/auditbeat/Dockerfile b/auditbeat/Dockerfile index 97199e89a0e2..7addfc95d5ff 100644 --- a/auditbeat/Dockerfile +++ b/auditbeat/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20.7 +FROM golang:1.20.8 RUN \ apt-get update \ diff --git a/dev-tools/kubernetes/filebeat/Dockerfile.debug b/dev-tools/kubernetes/filebeat/Dockerfile.debug index 5184138de531..e83f5fa7a57d 100644 --- a/dev-tools/kubernetes/filebeat/Dockerfile.debug +++ b/dev-tools/kubernetes/filebeat/Dockerfile.debug @@ -1,4 +1,4 @@ -FROM golang:1.20.7 as builder +FROM golang:1.20.8 as builder ENV PATH=/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin:/go/bin:/usr/local/go/bin diff --git a/dev-tools/kubernetes/heartbeat/Dockerfile.debug b/dev-tools/kubernetes/heartbeat/Dockerfile.debug index 16defa414fdd..54eeeb2109e7 100644 --- a/dev-tools/kubernetes/heartbeat/Dockerfile.debug +++ b/dev-tools/kubernetes/heartbeat/Dockerfile.debug @@ -1,4 +1,4 @@ -FROM golang:1.20.7 as builder +FROM golang:1.20.8 as builder ENV PATH=/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin:/go/bin:/usr/local/go/bin diff --git a/dev-tools/kubernetes/metricbeat/Dockerfile.debug b/dev-tools/kubernetes/metricbeat/Dockerfile.debug index 24c588d0f248..e355f7e4f4e0 100644 --- a/dev-tools/kubernetes/metricbeat/Dockerfile.debug +++ b/dev-tools/kubernetes/metricbeat/Dockerfile.debug @@ -1,4 +1,4 @@ -FROM golang:1.20.7 as builder +FROM golang:1.20.8 as builder ENV PATH=/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin:/go/bin:/usr/local/go/bin diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 2fddfd3bb4fb..78084c0fc294 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -502,9 +502,17 @@ func newPipelineLoaderFactory(esConfig *conf.C) fileset.PipelineLoaderFactory { // some of the filestreams might want to take over the loginput state // if their `take_over` flag is set to `true`. func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error { + inputs, err := fetchInputConfiguration(config) + if err != nil { + return fmt.Errorf("Failed to fetch input configuration when attempting take over: %w", err) + } + if len(inputs) == 0 { + return nil + } + store, err := stateStore.Access() if err != nil { - return fmt.Errorf("Failed to access state for attempting take over: %w", err) + return fmt.Errorf("Failed to access state when attempting take over: %w", err) } defer store.Close() logger := logp.NewLogger("filestream-takeover") @@ -514,5 +522,49 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error { backuper := backup.NewRegistryBackuper(logger, registryHome) - return takeover.TakeOverLogInputStates(logger, store, backuper, config) + return takeover.TakeOverLogInputStates(logger, store, backuper, inputs) +} + +// fetches all the defined input configuration available at Filebeat startup including external files. +func fetchInputConfiguration(config *cfg.Config) (inputs []*conf.C, err error) { + if len(config.Inputs) == 0 { + inputs = []*conf.C{} + } else { + inputs = config.Inputs + } + + // reading external input configuration if defined + var dynamicInputCfg cfgfile.DynamicConfig + if config.ConfigInput != nil { + err = config.ConfigInput.Unpack(&dynamicInputCfg) + if err != nil { + return nil, fmt.Errorf("failed to unpack the dynamic input configuration: %w", err) + } + } + if dynamicInputCfg.Path == "" { + return inputs, nil + } + + cfgPaths, err := filepath.Glob(dynamicInputCfg.Path) + if err != nil { + return nil, fmt.Errorf("failed to resolve external input configuration paths: %w", err) + } + + if len(cfgPaths) == 0 { + return inputs, nil + } + + // making a copy so we can safely extend the slice + inputs = make([]*conf.C, len(config.Inputs)) + copy(inputs, config.Inputs) + + for _, p := range cfgPaths { + externalInputs, err := cfgfile.LoadList(p) + if err != nil { + return nil, fmt.Errorf("failed to load external input configuration: %w", err) + } + inputs = append(inputs, externalInputs...) + } + + return inputs, nil } diff --git a/filebeat/beater/filebeat_test.go b/filebeat/beater/filebeat_test.go new file mode 100644 index 000000000000..66a6620bfca5 --- /dev/null +++ b/filebeat/beater/filebeat_test.go @@ -0,0 +1,163 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "os" + "path/filepath" + "testing" + + "github.com/elastic/beats/v7/filebeat/config" + conf "github.com/elastic/elastic-agent-libs/config" + + "github.com/stretchr/testify/require" +) + +type inputEntry struct { + ID string `config:"id"` +} + +func TestFetchInputConfiguration(t *testing.T) { + dir := t.TempDir() + err := os.WriteFile(filepath.Join(dir, "config1.yml"), []byte(` +- type: filestream + id: external-1 + paths: + - "/some" +- type: filestream + id: external-2 + paths: + - "/another" +`), 0777) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(dir, "config2.yml.disabled"), []byte(` +- type: filestream + id: disabled + paths: + - "/some" +`), 0777) + require.NoError(t, err) + + cases := []struct { + name string + configFile string + expected []inputEntry + }{ + { + name: "loads mixed configuration", + configFile: ` +filebeat.config.inputs: + enabled: true + path: ` + dir + `/*.yml +filebeat.inputs: + - type: filestream + id: internal + paths: + - "/another" +output.console: + enabled: true +`, + expected: []inputEntry{ + { + ID: "internal", + }, + { + ID: "external-1", + }, + { + ID: "external-2", + }, + }, + }, + { + name: "loads only internal configuration", + configFile: ` +filebeat.inputs: + - type: filestream + id: internal + paths: + - "/another" +output.console: + enabled: true +`, + expected: []inputEntry{ + { + ID: "internal", + }, + }, + }, + { + name: "loads only external configuration", + configFile: ` +filebeat.config.inputs: + enabled: true + path: ` + dir + `/*.yml +output.console: + enabled: true +`, + expected: []inputEntry{ + { + ID: "external-1", + }, + { + ID: "external-2", + }, + }, + }, + { + name: "loads nothing", + configFile: ` +filebeat.config.inputs: + enabled: true + path: ` + dir + `/*.nothing +output.console: + enabled: true +`, + expected: []inputEntry{}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + rawConfig, err := conf.NewConfigFrom(tc.configFile) + require.NoError(t, err) + + cfg := struct { + Filebeat config.Config `config:"filebeat"` + }{ + Filebeat: config.DefaultConfig, + } + err = rawConfig.Unpack(&cfg) + require.NoError(t, err) + + inputs, err := fetchInputConfiguration(&cfg.Filebeat) + require.NoError(t, err) + + actual := []inputEntry{} + + for _, i := range inputs { + var entry inputEntry + err := i.Unpack(&entry) + require.NoError(t, err) + actual = append(actual, entry) + } + + require.Equal(t, tc.expected, actual) + }) + } +} diff --git a/filebeat/input/filestream/takeover/takeover.go b/filebeat/input/filestream/takeover/takeover.go index 7f4332efeb42..415ddcb4fdde 100644 --- a/filebeat/input/filestream/takeover/takeover.go +++ b/filebeat/input/filestream/takeover/takeover.go @@ -24,11 +24,11 @@ import ( "strings" "github.com/elastic/beats/v7/filebeat/backup" - cfg "github.com/elastic/beats/v7/filebeat/config" "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/filebeat/input/filestream" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/backend" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -47,8 +47,8 @@ type filestreamMatchers map[string]func(source string) bool // // This mode is created for a smooth loginput->filestream migration experience, so the filestream // inputs would pick up ingesting files from the same point where a loginput stopped. -func TakeOverLogInputStates(log *logp.Logger, store backend.Store, backuper backup.Backuper, cfg *cfg.Config) error { - filestreamMatchers, err := findFilestreams(log, cfg) +func TakeOverLogInputStates(log *logp.Logger, store backend.Store, backuper backup.Backuper, inputsCfg []*conf.C) error { + filestreamMatchers, err := findFilestreams(log, inputsCfg) if err != nil { return fmt.Errorf("failed to read input configuration: %w", err) } @@ -141,10 +141,10 @@ func takeOverStates(log *logp.Logger, store backend.Store, matchers filestreamMa // findFilestreams finds filestream inputs that are marked as `take_over: true` // and creates a file matcher for each such filestream for the future use in state // processing -func findFilestreams(log *logp.Logger, cfg *cfg.Config) (matchers filestreamMatchers, err error) { +func findFilestreams(log *logp.Logger, inputs []*conf.C) (matchers filestreamMatchers, err error) { matchers = make(filestreamMatchers) - for _, input := range cfg.Inputs { + for _, input := range inputs { inputCfg := defaultInputConfig() err := input.Unpack(&inputCfg) if err != nil { diff --git a/filebeat/input/filestream/takeover/takeover_test.go b/filebeat/input/filestream/takeover/takeover_test.go index d2849ae21390..efc0aebd87d1 100644 --- a/filebeat/input/filestream/takeover/takeover_test.go +++ b/filebeat/input/filestream/takeover/takeover_test.go @@ -21,7 +21,6 @@ import ( "testing" "github.com/elastic/beats/v7/filebeat/backup" - cfg "github.com/elastic/beats/v7/filebeat/config" "github.com/elastic/beats/v7/libbeat/statestore/backend" conf "github.com/elastic/elastic-agent-libs/config" @@ -31,62 +30,74 @@ import ( "github.com/stretchr/testify/require" ) +func newInputConfigFrom(t *testing.T, str ...string) []*conf.C { + results := make([]*conf.C, 0, len(str)) + for _, s := range str { + c, err := conf.NewConfigFrom(s) + require.NoError(t, err) + results = append(results, c) + } + return results +} + func TestTakeOverLogInputStates(t *testing.T) { - empty, err := conf.NewConfigFrom(``) - require.NoError(t, err) + empty := newInputConfigFrom(t) - noTakeOver, err := conf.NewConfigFrom(` -inputs: - - type: log - paths: - - "/path/log*.log" - - type: filestream - id: filestream-id-1 - enabled: true - paths: - - "/path/filestream1-*.log" + noTakeOver := newInputConfigFrom(t, + ` +type: log +paths: + - "/path/log*.log" +`, + ` +type: filestream +id: filestream-id-1 +enabled: true +paths: + - "/path/filestream1-*.log" `) - require.NoError(t, err) - - takeOver, err := conf.NewConfigFrom(` -inputs: - - type: filestream - id: filestream-id-1 - enabled: true - paths: - - "/path/filestream1-*.log" - - type: filestream - id: filestream-id-2 - take_over: true - enabled: true - paths: - - "/path/filestream2-*.log" - - "/path/log*.log" # taking over from the log input + takeOver := newInputConfigFrom(t, + ` +type: filestream +id: filestream-id-1 +enabled: true +paths: + - "/path/filestream1-*.log" +`, + ` +type: filestream +id: filestream-id-2 +take_over: true +enabled: true +paths: + - "/path/filestream2-*.log" + - "/path/log*.log" # taking over from the log input `) - require.NoError(t, err) - noUniqueID, err := conf.NewConfigFrom(` -inputs: - - type: filestream - id: filestream-id-2 - take_over: true - enabled: true - paths: - - "/path/filestream2-*.log" - - type: filestream - id: filestream-id-2 # not unique - take_over: true - enabled: true - paths: - - "/path/filestream3-*.log" - - type: filestream - take_over: true # no ID - enabled: true - paths: - - "/path/filestream-*.log" + noUniqueID := newInputConfigFrom(t, ` +type: filestream +id: filestream-id-2 +take_over: true +enabled: true +paths: + - "/path/filestream2-*.log" +`, + ` +type: filestream +id: filestream-id-2 # not unique +take_over: true +enabled: true +paths: + - "/path/filestream3-*.log" +`, + ` +type: filestream +take_over: true # no ID +enabled: true +paths: + - "/path/filestream-*.log" `) - require.NoError(t, err) states := []state{ // this state is to make sure the filestreams without `take_over` remain untouched @@ -162,7 +173,7 @@ inputs: cases := []struct { name string - cfg *conf.C + cfg []*conf.C states []state mustBackup bool mustRemove []string @@ -236,15 +247,11 @@ inputs: for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - config := cfg.DefaultConfig - err := tc.cfg.Unpack(&config) - require.NoError(t, err) - store := storeMock{ states: tc.states, } backuper := backuperMock{} - err = TakeOverLogInputStates(log, &store, &backuper, &config) + err := TakeOverLogInputStates(log, &store, &backuper, tc.cfg) if tc.expErr != "" { require.Error(t, err) require.Contains(t, err.Error(), tc.expErr) diff --git a/heartbeat/Dockerfile b/heartbeat/Dockerfile index dade83297135..7a56f7219fb0 100644 --- a/heartbeat/Dockerfile +++ b/heartbeat/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20.7 +FROM golang:1.20.8 RUN \ apt-get update \ diff --git a/heartbeat/hbtest/hbtestutil.go b/heartbeat/hbtest/hbtestutil.go index 86c1e4a34d20..e73e1efe78fa 100644 --- a/heartbeat/hbtest/hbtestutil.go +++ b/heartbeat/hbtest/hbtestutil.go @@ -40,6 +40,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/ecserr" "github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/heartbeat/hbtestllext" @@ -49,7 +50,6 @@ import ( "github.com/elastic/go-lookslike/isdef" "github.com/elastic/go-lookslike/validator" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/libbeat/common/x509util" ) @@ -172,6 +172,7 @@ func BaseChecks(ip string, status string, typ string) validator.Validator { } return lookslike.Compose( + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ "ip": ipCheck, @@ -223,8 +224,10 @@ func SimpleURLChecks(t *testing.T, scheme string, host string, port uint16) vali // URLChecks returns a validator for the given URL's fields func URLChecks(t *testing.T, u *url.URL) validator.Validator { + t.Helper() + require.NotNil(t, u) return lookslike.MustCompile(map[string]interface{}{ - "url": wrappers.URLFields(u), + "url": wraputil.URLFields(u), }) } diff --git a/heartbeat/hbtestllext/validators.go b/heartbeat/hbtestllext/validators.go index 23a9df5d5cf2..a7c637c27b07 100644 --- a/heartbeat/hbtestllext/validators.go +++ b/heartbeat/hbtestllext/validators.go @@ -19,6 +19,7 @@ package hbtestllext import ( "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/isdef" ) // MonitorTimespanValidator is tests for the `next_run` and `next_run_in.us` keys. @@ -30,3 +31,14 @@ var MonitorTimespanValidator = lookslike.MustCompile(map[string]interface{}{ }, }, }) + +var MaybeHasEventType = lookslike.MustCompile(map[string]interface{}{ + "event": map[string]interface{}{ + "type": isdef.Optional(isdef.IsNonEmptyString), + }, + "synthetics.type": isdef.Optional(isdef.IsNonEmptyString), +}) + +var MaybeHasDuration = lookslike.MustCompile(map[string]interface{}{ + "monitor.duration.us": IsInt64, +}) diff --git a/heartbeat/look/look.go b/heartbeat/look/look.go index 75d23b973a11..39e92b0b6296 100644 --- a/heartbeat/look/look.go +++ b/heartbeat/look/look.go @@ -31,16 +31,21 @@ import ( // RTT formats a round-trip-time given as time.Duration into an // event field. The duration is stored in `{"us": rtt}`. func RTT(rtt time.Duration) mapstr.M { - if rtt < 0 { - rtt = 0 - } - return mapstr.M{ // cast to int64 since a go duration is a nano, but we want micros // This makes the types less confusing because other wise the duration // we get back has the wrong unit - "us": rtt.Microseconds(), + "us": RTTMS(rtt), + } +} + +// RTTMS returns the given time.Duration as an int64 in microseconds, with a value of 0 +// if input is negative. +func RTTMS(rtt time.Duration) int64 { + if rtt < 0 { + return 0 } + return rtt.Microseconds() } // Reason formats an error into an error event field. diff --git a/heartbeat/monitors/active/http/http.go b/heartbeat/monitors/active/http/http.go index acac759f8e01..ad9a9df98c0c 100644 --- a/heartbeat/monitors/active/http/http.go +++ b/heartbeat/monitors/active/http/http.go @@ -23,11 +23,11 @@ import ( "net/url" "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/libbeat/version" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent-libs/transport/tlscommon" "github.com/elastic/elastic-agent-libs/useragent" @@ -116,7 +116,7 @@ func create( // Assign any execution errors to the error field and // assign the url field - js[i] = wrappers.WithURLField(u, job) + js[i] = wraputil.WithURLField(u, job) } return plugin.Plugin{Jobs: js, Endpoints: len(config.Hosts)}, nil diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index 78a2f24599c8..20575210ac96 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -52,6 +52,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/file" @@ -110,7 +111,7 @@ func checkServer(t *testing.T, handlerFunc http.HandlerFunc, useUrls bool) (*htt func urlChecks(urlStr string) validator.Validator { u, _ := url.Parse(urlStr) return lookslike.MustCompile(map[string]interface{}{ - "url": wrappers.URLFields(u), + "url": wraputil.URLFields(u), }) } diff --git a/heartbeat/monitors/active/icmp/icmp.go b/heartbeat/monitors/active/icmp/icmp.go index 19831407ba73..5bb3504014ac 100644 --- a/heartbeat/monitors/active/icmp/icmp.go +++ b/heartbeat/monitors/active/icmp/icmp.go @@ -23,6 +23,7 @@ import ( "net/url" "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" @@ -30,7 +31,6 @@ import ( "github.com/elastic/beats/v7/heartbeat/look" "github.com/elastic/beats/v7/heartbeat/monitors" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp" ) @@ -107,7 +107,7 @@ func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) { return plugin.Plugin{}, err } - j = append(j, wrappers.WithURLField(u, job)) + j = append(j, wraputil.WithURLField(u, job)) } return plugin.Plugin{Jobs: j, Endpoints: len(jf.config.Hosts)}, nil diff --git a/heartbeat/monitors/active/tcp/tcp.go b/heartbeat/monitors/active/tcp/tcp.go index 5fc3400c30d6..57305203b3aa 100644 --- a/heartbeat/monitors/active/tcp/tcp.go +++ b/heartbeat/monitors/active/tcp/tcp.go @@ -31,7 +31,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/plugin" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/heartbeat/reason" "github.com/elastic/beats/v7/libbeat/beat" conf "github.com/elastic/elastic-agent-libs/config" @@ -130,7 +130,7 @@ func (jf *jobFactory) makeJobs() ([]jobs.Job, error) { if err != nil { return nil, err } - jobs = append(jobs, wrappers.WithURLField(url, endpointJob)) + jobs = append(jobs, wraputil.WithURLField(url, endpointJob)) } } @@ -174,7 +174,7 @@ func (jf *jobFactory) makeDirectEndpointJob(endpointURL *url.URL) (jobs.Job, err // makeSocksLookupEndpointJob makes jobs that use a Socks5 proxy to perform DNS lookups func (jf *jobFactory) makeSocksLookupEndpointJob(endpointURL *url.URL) jobs.Job { - return wrappers.WithURLField(endpointURL, + return wraputil.WithURLField(endpointURL, jobs.MakeSimpleJob(func(event *beat.Event) error { hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port()) return jf.dial(event, hostPort, endpointURL) diff --git a/heartbeat/monitors/logger/logger.go b/heartbeat/monitors/logger/logger.go index d5018454aa47..734cae862a9b 100644 --- a/heartbeat/monitors/logger/logger.go +++ b/heartbeat/monitors/logger/logger.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp" ) @@ -44,6 +45,7 @@ type MonitorRunInfo struct { Duration int64 `json:"-"` Steps *int `json:"steps,omitempty"` Status string `json:"status"` + Attempt int `json:"attempt"` } func (m *MonitorRunInfo) MarshalJSON() ([]byte, error) { @@ -78,22 +80,33 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) { errors := []error{} monitorID, err := event.GetValue("monitor.id") if err != nil { - errors = append(errors, err) + errors = append(errors, fmt.Errorf("could not extract monitor.id: %w", err)) } durationUs, err := event.GetValue("monitor.duration.us") if err != nil { - errors = append(errors, err) + durationUs = int64(0) } monType, err := event.GetValue("monitor.type") if err != nil { - errors = append(errors, err) + errors = append(errors, fmt.Errorf("could not extract monitor.type: %w", err)) } status, err := event.GetValue("monitor.status") if err != nil { - errors = append(errors, err) + errors = append(errors, fmt.Errorf("could not extract monitor.status: %w", err)) + } + + jsIface, err := event.GetValue("summary") + var attempt int + if err != nil { + errors = append(errors, fmt.Errorf("could not extract summary to add attempt info: %w", err)) + } else { + js, ok := jsIface.(*jobsummary.JobSummary) + if ok && js != nil { + attempt = int(js.Attempt) + } } if len(errors) > 0 { @@ -105,6 +118,7 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) { Type: monType.(string), Duration: durationUs.(int64), Status: status.(string), + Attempt: attempt, } sc, _ := event.Meta.GetValue(META_STEP_COUNT) @@ -119,7 +133,7 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) { func LogRun(event *beat.Event) { monitor, err := extractRunInfo(event) if err != nil { - getLogger().Errorw("error gathering information to log event: ", err) + getLogger().Error(fmt.Errorf("error gathering information to log event: %w", err)) return } diff --git a/heartbeat/monitors/logger/logger_test.go b/heartbeat/monitors/logger/logger_test.go index 0e10f1bd6082..183d19447fc3 100644 --- a/heartbeat/monitors/logger/logger_test.go +++ b/heartbeat/monitors/logger/logger_test.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/elastic/beats/v7/heartbeat/eventext" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -47,6 +48,7 @@ func TestLogRun(t *testing.T) { "monitor.duration.us": durationUs, "monitor.type": "browser", "monitor.status": "down", + "summary": jobsummary.NewJobSummary(1, 1, "abc"), } event := beat.Event{Fields: fields} @@ -64,6 +66,7 @@ func TestLogRun(t *testing.T) { Duration: durationUs, Status: "down", Steps: &steps, + Attempt: 1, } assert.ElementsMatch(t, []zap.Field{ diff --git a/heartbeat/monitors/mocks.go b/heartbeat/monitors/mocks.go index 0a7227c9986e..f8747a804005 100644 --- a/heartbeat/monitors/mocks.go +++ b/heartbeat/monitors/mocks.go @@ -195,6 +195,7 @@ func baseMockEventMonitorValidator(id string, name string, status string) valida func mockEventMonitorValidator(id string, name string) validator.Validator { return lookslike.Strict(lookslike.Compose( + hbtestllext.MaybeHasEventType, baseMockEventMonitorValidator(id, name, "up"), hbtestllext.MonitorTimespanValidator, hbtest.SummaryStateChecks(1, 0), diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index 570af6366e7f..fe45e419af67 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/eventext" "github.com/elastic/beats/v7/heartbeat/look" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -114,7 +114,7 @@ func MakeByIPJob( "monitor": mapstr.M{"ip": addr.String()}, } - return wrappers.WithFields(fields, pingFactory(addr)), nil + return wraputil.WithFields(fields, pingFactory(addr)), nil } // MakeByHostJob creates a new Job including host lookup. The pingFactory will be used to @@ -165,7 +165,7 @@ func makeByHostAnyIPJob( resolveRTT := resolveEnd.Sub(resolveStart) ipFields := resolveIPEvent(ip.String(), resolveRTT) - return wrappers.WithFields(ipFields, pingFactory(ip))(event) + return wraputil.WithFields(ipFields, pingFactory(ip))(event) } } @@ -206,7 +206,7 @@ func makeByHostAllIPJob( for i, ip := range ips { addr := &net.IPAddr{IP: ip} ipFields := resolveIPEvent(ip.String(), resolveRTT) - cont[i] = wrappers.WithFields(ipFields, pingFactory(addr)) + cont[i] = wraputil.WithFields(ipFields, pingFactory(addr)) } // Ideally we would test this invocation. This function however is really hard to to test given all the extra context it takes in // In a future refactor we could perhaps test that this in correctly invoked. diff --git a/heartbeat/monitors/wrappers/summarizer/jobsummary/jobsummary.go b/heartbeat/monitors/wrappers/summarizer/jobsummary/jobsummary.go new file mode 100644 index 000000000000..9264f33f0fac --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/jobsummary/jobsummary.go @@ -0,0 +1,57 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package jobsummary + +import ( + "fmt" + + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" +) + +// JobSummary is the struct that is serialized in the `summary` field in the emitted event. +type JobSummary struct { + Attempt uint16 `json:"attempt"` + MaxAttempts uint16 `json:"max_attempts"` + FinalAttempt bool `json:"final_attempt"` + Up uint16 `json:"up"` + Down uint16 `json:"down"` + Status monitorstate.StateStatus `json:"status"` + RetryGroup string `json:"retry_group"` +} + +func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSummary { + if maxAttempts < 1 { + maxAttempts = 1 + } + + return &JobSummary{ + MaxAttempts: maxAttempts, + Attempt: attempt, + RetryGroup: retryGroup, + } +} + +// BumpAttempt swaps the JobSummary object's pointer for a new job summary +// that is a clone of the current one but with the Attempt field incremented. +func (js *JobSummary) BumpAttempt() { + *js = *NewJobSummary(js.Attempt+1, js.MaxAttempts, js.RetryGroup) +} + +func (js *JobSummary) String() string { + return fmt.Sprintf("", js.Status, js.Attempt, js.MaxAttempts, js.FinalAttempt, js.Up, js.Down, js.RetryGroup) +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugdrop.go b/heartbeat/monitors/wrappers/summarizer/plugdrop.go new file mode 100644 index 000000000000..fff6c143bf02 --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugdrop.go @@ -0,0 +1,45 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "github.com/elastic/beats/v7/heartbeat/eventext" + "github.com/elastic/beats/v7/libbeat/beat" +) + +type DropBrowserExtraEvents struct{} + +func (d DropBrowserExtraEvents) EachEvent(event *beat.Event, _ error) EachEventActions { + st := synthType(event) + // Sending these events can break the kibana UI in various places + // see: https://github.com/elastic/kibana/issues/166530 + if st == "cmd/status" { + eventext.CancelEvent(event) + } + + return 0 +} + +func (d DropBrowserExtraEvents) BeforeSummary(event *beat.Event) BeforeSummaryActions { + // noop + return 0 +} + +func (d DropBrowserExtraEvents) BeforeRetry() { + // noop +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugerr.go b/heartbeat/monitors/wrappers/summarizer/plugerr.go new file mode 100644 index 000000000000..1010370f520c --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugerr.go @@ -0,0 +1,145 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "errors" + "fmt" + + "github.com/elastic/beats/v7/heartbeat/ecserr" + "github.com/elastic/beats/v7/heartbeat/eventext" + "github.com/elastic/beats/v7/heartbeat/look" + "github.com/elastic/beats/v7/heartbeat/monitors/logger" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// BrowserErrPlugins handles the logic for writing the `error` field +// for browser monitors, preferentially using the journey/end event's +// error field for errors. +type BrowserErrPlugin struct { + summaryErrVal interface{} + summaryErr error + stepCount int + journeyEndRcvd bool + attempt int +} + +func NewBrowserErrPlugin() *BrowserErrPlugin { + return &BrowserErrPlugin{ + attempt: 1, + } +} + +func (esp *BrowserErrPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions { + // track these to determine if the journey + // needs an error injected due to incompleteness + st := synthType(event) + switch st { + case "step/end": + esp.stepCount++ + // track step count for error logging + // this is a bit of an awkward spot and combination of concerns, but it makes sense + eventext.SetMeta(event, logger.META_STEP_COUNT, esp.stepCount) + case "journey/end": + esp.journeyEndRcvd = true + } + + // Nothing else to do if there's no error + if eventErr == nil { + return 0 + } + + // Merge the error value into the event's "error" field + errVal := errToFieldVal(eventErr) + mergeErrVal(event, errVal) + + // If there is no error value OR this is the journey end event + // record this as the definitive error + if esp.summaryErrVal == nil || st == "journey/end" { + esp.summaryErr = eventErr + esp.summaryErrVal = errVal + } + + return DropErrEvent +} + +func (esp *BrowserErrPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + // If no journey end was received, make that the summary error + if !esp.journeyEndRcvd { + esp.summaryErr = fmt.Errorf("journey did not finish executing, %d steps ran (attempt: %d): %w", esp.stepCount, esp.attempt, esp.summaryErr) + esp.summaryErrVal = errToFieldVal(esp.summaryErr) + } + + if esp.summaryErrVal != nil { + mergeErrVal(event, esp.summaryErrVal) + } + + return 0 +} + +func (esp *BrowserErrPlugin) BeforeRetry() { + attempt := esp.attempt + 1 + *esp = *NewBrowserErrPlugin() + esp.attempt = attempt +} + +// LightweightErrPlugin simply takes error return values +// and maps them into the "error" field in the event, return nil +// for all events thereafter +type LightweightErrPlugin struct{} + +func NewLightweightErrPlugin() *LightweightErrPlugin { + return &LightweightErrPlugin{} +} + +func (esp *LightweightErrPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions { + if eventErr == nil { + return 0 + } + + errVal := errToFieldVal(eventErr) + mergeErrVal(event, errVal) + + return DropErrEvent +} + +func (esp *LightweightErrPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + return 0 +} + +func (esp *LightweightErrPlugin) BeforeRetry() { + // noop +} + +// errToFieldVal reflects on the error and returns either an *ecserr.ECSErr if possible, and a look.Reason otherwise +func errToFieldVal(eventErr error) (errVal interface{}) { + var asECS *ecserr.ECSErr + if errors.As(eventErr, &asECS) { + // Override the message of the error in the event it was wrapped + asECS.Message = eventErr.Error() + errVal = asECS + } else { + errVal = look.Reason(eventErr) + } + return errVal +} + +func mergeErrVal(event *beat.Event, errVal interface{}) { + eventext.MergeEventFields(event, mapstr.M{"error": errVal}) +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugmondur.go b/heartbeat/monitors/wrappers/summarizer/plugmondur.go new file mode 100644 index 000000000000..f677e57693f8 --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugmondur.go @@ -0,0 +1,85 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "time" + + "github.com/elastic/beats/v7/heartbeat/look" + "github.com/elastic/beats/v7/libbeat/beat" +) + +// LightweightDurationPlugin handles the logic for writing the `monitor.duration.us` field +// for lightweight monitors. +type LightweightDurationPlugin struct { + startedAt *time.Time +} + +func (lwdsp *LightweightDurationPlugin) EachEvent(event *beat.Event, _ error) EachEventActions { + // Effectively only runs once, on the first event + if lwdsp.startedAt == nil { + now := time.Now() + lwdsp.startedAt = &now + } + return 0 +} + +func (lwdsp *LightweightDurationPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + _, _ = event.PutValue("monitor.duration.us", look.RTTMS(time.Since(*lwdsp.startedAt))) + return 0 +} + +func (lwdsp *LightweightDurationPlugin) BeforeRetry() {} + +// BrowserDurationPlugin handles the logic for writing the `monitor.duration.us` field +// for browser monitors. +type BrowserDurationPlugin struct { + startedAt *time.Time + endedAt *time.Time +} + +func (bwdsp *BrowserDurationPlugin) EachEvent(event *beat.Event, _ error) EachEventActions { + switch synthType(event) { + case "journey/start": + bwdsp.startedAt = &event.Timestamp + case "journey/end": + bwdsp.endedAt = &event.Timestamp + } + + return 0 +} + +func (bwdsp *BrowserDurationPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + // If we never even ran a journey, it's a zero duration + if bwdsp.startedAt == nil { + return 0 + } + + // if we never received an end event, just use the current time + if bwdsp.endedAt == nil { + now := time.Now() + bwdsp.endedAt = &now + } + + durUS := look.RTTMS(bwdsp.endedAt.Sub(*bwdsp.startedAt)) + _, _ = event.PutValue("monitor.duration.us", durUS) + + return 0 +} + +func (bwdsp *BrowserDurationPlugin) BeforeRetry() {} diff --git a/heartbeat/monitors/wrappers/summarizer/plugstatestat.go b/heartbeat/monitors/wrappers/summarizer/plugstatestat.go new file mode 100644 index 000000000000..f38c22d32abd --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugstatestat.go @@ -0,0 +1,182 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "fmt" + + "github.com/gofrs/uuid" + + "github.com/elastic/beats/v7/heartbeat/eventext" + "github.com/elastic/beats/v7/heartbeat/look" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// StateStatusPlugin encapsulates the writing of the primary fields used by the summary, +// those being `state.*`, `status.*` , `event.type`, and `monitor.check_group` +type BrowserStateStatusPlugin struct { + cssp *commonSSP +} + +func NewBrowserStateStatusplugin(stateTracker *monitorstate.Tracker, sf stdfields.StdMonitorFields) *BrowserStateStatusPlugin { + return &BrowserStateStatusPlugin{ + cssp: newCommonSSP(stateTracker, sf), + } +} + +func (ssp *BrowserStateStatusPlugin) EachEvent(event *beat.Event, jobErr error) EachEventActions { + if jobErr != nil { + // Browser jobs only return either a single up or down + // any err will mark it as a down job + ssp.cssp.js.Down = 1 + } + ssp.cssp.BeforeEach(event, jobErr) + + return 0 +} + +func (ssp *BrowserStateStatusPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + if ssp.cssp.js.Down == 0 { + // Browsers don't have a prior increment of this, so set it to some + // non-zero value + ssp.cssp.js.Up = 1 + } + + res := ssp.cssp.BeforeSummary(event) + + _, _ = event.PutValue("monitor.status", string(ssp.cssp.js.Status)) + return res +} + +func (ssp *BrowserStateStatusPlugin) BeforeRetry() { + // noop +} + +// LightweightStateStatusPlugin encapsulates the writing of the primary fields used by the summary, +// those being `state.*`, `status.*` , `event.type`, and `monitor.check_group` +type LightweightStateStatusPlugin struct { + cssp *commonSSP +} + +func NewLightweightStateStatusPlugin(stateTracker *monitorstate.Tracker, sf stdfields.StdMonitorFields) *LightweightStateStatusPlugin { + return &LightweightStateStatusPlugin{ + cssp: newCommonSSP(stateTracker, sf), + } +} + +func (ssp *LightweightStateStatusPlugin) EachEvent(event *beat.Event, jobErr error) EachEventActions { + status := look.Status(jobErr) + _, _ = event.PutValue("monitor.status", status) + if !eventext.IsEventCancelled(event) { // if this event contains a status... + mss := monitorstate.StateStatus(status) + + if mss == monitorstate.StatusUp { + ssp.cssp.js.Up++ + } else { + ssp.cssp.js.Down++ + } + + } + + ssp.cssp.BeforeEach(event, jobErr) + + return 0 +} + +func (ssp *LightweightStateStatusPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + return ssp.cssp.BeforeSummary(event) +} + +func (ssp *LightweightStateStatusPlugin) BeforeRetry() { + // noop +} + +type commonSSP struct { + js *jobsummary.JobSummary + stateTracker *monitorstate.Tracker + sf stdfields.StdMonitorFields + checkGroup string +} + +func newCommonSSP(stateTracker *monitorstate.Tracker, sf stdfields.StdMonitorFields) *commonSSP { + uu, err := uuid.NewV1() + if err != nil { + logp.L().Errorf("could not create v1 UUID for retry group: %s", err) + } + js := jobsummary.NewJobSummary(1, sf.MaxAttempts, uu.String()) + return &commonSSP{ + js: js, + stateTracker: stateTracker, + sf: sf, + checkGroup: uu.String(), + } +} + +func (ssp *commonSSP) BeforeEach(event *beat.Event, err error) { + _, _ = event.PutValue("monitor.check_group", fmt.Sprintf("%s-%d", ssp.checkGroup, ssp.js.Attempt)) +} + +func (ssp *commonSSP) BeforeSummary(event *beat.Event) BeforeSummaryActions { + if ssp.js.Down > 0 { + ssp.js.Status = monitorstate.StatusDown + } else { + ssp.js.Status = monitorstate.StatusUp + } + + // Get the last status of this monitor, we use this later to + // determine if a retry is needed + lastStatus := ssp.stateTracker.GetCurrentStatus(ssp.sf) + + // FinalAttempt is true if no retries will occur + retry := ssp.js.Status == monitorstate.StatusDown && ssp.js.Attempt < ssp.js.MaxAttempts + ssp.js.FinalAttempt = !retry + + ms := ssp.stateTracker.RecordStatus(ssp.sf, ssp.js.Status, ssp.js.FinalAttempt) + + // dereference the pointer since the pointer is pointed at the next step + // after this + jsCopy := *ssp.js + + fields := mapstr.M{ + "event": mapstr.M{"type": "heartbeat/summary"}, + "summary": &jsCopy, + "state": ms, + } + if ssp.sf.Type == "browser" { + fields["synthetics"] = mapstr.M{"type": "heartbeat/summary"} + } + eventext.MergeEventFields(event, fields) + + if retry { + // mutate the js into the state for the next attempt + ssp.js.BumpAttempt() + } + + logp.L().Debugf("attempt info: %v == %v && %d < %d", ssp.js.Status, lastStatus, ssp.js.Attempt, ssp.js.MaxAttempts) + + if retry { + return RetryBeforeSummary + } + + return 0 +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugurl.go b/heartbeat/monitors/wrappers/summarizer/plugurl.go new file mode 100644 index 000000000000..dc4394aa42ad --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugurl.go @@ -0,0 +1,54 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// BrowserURLPlugin handles the logic for writing the error.* fields +type BrowserURLPlugin struct { + urlFields mapstr.M +} + +func (busp *BrowserURLPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions { + if len(busp.urlFields) == 0 { + if urlFields, err := event.GetValue("url"); err == nil { + if ufMap, ok := urlFields.(mapstr.M); ok { + busp.urlFields = ufMap + } + } + } + return 0 +} + +func (busp *BrowserURLPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + if busp.urlFields != nil { + _, err := event.PutValue("url", busp.urlFields) + if err != nil { + logp.L().Errorf("could not set URL value for browser job: %s", err) + } + } + return 0 +} + +func (busp *BrowserURLPlugin) BeforeRetry() { + busp.urlFields = nil +} diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer.go b/heartbeat/monitors/wrappers/summarizer/summarizer.go index 49d3ca9422ad..9c3f1bd8abdf 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer.go @@ -18,69 +18,86 @@ package summarizer import ( - "fmt" "sync" "time" - "github.com/gofrs/uuid" - - "github.com/elastic/beats/v7/heartbeat/eventext" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" ) +// Summarizer produces summary events (with summary.* and other asssociated fields). +// It accumulates state as it processes the whole event field in order to produce +// this summary. type Summarizer struct { rootJob jobs.Job contsRemaining uint16 mtx *sync.Mutex - jobSummary *JobSummary - checkGroup string - stateTracker *monitorstate.Tracker sf stdfields.StdMonitorFields + mst *monitorstate.Tracker retryDelay time.Duration + plugins []SummarizerPlugin + startedAt time.Time } -type JobSummary struct { - Attempt uint16 `json:"attempt"` - MaxAttempts uint16 `json:"max_attempts"` - FinalAttempt bool `json:"final_attempt"` - Up uint16 `json:"up"` - Down uint16 `json:"down"` - Status monitorstate.StateStatus `json:"status"` - RetryGroup string `json:"retry_group"` +// EachEventActions is a set of options using bitmasks to inform execution after the EachEvent callback +type EachEventActions uint8 + +// DropErrEvent if will remove the error from the job return. +const DropErrEvent = 1 + +// BeforeSummaryActions is a set of options using bitmasks to inform execution after the BeforeSummary callback +type BeforeSummaryActions uint8 + +// RetryBeforeSummary will retry the job once complete. +const RetryBeforeSummary = 1 + +// SummarizerPlugin encapsulates functionality for the Summarizer that's easily expressed +// in one location. Prior to this code was strewn about a bit more and following it was +// a bit trickier. +type SummarizerPlugin interface { + // EachEvent is called on each event, and allows for the mutation of events + EachEvent(event *beat.Event, err error) EachEventActions + // BeforeSummary is run on the final (summary) event for each monitor. + BeforeSummary(event *beat.Event) BeforeSummaryActions + // BeforeRetry is called before the first EachEvent in the event of a retry + // can be used for resetting state between retries + BeforeRetry() } func NewSummarizer(rootJob jobs.Job, sf stdfields.StdMonitorFields, mst *monitorstate.Tracker) *Summarizer { - uu, err := uuid.NewV1() - if err != nil { - logp.L().Errorf("could not create v1 UUID for retry group: %s", err) - } - return &Summarizer{ + s := &Summarizer{ rootJob: rootJob, contsRemaining: 1, mtx: &sync.Mutex{}, - jobSummary: NewJobSummary(1, sf.MaxAttempts, uu.String()), - checkGroup: uu.String(), - stateTracker: mst, + mst: mst, sf: sf, - // private property, but can be overridden in tests to speed them up - retryDelay: time.Second, + retryDelay: time.Second, + startedAt: time.Now(), } + s.setupPlugins() + return s } -func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSummary { - if maxAttempts < 1 { - maxAttempts = 1 - } - - return &JobSummary{ - MaxAttempts: maxAttempts, - Attempt: attempt, - RetryGroup: retryGroup, +func (s *Summarizer) setupPlugins() { + // ssp must appear before Err plugin since + // it intercepts errors + if s.sf.Type == "browser" { + s.plugins = []SummarizerPlugin{ + DropBrowserExtraEvents{}, + &BrowserDurationPlugin{}, + &BrowserURLPlugin{}, + NewBrowserStateStatusplugin(s.mst, s.sf), + NewBrowserErrPlugin(), + } + } else { + s.plugins = []SummarizerPlugin{ + &LightweightDurationPlugin{}, + NewLightweightStateStatusPlugin(s.mst, s.sf), + NewLightweightErrPlugin(), + } } } @@ -89,56 +106,37 @@ func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSu // This adds the state and summary top level fields. func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { - conts, jobErr := j(event) - - _, _ = event.PutValue("monitor.check_group", fmt.Sprintf("%s-%d", s.checkGroup, s.jobSummary.Attempt)) + conts, eventErr := j(event) s.mtx.Lock() defer s.mtx.Unlock() - js := s.jobSummary - s.contsRemaining-- // we just ran one cont, discount it // these many still need to be processed s.contsRemaining += uint16(len(conts)) - monitorStatus, err := event.GetValue("monitor.status") - if err == nil && !eventext.IsEventCancelled(event) { // if this event contains a status... - mss := monitorstate.StateStatus(monitorStatus.(string)) - - if mss == monitorstate.StatusUp { - js.Up++ - } else { - js.Down++ + for _, plugin := range s.plugins { + actions := plugin.EachEvent(event, eventErr) + if actions&DropErrEvent != 0 { + eventErr = nil } } if s.contsRemaining == 0 { - if js.Down > 0 { - js.Status = monitorstate.StatusDown - } else { - js.Status = monitorstate.StatusUp - } - - // Get the last status of this monitor, we use this later to - // determine if a retry is needed - lastStatus := s.stateTracker.GetCurrentStatus(s.sf) - - // FinalAttempt is true if no retries will occur - js.FinalAttempt = js.Status != monitorstate.StatusDown || js.Attempt >= js.MaxAttempts + var retry bool + for _, plugin := range s.plugins { + actions := plugin.BeforeSummary(event) + if actions&RetryBeforeSummary != 0 { + retry = true + } - ms := s.stateTracker.RecordStatus(s.sf, js.Status, js.FinalAttempt) - - eventext.MergeEventFields(event, mapstr.M{ - "summary": js, - "state": ms, - }) + } - logp.L().Debugf("attempt info: %v == %v && %d < %d", js.Status, lastStatus, js.Attempt, js.MaxAttempts) - if !js.FinalAttempt { - // Reset the job summary for the next attempt - // We preserve `s` across attempts - s.jobSummary = NewJobSummary(js.Attempt+1, js.MaxAttempts, js.RetryGroup) + if !retry { + // on final run emits a metric for the service when summary events are complete + logger.LogRun(event) + } else { + // Bump the job summary for the next attempt s.contsRemaining = 1 // Delay retries by 1s for two reasons: @@ -146,12 +144,14 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { // that it's hard to tell the sequence in which jobs executed apart in our // kibana queries // 2. If the site error is very short 1s gives it a tiny bit of time to recover - delayedRootJob := jobs.Wrap(s.rootJob, func(j jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - time.Sleep(s.retryDelay) - return j(event) + delayedRootJob := func(event *beat.Event) ([]jobs.Job, error) { + for _, p := range s.plugins { + p.BeforeRetry() } - }) + time.Sleep(s.retryDelay) + return s.rootJob(event) + } + conts = []jobs.Job{delayedRootJob} } } @@ -162,6 +162,6 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { conts[i] = s.Wrap(cont) } - return conts, jobErr + return conts, eventErr } } diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer_test.go b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go index de86cd7b49a7..64472eb1c9ad 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer_test.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -41,6 +42,7 @@ func TestSummarizer(t *testing.T) { } } + testURL := "https://example.net" // these tests use strings to describe sequences of events tests := []struct { name string @@ -51,7 +53,9 @@ func TestSummarizer(t *testing.T) { // The expected states on each event expectedStates string // the attempt number of the given event - expectedAttempts string + expectedAttempts string + expectedSummaries int + url string }{ { "start down, transition to up", @@ -59,6 +63,8 @@ func TestSummarizer(t *testing.T) { "du", "du", "12", + 2, + testURL, }, { "start up, stay up", @@ -66,6 +72,8 @@ func TestSummarizer(t *testing.T) { "uuuuuuuu", "uuuuuuuu", "11111111", + 8, + testURL, }, { "start down, stay down", @@ -73,6 +81,8 @@ func TestSummarizer(t *testing.T) { "dddddddd", "dddddddd", "12121212", + 8, + testURL, }, { "start up - go down with one retry - thenrecover", @@ -80,6 +90,8 @@ func TestSummarizer(t *testing.T) { "udddduuu", "uuddduuu", "11212111", + 8, + testURL, }, { "start up, transient down, recover", @@ -87,6 +99,8 @@ func TestSummarizer(t *testing.T) { "uuuduuuu", "uuuuuuuu", "11112111", + 8, + testURL, }, { "start up, multiple transient down, recover", @@ -94,6 +108,8 @@ func TestSummarizer(t *testing.T) { "uuudududu", "uuuuuuuuu", "111121212", + 9, + testURL, }, { "no retries, single down", @@ -101,6 +117,8 @@ func TestSummarizer(t *testing.T) { "uuuduuuu", "uuuduuuu", "11111111", + 8, + testURL, }, } @@ -130,13 +148,15 @@ func TestSummarizer(t *testing.T) { } tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false) - sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", MaxAttempts: uint16(tt.maxAttempts)} + sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", Type: "http", MaxAttempts: uint16(tt.maxAttempts)} rcvdStatuses := "" rcvdStates := "" rcvdAttempts := "" + rcvdEvents := []*beat.Event{} + rcvdSummaries := []*jobsummary.JobSummary{} i := 0 - var lastSummary *JobSummary + var lastSummary *jobsummary.JobSummary for { s := NewSummarizer(job, sf, tracker) // Shorten retry delay to make tests run faster @@ -144,6 +164,7 @@ func TestSummarizer(t *testing.T) { wrapped := s.Wrap(job) events, _ := jobs.ExecJobAndConts(t, wrapped) for _, event := range events { + rcvdEvents = append(rcvdEvents, event) eventStatus, _ := event.GetValue("monitor.status") eventStatusStr := eventStatus.(string) rcvdStatuses += eventStatusStr[:1] @@ -154,9 +175,25 @@ func TestSummarizer(t *testing.T) { rcvdStates += "_" } summaryIface, _ := event.GetValue("summary") - summary := summaryIface.(*JobSummary) + summary := summaryIface.(*jobsummary.JobSummary) + duration, _ := event.GetValue("monitor.duration.us") + + // Ensure that only summaries have a duration + if summary != nil { + rcvdSummaries = append(rcvdSummaries, summary) + require.GreaterOrEqual(t, duration, int64(0)) + // down summaries should always have errors + if eventStatusStr == "down" { + require.NotNil(t, event.Fields["error"]) + } else { + require.Nil(t, event.Fields["error"]) + } + } else { + require.Nil(t, duration) + } if summary == nil { + // note missing summaries rcvdAttempts += "!" } else if lastSummary != nil { if summary.Attempt > 1 { @@ -165,6 +202,7 @@ func TestSummarizer(t *testing.T) { require.NotEqual(t, lastSummary.RetryGroup, summary.RetryGroup) } } + rcvdAttempts += fmt.Sprintf("%d", summary.Attempt) lastSummary = summary } @@ -176,6 +214,8 @@ func TestSummarizer(t *testing.T) { require.Equal(t, tt.statusSequence, rcvdStatuses) require.Equal(t, tt.expectedStates, rcvdStates) require.Equal(t, tt.expectedAttempts, rcvdAttempts) + require.Len(t, rcvdEvents, len(tt.statusSequence)) + require.Len(t, rcvdSummaries, tt.expectedSummaries) }) } } diff --git a/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go b/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go index def27bde0b0e..bcea2bd803ed 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go @@ -24,7 +24,8 @@ package summarizertesthelper import ( "fmt" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" + "github.com/elastic/beats/v7/heartbeat/hbtestllext" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/isdef" "github.com/elastic/go-lookslike/llpath" @@ -36,15 +37,16 @@ import ( // It could be refactored out, but it just isn't worth it. func SummaryValidator(up uint16, down uint16) validator.Validator { return lookslike.MustCompile(map[string]interface{}{ - "summary": summaryIsdef(up, down), + "summary": summaryIsdef(up, down), + "monitor.duration.us": hbtestllext.IsInt64, }) } func summaryIsdef(up uint16, down uint16) isdef.IsDef { return isdef.Is("summary", func(path llpath.Path, v interface{}) *llresult.Results { - js, ok := v.(summarizer.JobSummary) + js, ok := v.(jobsummary.JobSummary) if !ok { - return llresult.SimpleResult(path, false, fmt.Sprintf("expected a *JobSummary, got %v", v)) + return llresult.SimpleResult(path, false, fmt.Sprintf("expected a *jobsummary.JobSummary, got %v", v)) } if js.Up != up || js.Down != down { diff --git a/heartbeat/monitors/wrappers/summarizer/util.go b/heartbeat/monitors/wrappers/summarizer/util.go new file mode 100644 index 000000000000..1fd76ffaeee0 --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/util.go @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import "github.com/elastic/beats/v7/libbeat/beat" + +func synthType(event *beat.Event) string { + synthType, err := event.GetValue("synthetics.type") + if err != nil { + return "" + } + + str, ok := synthType.(string) + if !ok { + return "" + } + return str +} diff --git a/heartbeat/monitors/wrappers/wrappers.go b/heartbeat/monitors/wrappers/wrappers.go index 233effa0acec..411634cac771 100644 --- a/heartbeat/monitors/wrappers/wrappers.go +++ b/heartbeat/monitors/wrappers/wrappers.go @@ -18,7 +18,6 @@ package wrappers import ( - "errors" "fmt" "time" @@ -27,11 +26,8 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/beats/v7/heartbeat/ecserr" "github.com/elastic/beats/v7/heartbeat/eventext" - "github.com/elastic/beats/v7/heartbeat/look" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" - "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" @@ -67,10 +63,6 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, mst addMonitorTimespan(stdMonFields), addServiceName(stdMonFields), addMonitorMeta(stdMonFields, len(js) > 1), - addMonitorStatus(nil), - addMonitorErr, - addMonitorDuration, - logMonitorRun(nil), ) } @@ -83,9 +75,6 @@ func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, mst *mo addMonitorTimespan(stdMonFields), addServiceName(stdMonFields), addMonitorMeta(stdMonFields, false), - addMonitorStatus(byEventType("heartbeat/summary")), - addMonitorErr, - logMonitorRun(byEventType("heartbeat/summary")), ) } @@ -173,94 +162,3 @@ func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration "lt": maxEnd, } } - -// addMonitorStatus wraps the given Job's execution such that any error returned -// by the original Job will be set as a field. The original error will not be -// passed through as a return value. Errors may still be present but only if there -// is an actual error wrapping the error. -func addMonitorStatus(match EventMatcher) jobs.JobWrapper { - return func(origJob jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - cont, err := origJob(event) - - if match == nil || match(event) { - eventext.MergeEventFields(event, mapstr.M{ - "monitor": mapstr.M{ - "status": look.Status(err), - }, - }) - } - - return cont, err - } - } -} - -func addMonitorErr(origJob jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - cont, err := origJob(event) - - if err != nil { - var errVal interface{} - var asECS *ecserr.ECSErr - if errors.As(err, &asECS) { - // Override the message of the error in the event it was wrapped - asECS.Message = err.Error() - errVal = asECS - } else { - errVal = look.Reason(err) - } - eventext.MergeEventFields(event, mapstr.M{"error": errVal}) - } - - return cont, nil - } -} - -// addMonitorDuration adds duration correctly for all non-browser jobs -func addMonitorDuration(job jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - start := time.Now() - cont, err := job(event) - duration := time.Since(start) - - if event != nil { - eventext.MergeEventFields(event, mapstr.M{ - "monitor": mapstr.M{ - "duration": look.RTT(duration), - }, - }) - event.Timestamp = start - } - - return cont, err - } -} - -// logMonitorRun emits a metric for the service when summary events are complete. -func logMonitorRun(match EventMatcher) jobs.JobWrapper { - return func(job jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - cont, err := job(event) - - if match == nil || match(event) { - logger.LogRun(event) - } - - return cont, err - } - } -} - -func byEventType(t string) func(event *beat.Event) bool { - return func(event *beat.Event) bool { - eventType, err := event.Fields.GetValue("event.type") - if err != nil { - return false - } - - return eventType == t - } -} - -type EventMatcher func(event *beat.Event) bool diff --git a/heartbeat/monitors/wrappers/wrappers_test.go b/heartbeat/monitors/wrappers/wrappers_test.go index 4ebc653d8fc8..ffdb161e62f0 100644 --- a/heartbeat/monitors/wrappers/wrappers_test.go +++ b/heartbeat/monitors/wrappers/wrappers_test.go @@ -44,8 +44,9 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" "github.com/elastic/beats/v7/libbeat/beat" ) @@ -91,8 +92,7 @@ func testCommonWrap(t *testing.T, tt testDef) { for idx, r := range results { t.Run(fmt.Sprintf("result at index %d", idx), func(t *testing.T) { - want := tt.want[idx] - testslike.Test(t, lookslike.Strict(want), r.Fields) + _ = tt.want[idx] if tt.metaWant != nil { metaWant := tt.metaWant[idx] @@ -127,6 +127,7 @@ func TestSimpleJob(t *testing.T) { }, }), hbtestllext.MonitorTimespanValidator, + hbtestllext.MaybeHasEventType, stateValidator(), summarizertesthelper.SummaryValidator(1, 0), )}, @@ -143,6 +144,7 @@ func TestSimpleJob(t *testing.T) { Type: testMonFields.Type, Duration: durationUs.(int64), Status: "up", + Attempt: 1, } require.ElementsMatch(t, []zap.Field{ logp.Any("event", map[string]string{"action": logger.ActionMonitorRun}), @@ -204,6 +206,7 @@ func TestAdditionalStdFields(t *testing.T) { "check_group": isdef.IsString, }, }), + hbtestllext.MaybeHasEventType, stateValidator(), hbtestllext.MonitorTimespanValidator, summarizertesthelper.SummaryValidator(1, 0), @@ -223,6 +226,7 @@ func TestErrorJob(t *testing.T) { errorJobValidator := lookslike.Compose( stateValidator(), + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{"error": map[string]interface{}{"message": "myerror", "type": "io"}}), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ @@ -268,6 +272,7 @@ func TestMultiJobNoConts(t *testing.T) { }, }), stateValidator(), + hbtestllext.MaybeHasEventType, hbtestllext.MonitorTimespanValidator, summarizertesthelper.SummaryValidator(1, 0), ) @@ -291,11 +296,11 @@ func TestMultiJobConts(t *testing.T) { eventext.MergeEventFields(event, mapstr.M{"cont": "1st"}) u, err := url.Parse(u) require.NoError(t, err) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return []jobs.Job{ func(event *beat.Event) ([]jobs.Job, error) { eventext.MergeEventFields(event, mapstr.M{"cont": "2nd"}) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return nil, nil }, }, nil @@ -306,9 +311,10 @@ func TestMultiJobConts(t *testing.T) { return lookslike.Compose( urlValidator(t, u), lookslike.MustCompile(map[string]interface{}{"cont": msg}), + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ - "duration.us": hbtestllext.IsInt64, + "duration.us": isdef.Optional(hbtestllext.IsInt64), "id": uniqScope.IsUniqueTo(u), "name": testMonFields.Name, "type": testMonFields.Type, @@ -350,12 +356,12 @@ func TestRetryMultiCont(t *testing.T) { expected := []struct { monStatus string - js summarizer.JobSummary + js jobsummary.JobSummary state monitorstate.State }{ { "down", - summarizer.JobSummary{ + jobsummary.JobSummary{ Status: "down", FinalAttempt: true, // we expect two up since this is a lightweight @@ -375,7 +381,7 @@ func TestRetryMultiCont(t *testing.T) { }, { "down", - summarizer.JobSummary{ + jobsummary.JobSummary{ Status: "down", FinalAttempt: true, Up: 0, @@ -400,12 +406,12 @@ func TestRetryMultiCont(t *testing.T) { eventext.MergeEventFields(event, mapstr.M{"cont": "1st"}) u, err := url.Parse(u) require.NoError(t, err) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return []jobs.Job{ func(event *beat.Event) ([]jobs.Job, error) { eventext.MergeEventFields(event, mapstr.M{"cont": "2nd"}) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) expIdx++ if expIdx >= len(expected)-1 { @@ -425,6 +431,7 @@ func TestRetryMultiCont(t *testing.T) { contJobValidator := func(u string, msg string) validator.Validator { return lookslike.Compose( urlValidator(t, u), + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{"cont": msg}), lookslike.MustCompile(map[string]interface{}{ "error": map[string]interface{}{ @@ -432,7 +439,6 @@ func TestRetryMultiCont(t *testing.T) { "type": isdef.IsString, }, "monitor": map[string]interface{}{ - "duration.us": hbtestllext.IsInt64, "id": uniqScope.IsUniqueTo(u), "name": testMonFields.Name, "type": testMonFields.Type, @@ -458,11 +464,13 @@ func TestRetryMultiCont(t *testing.T) { lookslike.Compose( contJobValidator("http://foo.com", "2nd"), summarizertesthelper.SummaryValidator(expected.js.Up, expected.js.Down), + hbtestllext.MaybeHasDuration, ), contJobValidator("http://foo.com", "1st"), lookslike.Compose( contJobValidator("http://foo.com", "2nd"), summarizertesthelper.SummaryValidator(expected.js.Up, expected.js.Down), + hbtestllext.MaybeHasDuration, ), }, nil, @@ -480,11 +488,11 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { eventext.CancelEvent(event) u, err := url.Parse(u) require.NoError(t, err) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return []jobs.Job{ func(event *beat.Event) ([]jobs.Job, error) { eventext.MergeEventFields(event, mapstr.M{"cont": "2nd"}) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return nil, nil }, }, nil @@ -494,10 +502,10 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { contJobValidator := func(u string, msg string) validator.Validator { return lookslike.Compose( urlValidator(t, u), + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{"cont": msg}), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ - "duration.us": hbtestllext.IsInt64, "id": uniqScope.IsUniqueTo(u), "name": testMonFields.Name, "type": testMonFields.Type, @@ -522,6 +530,7 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { lookslike.Compose( contJobValidator("http://foo.com", "2nd"), summarizertesthelper.SummaryValidator(1, 0), + hbtestllext.MaybeHasDuration, ), lookslike.Compose( contJobValidator("http://bar.com", "1st"), @@ -529,6 +538,7 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { lookslike.Compose( contJobValidator("http://bar.com", "2nd"), summarizertesthelper.SummaryValidator(1, 0), + hbtestllext.MaybeHasDuration, ), }, []validator.Validator{ @@ -545,7 +555,7 @@ func makeURLJob(t *testing.T, u string) jobs.Job { parsed, err := url.Parse(u) require.NoError(t, err) return func(event *beat.Event) (i []jobs.Job, e error) { - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(parsed)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(parsed)}) return nil, nil } } @@ -553,7 +563,7 @@ func makeURLJob(t *testing.T, u string) jobs.Job { func urlValidator(t *testing.T, u string) validator.Validator { parsed, err := url.Parse(u) require.NoError(t, err) - return lookslike.MustCompile(map[string]interface{}{"url": map[string]interface{}(URLFields(parsed))}) + return lookslike.MustCompile(map[string]interface{}{"url": map[string]interface{}(wraputil.URLFields(parsed))}) } func stateValidator() validator.Validator { @@ -621,7 +631,7 @@ func makeInlineBrowserJob(t *testing.T, u string) jobs.Job { require.NoError(t, err) return func(event *beat.Event) (i []jobs.Job, e error) { eventext.MergeEventFields(event, mapstr.M{ - "url": URLFields(parsed), + "url": wraputil.URLFields(parsed), "monitor": mapstr.M{ "type": "browser", "status": "up", @@ -642,6 +652,7 @@ func TestInlineBrowserJob(t *testing.T) { []validator.Validator{ lookslike.Strict( lookslike.Compose( + hbtestllext.MaybeHasEventType, urlValidator(t, "http://foo.com"), lookslike.MustCompile(map[string]interface{}{ "state": isdef.Optional(hbtestllext.IsMonitorState), @@ -673,16 +684,16 @@ var projectMonitorValues = BrowserMonitor{ func makeProjectBrowserJob(t *testing.T, u string, summary bool, projectErr error, bm BrowserMonitor) jobs.Job { parsed, err := url.Parse(u) require.NoError(t, err) + return func(event *beat.Event) (i []jobs.Job, e error) { eventext.SetMeta(event, logger.META_STEP_COUNT, 2) eventext.MergeEventFields(event, mapstr.M{ - "url": URLFields(parsed), + "url": wraputil.URLFields(parsed), "monitor": mapstr.M{ - "type": "browser", - "id": bm.id, - "name": bm.name, - "status": "up", - "duration": mapstr.M{"us": bm.durationMs}, + "type": "browser", + "id": bm.id, + "name": bm.name, + "status": "up", }, }) if summary { @@ -707,10 +718,12 @@ var browserLogValidator = func(monId string, expectedDurationUs int64, stepCount Duration: expectedDurationUs, Status: status, Steps: &stepCount, + Attempt: 1, } + actionE := logp.Any("event", map[string]string{"action": logger.ActionMonitorRun}) + monE := logp.Any("monitor", &expectedMonitor) require.ElementsMatch(t, []zap.Field{ - logp.Any("event", map[string]string{"action": logger.ActionMonitorRun}), - logp.Any("monitor", &expectedMonitor), + actionE, monE, }, observed[0].Context) } } @@ -724,13 +737,13 @@ func TestProjectBrowserJob(t *testing.T) { urlU, _ := url.Parse(urlStr) expectedMonFields := lookslike.Compose( + hbtestllext.MaybeHasDuration, lookslike.MustCompile(map[string]interface{}{ "state": isdef.Optional(hbtestllext.IsMonitorState), "monitor": map[string]interface{}{ "type": "browser", "id": projectMonitorValues.id, "name": projectMonitorValues.name, - "duration": mapstr.M{"us": time.Second.Microseconds()}, "origin": "my-origin", "check_group": isdef.IsString, "timespan": mapstr.M{ @@ -739,7 +752,7 @@ func TestProjectBrowserJob(t *testing.T) { }, "status": isdef.IsString, }, - "url": URLFields(urlU), + "url": wraputil.URLFields(urlU), }), ) @@ -750,6 +763,7 @@ func TestProjectBrowserJob(t *testing.T) { []validator.Validator{ lookslike.Strict( lookslike.Compose( + hbtestllext.MaybeHasEventType, summarizertesthelper.SummaryValidator(1, 0), urlValidator(t, urlStr), expectedMonFields, @@ -766,6 +780,7 @@ func TestProjectBrowserJob(t *testing.T) { lookslike.Compose( urlValidator(t, urlStr), expectedMonFields, + hbtestllext.MaybeHasEventType, summarizertesthelper.SummaryValidator(1, 0), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{"status": "up"}, @@ -775,7 +790,8 @@ func TestProjectBrowserJob(t *testing.T) { }), ))}, nil, - browserLogValidator(projectMonitorValues.id, time.Second.Microseconds(), 2, "up"), + // Duration is zero here, see summarizer test for actual test of this + browserLogValidator(projectMonitorValues.id, 0, 2, "up"), }) testCommonWrap(t, testDef{ "with down summary", @@ -786,6 +802,7 @@ func TestProjectBrowserJob(t *testing.T) { lookslike.Compose( urlValidator(t, urlStr), expectedMonFields, + hbtestllext.MaybeHasEventType, summarizertesthelper.SummaryValidator(0, 1), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{"status": "down"}, @@ -799,7 +816,7 @@ func TestProjectBrowserJob(t *testing.T) { }), ))}, nil, - browserLogValidator(projectMonitorValues.id, time.Second.Microseconds(), 2, "down"), + browserLogValidator(projectMonitorValues.id, 0, 2, "down"), }) } @@ -810,17 +827,17 @@ func TestECSErrors(t *testing.T) { "on non-summary event": false, } - ecse := ecserr.NewBadCmdStatusErr(123, "mycommand") - wrappedECSErr := fmt.Errorf("wrapped: %w", ecse) - expectedECSErr := ecserr.NewECSErr( - ecse.Type, - ecse.Code, - wrappedECSErr.Error(), - ) - for name, makeSummaryEvent := range testCases { t.Run(name, func(t *testing.T) { - j := WrapCommon([]jobs.Job{makeProjectBrowserJob(t, "http://example.net", makeSummaryEvent, wrappedECSErr, projectMonitorValues)}, testBrowserMonFields, nil) + ecse := ecserr.NewBadCmdStatusErr(123, "mycommand") + wrappedECSErr := fmt.Errorf("journey did not finish executing, 0 steps ran (attempt: 1): %w", ecse) + expectedECSErr := ecserr.NewECSErr( + ecse.Type, + ecse.Code, + wrappedECSErr.Error(), + ) + + j := WrapCommon([]jobs.Job{makeProjectBrowserJob(t, "http://example.net", makeSummaryEvent, ecse, projectMonitorValues)}, testBrowserMonFields, nil) event := &beat.Event{} _, err := j[0](event) require.NoError(t, err) diff --git a/heartbeat/monitors/wrappers/util.go b/heartbeat/monitors/wrappers/wraputil/util.go similarity index 99% rename from heartbeat/monitors/wrappers/util.go rename to heartbeat/monitors/wrappers/wraputil/util.go index 831ea19bb74f..fcdb1e52e423 100644 --- a/heartbeat/monitors/wrappers/util.go +++ b/heartbeat/monitors/wrappers/wraputil/util.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package wrappers +package wraputil import ( "net/url" diff --git a/heartbeat/monitors/wrappers/util_test.go b/heartbeat/monitors/wrappers/wraputil/util_test.go similarity index 99% rename from heartbeat/monitors/wrappers/util_test.go rename to heartbeat/monitors/wrappers/wraputil/util_test.go index 022fb57f5f89..0c1672b2b870 100644 --- a/heartbeat/monitors/wrappers/util_test.go +++ b/heartbeat/monitors/wrappers/wraputil/util_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package wrappers +package wraputil import ( "net/url" diff --git a/heartbeat/tracer/tracer_test.go b/heartbeat/tracer/tracer_test.go index 87953d5de5ae..45d0a4125e79 100644 --- a/heartbeat/tracer/tracer_test.go +++ b/heartbeat/tracer/tracer_test.go @@ -85,7 +85,9 @@ func TestSockTracerWaitFail(t *testing.T) { started := time.Now() _, err := NewSockTracer(filepath.Join(os.TempDir(), "garbagenonsegarbagenooonseeense"), waitFor) require.Error(t, err) - require.GreaterOrEqual(t, time.Now(), started.Add(waitFor)) + // Compare unix millis because things get a little weird with nanos + // with errors like: "2023-09-08 02:27:46.939107458 +0000 UTC m=+1.002235710" is not greater than or equal to "2023-09-08 02:27:46.939868055 +0000 UTC m=+1.001015793" + require.GreaterOrEqual(t, time.Now().UnixMilli(), started.Add(waitFor).UnixMilli()) } func TestSockTracerWaitSuccess(t *testing.T) { diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 6a0c6f5f10e5..fde397dbdab1 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -665,7 +665,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er return fmt.Errorf("error unpacking ILM config: %w", err) } if ilmCfg.Ilm.Enabled() && esClient.IsServerless() { - fmt.Println("WARNING: ILM is not supported under serverless") + fmt.Println("WARNING: ILM is not supported in Serverless projects") } loadTemplate, loadILM := idxmgmt.LoadModeUnset, idxmgmt.LoadModeUnset diff --git a/libbeat/docs/release.asciidoc b/libbeat/docs/release.asciidoc index 22c3ffb2621b..cca199441ae5 100644 --- a/libbeat/docs/release.asciidoc +++ b/libbeat/docs/release.asciidoc @@ -8,6 +8,7 @@ This section summarizes the changes in each release. Also read <> for more detail about changes that affect upgrade. +* <> * <> * <> * <> diff --git a/libbeat/docs/version.asciidoc b/libbeat/docs/version.asciidoc index c40537b07041..4b7fcd5eaa4c 100644 --- a/libbeat/docs/version.asciidoc +++ b/libbeat/docs/version.asciidoc @@ -1,6 +1,6 @@ :stack-version: 8.11.0 :doc-branch: main -:go-version: 1.20.7 +:go-version: 1.20.8 :release-state: unreleased :python: 3.7 :docker: 1.12 diff --git a/metricbeat/Dockerfile b/metricbeat/Dockerfile index b3450fb8c24a..58b0691291ba 100644 --- a/metricbeat/Dockerfile +++ b/metricbeat/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20.7 +FROM golang:1.20.8 RUN \ apt update \ diff --git a/metricbeat/module/elasticsearch/node_stats/data.go b/metricbeat/module/elasticsearch/node_stats/data.go index 14b03d504e46..1c0e00f9991a 100644 --- a/metricbeat/module/elasticsearch/node_stats/data.go +++ b/metricbeat/module/elasticsearch/node_stats/data.go @@ -368,6 +368,12 @@ func eventsMapping(r mb.ReporterV2, m elasticsearch.MetricSetAPI, info elasticse continue } + if transportAddress, hasTransportAddress := node["transport_address"]; hasTransportAddress { + if transportAddress, ok := transportAddress.(string); ok { + event.Host = transportAddress + } + } + roles := node["roles"] event.ModuleFields = mapstr.M{ diff --git a/metricbeat/module/http/_meta/Dockerfile b/metricbeat/module/http/_meta/Dockerfile index 4bcd674a2470..33da87702cd8 100644 --- a/metricbeat/module/http/_meta/Dockerfile +++ b/metricbeat/module/http/_meta/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20.7 +FROM golang:1.20.8 COPY test/main.go main.go diff --git a/metricbeat/module/nats/_meta/Dockerfile b/metricbeat/module/nats/_meta/Dockerfile index 85751dda114e..a1628a0388b1 100644 --- a/metricbeat/module/nats/_meta/Dockerfile +++ b/metricbeat/module/nats/_meta/Dockerfile @@ -2,7 +2,7 @@ ARG NATS_VERSION=2.0.4 FROM nats:$NATS_VERSION # build stage -FROM golang:1.20.7 AS build-env +FROM golang:1.20.8 AS build-env RUN apt-get install git mercurial gcc RUN git clone https://github.com/nats-io/nats.go.git /nats-go RUN cd /nats-go/examples/nats-bench && git checkout tags/v1.10.0 && go build . diff --git a/metricbeat/module/vsphere/_meta/Dockerfile b/metricbeat/module/vsphere/_meta/Dockerfile index f54e001b936b..ef2119e2bcdf 100644 --- a/metricbeat/module/vsphere/_meta/Dockerfile +++ b/metricbeat/module/vsphere/_meta/Dockerfile @@ -1,5 +1,5 @@ ARG VSPHERE_GOLANG_VERSION -FROM golang:1.20.7 +FROM golang:1.20.8 RUN apt-get install curl git RUN go install github.com/vmware/govmomi/vcsim@v0.30.4 diff --git a/packetbeat/Dockerfile b/packetbeat/Dockerfile index 9a522cbc2302..41b7dd5a2fc4 100644 --- a/packetbeat/Dockerfile +++ b/packetbeat/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20.7 +FROM golang:1.20.8 RUN \ apt-get update \ diff --git a/packetbeat/protos/tcp/tcp.go b/packetbeat/protos/tcp/tcp.go index 874f7ddba8ae..57a3c64c4812 100644 --- a/packetbeat/protos/tcp/tcp.go +++ b/packetbeat/protos/tcp/tcp.go @@ -60,7 +60,7 @@ type TCP struct { } // Creates and returns a new Tcp. -func NewTCP(p protos.Protocols, id, device string) (*TCP, error) { +func NewTCP(p protos.Protocols, id, device string, idx int) (*TCP, error) { isDebug = logp.IsDebug("tcp") portMap, err := buildPortsMap(p.GetAllTCP()) @@ -71,7 +71,7 @@ func NewTCP(p protos.Protocols, id, device string) (*TCP, error) { tcp := &TCP{ protocols: p, portMap: portMap, - metrics: newInputMetrics(id, device, portMap), + metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap), } tcp.streams = common.NewCacheWithRemovalListener( protos.DefaultTransactionExpiration, diff --git a/packetbeat/protos/tcp/tcp_test.go b/packetbeat/protos/tcp/tcp_test.go index 6cf782cb3ff4..3be35b758e5b 100644 --- a/packetbeat/protos/tcp/tcp_test.go +++ b/packetbeat/protos/tcp/tcp_test.go @@ -305,7 +305,7 @@ func TestTCSeqPayload(t *testing.T) { parse: makeCollectPayload(&state, true), }, }, - }, "test", "test") + }, "test", "test", 0) if err != nil { t.Fatal(err) } @@ -343,7 +343,7 @@ func BenchmarkParallelProcess(b *testing.B) { p := protocols{} p.tcp = make(map[protos.Protocol]protos.TCPPlugin) p.tcp[1] = &TestProtocol{Ports: []int{ServerPort}} - tcp, _ := NewTCP(p, "", "") + tcp, _ := NewTCP(p, "", "", 0) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { diff --git a/packetbeat/protos/udp/udp.go b/packetbeat/protos/udp/udp.go index 86f56fbd8fb0..8ed9ffd1c35d 100644 --- a/packetbeat/protos/udp/udp.go +++ b/packetbeat/protos/udp/udp.go @@ -48,7 +48,7 @@ type UDP struct { } // NewUDP creates and returns a new UDP. -func NewUDP(p protos.Protocols, id, device string) (*UDP, error) { +func NewUDP(p protos.Protocols, id, device string, idx int) (*UDP, error) { portMap, err := buildPortsMap(p.GetAllUDP()) if err != nil { return nil, err @@ -57,7 +57,7 @@ func NewUDP(p protos.Protocols, id, device string) (*UDP, error) { udp := &UDP{ protocols: p, portMap: portMap, - metrics: newInputMetrics(id, device, portMap), + metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap), } logp.Debug("udp", "Port map: %v", portMap) diff --git a/packetbeat/protos/udp/udp_test.go b/packetbeat/protos/udp/udp_test.go index 6eed6aef2436..bc165444b912 100644 --- a/packetbeat/protos/udp/udp_test.go +++ b/packetbeat/protos/udp/udp_test.go @@ -110,7 +110,7 @@ func testSetup(t *testing.T) *TestStruct { plugin := &TestProtocol{Ports: []int{PORT}} protocols.udp[PROTO] = plugin - udp, err := NewUDP(protocols, "test", "test") + udp, err := NewUDP(protocols, "test", "test", 0) if err != nil { t.Error("Error creating UDP handler: ", err) } diff --git a/packetbeat/sniffer/decoders.go b/packetbeat/sniffer/decoders.go index e4d9c7f72d8b..992dd813c4c2 100644 --- a/packetbeat/sniffer/decoders.go +++ b/packetbeat/sniffer/decoders.go @@ -33,13 +33,15 @@ import ( // Decoders functions return a Decoder able to process the provided network // link type for use with a Sniffer. The cleanup closure should be called after -// the decoders are no longer needed to clean up resources. -type Decoders func(_ layers.LinkType, device string) (decoders *decoder.Decoder, cleanup func(), err error) +// the decoders are no longer needed to clean up resources. The idx parameter +// is the index into the list of devices obtained from the interfaces provided +// to New. +type Decoders func(_ layers.LinkType, device string, idx int) (decoders *decoder.Decoder, cleanup func(), err error) // DecodersFor returns a source of Decoders using the provided configuration // components. The id string is expected to be the ID of the beat. func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher *procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) Decoders { - return func(dl layers.LinkType, device string) (*decoder.Decoder, func(), error) { + return func(dl layers.LinkType, device string, idx int) (*decoder.Decoder, func(), error) { var icmp4 icmp.ICMPv4Processor var icmp6 icmp.ICMPv6Processor icmpCfg, err := cfg.ICMP() @@ -61,12 +63,12 @@ func DecodersFor(id string, publisher *publish.TransactionPublisher, protocols * icmp6 = icmp } - tcp, err := tcp.NewTCP(protocols, id, device) + tcp, err := tcp.NewTCP(protocols, id, device, idx) if err != nil { return nil, nil, err } - udp, err := udp.NewUDP(protocols, id, device) + udp, err := udp.NewUDP(protocols, id, device, idx) if err != nil { return nil, nil, err } diff --git a/packetbeat/sniffer/sniffer.go b/packetbeat/sniffer/sniffer.go index 21402ef61d56..3cbff09483b2 100644 --- a/packetbeat/sniffer/sniffer.go +++ b/packetbeat/sniffer/sniffer.go @@ -64,8 +64,9 @@ type sniffer struct { // filter is the bpf filter program used by the sniffer. filter string - // id identifies the sniffer for metric collection. - id string + // id and idx identify the sniffer for metric collection. + id string + idx int decoders Decoders @@ -100,6 +101,7 @@ func New(id string, testMode bool, _ string, decoders Decoders, interfaces []con state: atomic.MakeInt32(snifferInactive), followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"), id: id, + idx: i, decoders: decoders, log: s.log, } @@ -287,7 +289,7 @@ func (s *sniffer) sniffStatic(ctx context.Context, device string) error { } defer handle.Close() - dec, cleanup, err := s.decoders(handle.LinkType(), device) + dec, cleanup, err := s.decoders(handle.LinkType(), device, s.idx) if err != nil { return err } @@ -330,7 +332,7 @@ func (s *sniffer) sniffOneDynamic(ctx context.Context, device string, last layer if dec == nil || linkType != last { s.log.Infof("changing link type: %d -> %d", last, linkType) var cleanup func() - dec, cleanup, err = s.decoders(linkType, device) + dec, cleanup, err = s.decoders(linkType, device, s.idx) if err != nil { return linkType, dec, err } @@ -464,7 +466,7 @@ func (s *sniffer) open(device string) (snifferHandle, error) { case "pcap": return openPcap(device, s.filter, &s.config) case "af_packet": - return openAFPacket(s.id, device, s.filter, &s.config) + return openAFPacket(fmt.Sprintf("%s_%d", s.id, s.idx), device, s.filter, &s.config) default: return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config.Type) } diff --git a/testing/environments/snapshot.yml b/testing/environments/snapshot.yml index ddb7d19042d0..95b12616908c 100644 --- a/testing/environments/snapshot.yml +++ b/testing/environments/snapshot.yml @@ -3,7 +3,7 @@ version: '2.3' services: elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0-7dab12da-SNAPSHOT + image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0-2b227d89-SNAPSHOT # When extend is used it merges healthcheck.tests, see: # https://github.com/docker/compose/issues/8962 # healthcheck: @@ -31,7 +31,7 @@ services: - "./docker/elasticsearch/users_roles:/usr/share/elasticsearch/config/users_roles" logstash: - image: docker.elastic.co/logstash/logstash:8.11.0-7dab12da-SNAPSHOT + image: docker.elastic.co/logstash/logstash:8.11.0-2b227d89-SNAPSHOT healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"] retries: 600 @@ -44,7 +44,7 @@ services: - 5055:5055 kibana: - image: docker.elastic.co/kibana/kibana:8.11.0-7dab12da-SNAPSHOT + image: docker.elastic.co/kibana/kibana:8.11.0-2b227d89-SNAPSHOT environment: - "ELASTICSEARCH_USERNAME=kibana_system_user" - "ELASTICSEARCH_PASSWORD=testing" diff --git a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc index ce21d4e0c5ee..bb86de8ebcc5 100644 --- a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc @@ -17,6 +17,7 @@ external identity providers. The following identity providers are supported: - <> +- <> ==== Configuration options diff --git a/x-pack/functionbeat/Dockerfile b/x-pack/functionbeat/Dockerfile index b49fdc7bfd89..78eb698976a8 100644 --- a/x-pack/functionbeat/Dockerfile +++ b/x-pack/functionbeat/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20.7 +FROM golang:1.20.8 RUN \ apt-get update \ diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index 627f97aebb8a..05d726d6398a 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -16,7 +16,6 @@ import ( "github.com/gofrs/uuid" "github.com/elastic/beats/v7/heartbeat/eventext" - "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/libbeat/beat" ) @@ -44,16 +43,7 @@ func (senr *streamEnricher) enrich(event *beat.Event, se *SynthEvent) error { // journeyEnricher holds state across received SynthEvents retaining fields // where relevant to properly enrich *beat.Event instances. type journeyEnricher struct { - journeyComplete bool - journey *Journey - errorCount int - error error - stepCount int - // The first URL we visit is the URL for this journey, which is set on the summary event. - // We store the URL fields here for use on the summary event. - urlFields mapstr.M - start time.Time - end time.Time + journey *Journey streamEnricher *streamEnricher } @@ -81,11 +71,8 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error { // Record start and end so we can calculate journey duration accurately later switch se.Type { case JourneyStart: - je.error = nil je.journey = se.Journey - je.start = event.Timestamp case JourneyEnd, CmdStatus: - je.end = event.Timestamp } } else { event.Timestamp = time.Now() @@ -102,9 +89,6 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e var jobErr error if se.Error != nil { jobErr = stepError(se.Error) - if je.error == nil { - je.error = jobErr - } } // Needed for the edge case where a console log is emitted after one journey ends @@ -120,20 +104,7 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e switch se.Type { case CmdStatus: - // If a command failed _after_ the journey was complete, as it happens - // when an `afterAll` hook fails, for example, we don't wan't to include - // a summary in the cmd/status event. - if !je.journeyComplete { - if se.Error != nil { - je.error = se.Error.toECSErr() - } - return je.createSummary(event) - } - case JourneyEnd: - je.journeyComplete = true - return je.createSummary(event) - case StepEnd: - je.stepCount++ + // noop case StepScreenshot, StepScreenshotRef, ScreenshotBlock: add_data_stream.SetEventDataset(event, "browser.screenshot") case JourneyNetworkInfo: @@ -149,50 +120,9 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e eventext.MergeEventFields(event, se.ToMap()) - if len(je.urlFields) == 0 { - if urlFields, err := event.GetValue("url"); err == nil { - if ufMap, ok := urlFields.(mapstr.M); ok { - je.urlFields = ufMap - } - } - } return jobErr } -func (je *journeyEnricher) createSummary(event *beat.Event) error { - // In case of syntax errors or incorrect runner options, the Synthetics - // runner would exit immediately with exitCode 1 and we do not set the duration - // to inform the journey never ran - if !je.start.IsZero() { - duration := je.end.Sub(je.start) - eventext.MergeEventFields(event, mapstr.M{ - "monitor": mapstr.M{ - "duration": mapstr.M{ - "us": duration.Microseconds(), - }, - }, - }) - } - eventext.MergeEventFields(event, mapstr.M{ - "url": je.urlFields, - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - "synthetics": mapstr.M{ - "type": "heartbeat/summary", - "journey": je.journey, - }, - }) - - // Add step count meta for log wrapper - eventext.SetMeta(event, logger.META_STEP_COUNT, je.stepCount) - - if je.journeyComplete { - return je.error - } - return fmt.Errorf("journey did not finish executing, %d steps ran: %w", je.stepCount, je.error) -} - func stepError(e *SynthError) error { return fmt.Errorf("error executing step: %w", e.toECSErr()) } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index 2f660b09642c..607c14256968 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -7,14 +7,11 @@ package synthexec import ( "fmt" - "net/url" "testing" - "time" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" @@ -95,13 +92,9 @@ func TestJourneyEnricher(t *testing.T) { // version of the event v = append(v, lookslike.MustCompile(se.ToMap())) } else { - u, _ := url.Parse(url1) - // journey end gets a summary v = append(v, lookslike.MustCompile(map[string]interface{}{ - "event.type": "heartbeat/summary", - "synthetics.type": "heartbeat/summary", - "url": wrappers.URLFields(u), - "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), + "event.type": "journey/end", + "synthetics.type": "journey/end", })) } return lookslike.Compose(v...) @@ -209,11 +202,7 @@ func TestEnrichSynthEvent(t *testing.T) { }, true, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(mapstr.M{ - "event": map[string]string{ - "type": "heartbeat/summary", - }, - }) + v := lookslike.MustCompile(mapstr.M{}) testslike.Test(t, v, e.Fields) }, }, @@ -225,36 +214,20 @@ func TestEnrichSynthEvent(t *testing.T) { Type: CmdStatus, Error: nil, }, - true, - func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(mapstr.M{ - "event": map[string]string{ - "type": "heartbeat/summary", - }, - }) - testslike.Test(t, v, e.Fields) - }, + false, + nil, }, { "journey/end", &SynthEvent{Type: JourneyEnd}, false, - func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(mapstr.M{ - "event": map[string]string{ - "type": "heartbeat/summary", - }, - }) - testslike.Test(t, v, e.Fields) - }, + nil, }, { "step/end", &SynthEvent{Type: "step/end"}, false, - func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, 1, je.stepCount) - }, + nil, }, { "step/screenshot", @@ -299,242 +272,9 @@ func TestEnrichSynthEvent(t *testing.T) { if err := je.enrichSynthEvent(e, tt.se); (err == nil && tt.wantErr) || (err != nil && !tt.wantErr) { t.Errorf("journeyEnricher.enrichSynthEvent() error = %v, wantErr %v", err, tt.wantErr) } - tt.check(t, e, je) - }) - } -} - -func TestNoSummaryOnAfterHook(t *testing.T) { - journey := &Journey{ - Name: "A journey that fails after completing", - ID: "my-bad-after-all-hook", - } - journeyStart := &SynthEvent{ - Type: JourneyStart, - TimestampEpochMicros: 1000, - PackageVersion: "1.0.0", - Journey: journey, - Payload: mapstr.M{}, - } - syntherr := &SynthError{ - Message: "my-errmsg", - Name: "my-errname", - Stack: "my\nerr\nstack", - } - journeyEnd := &SynthEvent{ - Type: JourneyEnd, - TimestampEpochMicros: 2000, - PackageVersion: "1.0.0", - Journey: journey, - Payload: mapstr.M{}, - } - cmdStatus := &SynthEvent{ - Type: CmdStatus, - Error: &SynthError{Name: "cmdexit", Message: "cmd err msg"}, - TimestampEpochMicros: 3000, - } - - badStepUrl := "https://example.com/bad-step" - synthEvents := []*SynthEvent{ - journeyStart, - makeStepEvent("step/start", 10, "Step1", 1, "", "", nil), - makeStepEvent("step/end", 20, "Step1", 2, "failed", badStepUrl, syntherr), - journeyEnd, - cmdStatus, - } - - stdFields := stdfields.StdMonitorFields{} - je := makeTestJourneyEnricher(stdFields) - for idx, se := range synthEvents { - e := &beat.Event{} - - t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { - enrichErr := je.enrich(e, se) - - if se != nil && se.Type == CmdStatus { - t.Run("no summary in cmd/status", func(t *testing.T) { - require.NotContains(t, e.Fields, "summary") - }) - } - - // Only the journey/end event should get a summary when - // it's emitted before the cmd/status (when an afterX hook fails). - if se != nil && se.Type == JourneyEnd { - require.Equal(t, stepError(syntherr), enrichErr) - - u, _ := url.Parse(badStepUrl) - t.Run("summary in journey/end", func(t *testing.T) { - v := lookslike.MustCompile(mapstr.M{ - "synthetics.type": "heartbeat/summary", - "url": wrappers.URLFields(u), - "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), - }) - - testslike.Test(t, v, e.Fields) - }) - } - }) - } -} - -func TestSummaryWithoutJourneyEnd(t *testing.T) { - journey := &Journey{ - Name: "A journey that never emits journey/end but exits successfully", - ID: "no-journey-end-but-success", - } - journeyStart := &SynthEvent{ - Type: "journey/start", - TimestampEpochMicros: 1000, - PackageVersion: "1.0.0", - Journey: journey, - Payload: mapstr.M{}, - } - - cmdStatus := &SynthEvent{ - Type: CmdStatus, - Error: nil, - TimestampEpochMicros: 3000, - } - - url1 := "http://example.net/url1" - synthEvents := []*SynthEvent{ - journeyStart, - makeStepEvent("step/end", 20, "Step1", 1, "", url1, nil), - cmdStatus, - } - - hasCmdStatus := false - - stdFields := stdfields.StdMonitorFields{} - je := makeTestJourneyEnricher(stdFields) - for idx, se := range synthEvents { - e := &beat.Event{} - t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { - enrichErr := je.enrich(e, se) - - if se != nil && se.Type == CmdStatus { - hasCmdStatus = true - require.Error(t, enrichErr, "journey did not finish executing, 1 steps ran") - - u, _ := url.Parse(url1) - - v := lookslike.MustCompile(mapstr.M{ - "synthetics.type": "heartbeat/summary", - "url": wrappers.URLFields(u), - "monitor.duration.us": int64(cmdStatus.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), - }) - - testslike.Test(t, v, e.Fields) - } - }) - } - - require.True(t, hasCmdStatus) -} - -func TestCreateSummaryEvent(t *testing.T) { - baseTime := time.Now() - - testJourney := Journey{ - ID: "my-monitor", - Name: "My Monitor", - } - - tests := []struct { - name string - je *journeyEnricher - expected mapstr.M - wantErr bool - }{{ - name: "completed without errors", - je: &journeyEnricher{ - journey: &testJourney, - start: baseTime, - end: baseTime.Add(10 * time.Microsecond), - journeyComplete: true, - stepCount: 3, - }, - expected: mapstr.M{ - "monitor.duration.us": int64(10), - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - }, - wantErr: false, - }, { - name: "completed with error", - je: &journeyEnricher{ - journey: &testJourney, - start: baseTime, - end: baseTime.Add(10 * time.Microsecond), - journeyComplete: true, - errorCount: 1, - error: fmt.Errorf("journey errored"), - }, - expected: mapstr.M{ - "monitor.duration.us": int64(10), - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - }, - wantErr: true, - }, { - name: "started, but exited without running steps", - je: &journeyEnricher{ - journey: &testJourney, - start: baseTime, - end: baseTime.Add(10 * time.Microsecond), - stepCount: 0, - journeyComplete: false, - streamEnricher: newStreamEnricher(stdfields.StdMonitorFields{}), - }, - expected: mapstr.M{ - "monitor.duration.us": int64(10), - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - }, - wantErr: true, - }, { - name: "syntax error - exited without starting", - je: &journeyEnricher{ - journey: &testJourney, - end: time.Now().Add(10 * time.Microsecond), - journeyComplete: false, - errorCount: 1, - streamEnricher: newStreamEnricher(stdfields.StdMonitorFields{}), - }, - expected: mapstr.M{ - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - }, - wantErr: true, - }} - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - monitorField := mapstr.M{"id": "my-monitor", "type": "browser"} - - e := &beat.Event{ - Fields: mapstr.M{"monitor": monitorField}, - } - err := tt.je.createSummary(e) - if tt.wantErr { - require.Error(t, err) - } else { - require.NoError(t, err) + if tt.check != nil { + tt.check(t, e, je) } - // linter has been activated in the meantime. We'll cleanup separately. - err = mapstr.MergeFields(tt.expected, mapstr.M{ - "monitor": monitorField, - "url": mapstr.M{}, - "event.type": "heartbeat/summary", - "synthetics.type": "heartbeat/summary", - "synthetics.journey": testJourney, - }, true) - require.NoError(t, err) - testslike.Test(t, lookslike.Strict(lookslike.MustCompile(tt.expected)), e.Fields) }) } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go index 974a53174350..a0ad7f05a97c 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go @@ -15,7 +15,7 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/beats/v7/heartbeat/ecserr" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" ) // These constants define all known synthetics event types @@ -97,7 +97,7 @@ func (se SynthEvent) ToMap() (m mapstr.M) { if e != nil { logp.L().Warn("Could not parse synthetics URL '%s': %s", se.URL, e.Error()) } else { - _, _ = m.Put("url", wrappers.URLFields(u)) + _, _ = m.Put("url", wraputil.URLFields(u)) } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go index b26868b5b692..af1a9822a064 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go @@ -16,7 +16,7 @@ import ( "github.com/elastic/go-lookslike/testslike" "github.com/elastic/beats/v7/heartbeat/ecserr" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/stretchr/testify/require" ) @@ -55,7 +55,7 @@ func TestToMap(t *testing.T) { "package_version": "1.2.3", "nested": "v1", }, - "url": wrappers.URLFields(testUrl), + "url": wraputil.URLFields(testUrl), "truly_at_root": "v2", }, }, diff --git a/x-pack/heartbeat/scenarios/basics_test.go b/x-pack/heartbeat/scenarios/basics_test.go index da19b2264f74..a8b39dbfaf10 100644 --- a/x-pack/heartbeat/scenarios/basics_test.go +++ b/x-pack/heartbeat/scenarios/basics_test.go @@ -12,19 +12,22 @@ import ( "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/isdef" "github.com/elastic/go-lookslike/testslike" + "github.com/elastic/go-lookslike/validator" + "github.com/elastic/beats/v7/heartbeat/hbtest" "github.com/elastic/beats/v7/heartbeat/hbtestllext" _ "github.com/elastic/beats/v7/heartbeat/monitors/active/http" _ "github.com/elastic/beats/v7/heartbeat/monitors/active/icmp" _ "github.com/elastic/beats/v7/heartbeat/monitors/active/tcp" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" ) type CheckHistItem struct { cg string - summary *summarizer.JobSummary + summary *jobsummary.JobSummary } func TestSimpleScenariosBasicFields(t *testing.T) { @@ -47,10 +50,10 @@ func TestSimpleScenariosBasicFields(t *testing.T) { require.NoError(t, err) cg := cgIface.(string) - var summary *summarizer.JobSummary + var summary *jobsummary.JobSummary summaryIface, err := e.GetValue("summary") if err == nil { - summary = summaryIface.(*summarizer.JobSummary) + summary = summaryIface.(*jobsummary.JobSummary) } var lastCheck *CheckHistItem @@ -100,7 +103,27 @@ func TestLightweightSummaries(t *testing.T) { all := mtr.Events() lastEvent, firstEvents := all[len(all)-1], all[:len(all)-1] testslike.Test(t, - summarizertesthelper.SummaryValidator(1, 0), + SummaryValidatorForStatus(mtr.Meta.Status), + lastEvent.Fields) + + for _, e := range firstEvents { + summary, _ := e.GetValue("summary") + require.Nil(t, summary) + } + }) +} + +func TestBrowserSummaries(t *testing.T) { + t.Parallel() + scenarioDB.RunTag(t, "browser", func(t *testing.T, mtr *framework.MonitorTestRun, err error) { + all := mtr.Events() + lastEvent, firstEvents := all[len(all)-1], all[:len(all)-1] + + testslike.Test(t, + lookslike.Compose( + SummaryValidatorForStatus(mtr.Meta.Status), + hbtest.URLChecks(t, mtr.Meta.URL), + ), lastEvent.Fields) for _, e := range firstEvents { @@ -133,3 +156,11 @@ func TestRunFromOverride(t *testing.T) { } }) } + +func SummaryValidatorForStatus(ss monitorstate.StateStatus) validator.Validator { + var expectedUp, expectedDown uint16 = 1, 0 + if ss == monitorstate.StatusDown { + expectedUp, expectedDown = 0, 1 + } + return summarizertesthelper.SummaryValidator(expectedUp, expectedDown) +} diff --git a/x-pack/heartbeat/scenarios/browserscenarios.go b/x-pack/heartbeat/scenarios/browserscenarios.go index 0cfce6831f43..1760ef587502 100644 --- a/x-pack/heartbeat/scenarios/browserscenarios.go +++ b/x-pack/heartbeat/scenarios/browserscenarios.go @@ -8,9 +8,11 @@ package scenarios import ( "fmt" + "net/url" "os" "testing" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" _ "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" "github.com/elastic/elastic-agent-libs/mapstr" @@ -22,25 +24,56 @@ func init() { Name: "simple-browser", Type: "browser", Tags: []string{"browser", "browser-inline"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { err = os.Setenv("ELASTIC_SYNTHETICS_CAPABLE", "true") if err != nil { - return nil, nil, err + return nil, meta, nil, err } server := startTestWebserver(t) + + // Add / to normalize with test output + meta.URL, _ = url.Parse(server.URL + "/") + meta.Status = monitorstate.StatusUp config = mapstr.M{ "id": "browser-test-id", "name": "browser-test-name", "type": "browser", "schedule": "@every 1m", - "hosts": []string{"127.0.0.1"}, "source": mapstr.M{ "inline": mapstr.M{ "script": fmt.Sprintf("step('load server', async () => {await page.goto('%s')})", server.URL), }, }, } - return config, nil, nil + return config, meta, nil, nil + }, + }, + framework.Scenario{ + Name: "failing-browser", + Type: "browser", + Tags: []string{"browser", "browser-inline", "down"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + err = os.Setenv("ELASTIC_SYNTHETICS_CAPABLE", "true") + if err != nil { + return nil, meta, nil, err + } + server := startTestWebserver(t) + + // Add / to normalize with test output + meta.URL, _ = url.Parse(server.URL + "/") + meta.Status = monitorstate.StatusDown + config = mapstr.M{ + "id": "browser-test-id", + "name": "browser-test-name", + "type": "browser", + "schedule": "@every 1m", + "source": mapstr.M{ + "inline": mapstr.M{ + "script": fmt.Sprintf("step('load server', async () => {await page.goto('%s'); throw(\"anerr\")})", meta.URL), + }, + }, + } + return config, meta, nil, nil }, }, ) diff --git a/x-pack/heartbeat/scenarios/framework/framework.go b/x-pack/heartbeat/scenarios/framework/framework.go index 2a092bb73ef4..c4e3e54b5bc7 100644 --- a/x-pack/heartbeat/scenarios/framework/framework.go +++ b/x-pack/heartbeat/scenarios/framework/framework.go @@ -6,6 +6,7 @@ package framework import ( "fmt" + "net/url" "os" "sync" "testing" @@ -29,7 +30,11 @@ import ( beatversion "github.com/elastic/beats/v7/libbeat/version" ) -type ScenarioRun func(t *testing.T) (config mapstr.M, close func(), err error) +type ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error) +type ScenarioRunMeta struct { + URL *url.URL + Status monitorstate.StateStatus +} type Scenario struct { Name string @@ -38,6 +43,7 @@ type Scenario struct { Tags []string RunFrom *hbconfig.LocationWithID NumberOfRuns int + URL string } type Twist struct { @@ -83,7 +89,7 @@ func (s Scenario) Run(t *testing.T, twist *Twist, callback func(t *testing.T, mt runS = twist.Fn(s.clone()) } - cfgMap, rClose, err := runS.Runner(t) + cfgMap, meta, rClose, err := runS.Runner(t) if rClose != nil { defer rClose() } @@ -109,7 +115,7 @@ func (s Scenario) Run(t *testing.T, twist *Twist, callback func(t *testing.T, mt var conf mapstr.M for i := 0; i < numberRuns; i++ { var mtr *MonitorTestRun - mtr, err = runMonitorOnce(t, cfgMap, runS.RunFrom, loaderDB.StateLoader()) + mtr, err = runMonitorOnce(t, cfgMap, meta, runS.RunFrom, loaderDB.StateLoader()) mtr.wait() events = append(events, mtr.Events()...) @@ -127,6 +133,7 @@ func (s Scenario) Run(t *testing.T, twist *Twist, callback func(t *testing.T, mt sumMtr := MonitorTestRun{ StdFields: sf, Config: conf, + Meta: meta, Events: func() []*beat.Event { return events }, @@ -209,6 +216,7 @@ func (sdb *ScenarioDB) RunTagWithATwist(t *testing.T, tagName string, twist *Twi type MonitorTestRun struct { StdFields stdfields.StdMonitorFields + Meta ScenarioRunMeta Config mapstr.M Events func() []*beat.Event monitor *monitors.Monitor @@ -216,9 +224,10 @@ type MonitorTestRun struct { close func() } -func runMonitorOnce(t *testing.T, monitorConfig mapstr.M, location *hbconfig.LocationWithID, stateLoader monitorstate.StateLoader) (mtr *MonitorTestRun, err error) { +func runMonitorOnce(t *testing.T, monitorConfig mapstr.M, meta ScenarioRunMeta, location *hbconfig.LocationWithID, stateLoader monitorstate.StateLoader) (mtr *MonitorTestRun, err error) { mtr = &MonitorTestRun{ Config: monitorConfig, + Meta: meta, StdFields: stdfields.StdMonitorFields{ RunFrom: location, }, diff --git a/x-pack/heartbeat/scenarios/framework/framework_test.go b/x-pack/heartbeat/scenarios/framework/framework_test.go index 97316106e7fb..243f7c254777 100644 --- a/x-pack/heartbeat/scenarios/framework/framework_test.go +++ b/x-pack/heartbeat/scenarios/framework/framework_test.go @@ -17,13 +17,13 @@ import ( var testScenario Scenario = Scenario{ Name: "My Scenario", Tags: []string{"testTag"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Runner: func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error) { return mapstr.M{ "type": "http", "id": "testID", "name": "testName", "schedule": "@every 10s", - }, nil, nil + }, meta, nil, nil }, RunFrom: &config.LocationWithID{ ID: "TestID", diff --git a/x-pack/heartbeat/scenarios/scenarios.go b/x-pack/heartbeat/scenarios/scenarios.go index fe0e1bbee164..31f95270ee1e 100644 --- a/x-pack/heartbeat/scenarios/scenarios.go +++ b/x-pack/heartbeat/scenarios/scenarios.go @@ -12,11 +12,13 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" ) var scenarioDB = framework.NewScenarioDB() var testWs *httptest.Server +var failingTestWs *httptest.Server // Note, no browser scenarios here, those all go in browserscenarios.go // since they have different build tags @@ -25,9 +27,11 @@ func init() { framework.Scenario{ Name: "http-simple", Type: "http", - Tags: []string{"lightweight", "http"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Tags: []string{"lightweight", "http", "up"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { server := startTestWebserver(t) + meta.URL, _ = url.Parse(server.URL) + meta.Status = monitorstate.StatusUp config = mapstr.M{ "id": "http-test-id", "name": "http-test-name", @@ -35,19 +39,62 @@ func init() { "schedule": "@every 1m", "urls": []string{server.URL}, } - return config, nil, nil + return config, meta, nil, nil + }, + }, + framework.Scenario{ + Name: "http-down", + Type: "http", + Tags: []string{"lightweight", "http", "down"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + server := startFailingTestWebserver(t) + u := server.URL + meta.URL, _ = url.Parse(u) + meta.Status = monitorstate.StatusDown + config = mapstr.M{ + "id": "http-test-id", + "name": "http-test-name", + "type": "http", + "schedule": "@every 1m", + "urls": []string{u}, + } + return config, meta, nil, nil }, }, framework.Scenario{ Name: "tcp-simple", Type: "tcp", - Tags: []string{"lightweight", "tcp"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Tags: []string{"lightweight", "tcp", "up"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { server := startTestWebserver(t) parsedUrl, err := url.Parse(server.URL) if err != nil { panic(fmt.Sprintf("URL %s should always be parsable: %s", server.URL, err)) } + parsedUrl.Scheme = "tcp" + meta.URL = parsedUrl + meta.Status = monitorstate.StatusUp + config = mapstr.M{ + "id": "tcp-test-id", + "name": "tcp-test-name", + "type": "tcp", + "schedule": "@every 1m", + "hosts": []string{parsedUrl.Host}, // Host includes host:port + } + return config, meta, nil, nil + }, + }, + framework.Scenario{ + Name: "tcp-down", + Type: "tcp", + Tags: []string{"lightweight", "tcp", "down"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + // This ip should never route anywhere + // see https://stackoverflow.com/questions/528538/non-routable-ip-address + parsedUrl, _ := url.Parse("tcp://192.0.2.0:8282") + parsedUrl.Scheme = "tcp" + meta.URL = parsedUrl + meta.Status = monitorstate.StatusDown config = mapstr.M{ "id": "tcp-test-id", "name": "tcp-test-name", @@ -55,21 +102,23 @@ func init() { "schedule": "@every 1m", "hosts": []string{parsedUrl.Host}, // Host includes host:port } - return config, nil, nil + return config, meta, nil, nil }, }, framework.Scenario{ Name: "simple-icmp", Type: "icmp", - Tags: []string{"icmp"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Tags: []string{"icmp", "up"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + meta.URL, _ = url.Parse("icp://127.0.0.1") + meta.Status = monitorstate.StatusUp return mapstr.M{ "id": "icmp-test-id", "name": "icmp-test-name", "type": "icmp", "schedule": "@every 1m", "hosts": []string{"127.0.0.1"}, - }, func() {}, nil + }, meta, nil, nil }, }, ) diff --git a/x-pack/heartbeat/scenarios/stateloader_test.go b/x-pack/heartbeat/scenarios/stateloader_test.go index e3ea54a06910..c83ebafc0c76 100644 --- a/x-pack/heartbeat/scenarios/stateloader_test.go +++ b/x-pack/heartbeat/scenarios/stateloader_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" ) @@ -35,7 +34,7 @@ func TestStateContinuity(t *testing.T) { lastSS := framework.LastState(mtr.Events()) - assert.Equal(t, monitorstate.StatusUp, lastSS.State.Status, "monitor was unexpectedly down, synthetics console output: %s, errors", sout, errors) + assert.Equal(t, mtr.Meta.Status, lastSS.State.Status, "monitor had unexpected state %v, synthetics console output: %s, errors", lastSS.State.Status, sout, errors) allSS := framework.AllStates(mtr.Events()) assert.Len(t, allSS, numRuns) diff --git a/x-pack/heartbeat/scenarios/testws.go b/x-pack/heartbeat/scenarios/testws.go index badfdb272364..bbcc193592b0 100644 --- a/x-pack/heartbeat/scenarios/testws.go +++ b/x-pack/heartbeat/scenarios/testws.go @@ -19,18 +19,29 @@ import ( ) var testWsOnce = &sync.Once{} +var failingTestWsOnce = &sync.Once{} // Starting this thing up is expensive, let's just do it once func startTestWebserver(t *testing.T) *httptest.Server { testWsOnce.Do(func() { testWs = httptest.NewServer(hbtest.HelloWorldHandler(200)) - waitForWs(t, testWs.URL) + waitForWs(t, testWs.URL, 200) }) return testWs } +func startFailingTestWebserver(t *testing.T) *httptest.Server { + failingTestWsOnce.Do(func() { + failingTestWs = httptest.NewServer(hbtest.HelloWorldHandler(400)) + + waitForWs(t, failingTestWs.URL, 400) + }) + + return failingTestWs +} + func StartStatefulTestWS(t *testing.T, statuses []int) *httptest.Server { mtx := sync.Mutex{} statusIdx := 0 @@ -49,19 +60,19 @@ func StartStatefulTestWS(t *testing.T, statuses []int) *httptest.Server { })) // wait for ws to become available - waitForWs(t, testWs.URL) + waitForWs(t, testWs.URL, 200) return testWs } -func waitForWs(t *testing.T, url string) { +func waitForWs(t *testing.T, url string, statusCode int) { require.Eventuallyf( t, func() bool { req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) resp, _ := http.DefaultClient.Do(req) resp.Body.Close() - return resp.StatusCode == 200 + return resp.StatusCode == statusCode }, 10*time.Second, 250*time.Millisecond, "could not start webserver", ) diff --git a/x-pack/heartbeat/scenarios/twists.go b/x-pack/heartbeat/scenarios/twists.go index 3109b5e73d9b..5f4d1093020a 100644 --- a/x-pack/heartbeat/scenarios/twists.go +++ b/x-pack/heartbeat/scenarios/twists.go @@ -40,10 +40,10 @@ func TwistMaxAttempts(maxAttempts int) *framework.Twist { return framework.MakeTwist(fmt.Sprintf("run with %d max_attempts", maxAttempts), func(s framework.Scenario) framework.Scenario { s.Tags = append(s.Tags, "retry") origRunner := s.Runner - s.Runner = func(t *testing.T) (config mapstr.M, close func(), err error) { - config, close, err = origRunner(t) + s.Runner = func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + config, meta, close, err = origRunner(t) config["max_attempts"] = maxAttempts - return config, close, err + return config, meta, close, err } return s }) diff --git a/x-pack/libbeat/management/generate.go b/x-pack/libbeat/management/generate.go index 3bdb1f29c6ae..59537e066862 100644 --- a/x-pack/libbeat/management/generate.go +++ b/x-pack/libbeat/management/generate.go @@ -5,11 +5,8 @@ package management import ( - "errors" "fmt" - "google.golang.org/protobuf/types/known/structpb" - "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -85,78 +82,10 @@ func handleSimpleConfig(raw *proto.UnitExpectedConfig) (map[string]any, error) { return m, nil } -// dataStreamAndSource is a generic way to represent proto mesages -// that contain a source field and a datastream field. -type dataStreamAndSource interface { - GetDataStream() *proto.DataStream - GetSource() *structpb.Struct -} - -// deDotDataStream reads any datastream value from the dotted notation -// (data_stream.*) and returns it as a *proto.DataStream. If raw already -// contains a DataStream but no fields are duplicated, then the values are merged. -func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) { - ds := raw.GetDataStream() - if ds == nil { - ds = &proto.DataStream{} - } - - tmp := struct { - DataStream struct { - Dataset string `config:"dataset" yaml:"dataset"` - Type string `config:"type" yaml:"type"` - Namespace string `config:"namespace" yaml:"namespace"` - } `config:"data_stream" yaml:"data_stream"` - }{} - - cfg, err := conf.NewConfigFrom(raw.GetSource().AsMap()) - if err != nil { - return nil, fmt.Errorf("cannot generate config from source field: %w", err) - } - - if err := cfg.Unpack(&tmp); err != nil { - return nil, fmt.Errorf("cannot unpack source field into struct: %w", err) - } - - if ds.Dataset != "" && tmp.DataStream.Dataset != "" { - return nil, errors.New("duplicated key 'datastream.dataset'") - } - - if ds.Type != "" && tmp.DataStream.Type != "" { - return nil, errors.New("duplicated key 'datastream.type'") - } - - if ds.Namespace != "" && tmp.DataStream.Namespace != "" { - return nil, errors.New("duplicated key 'datastream.namespace'") - } - - ret := &proto.DataStream{ - Dataset: merge(tmp.DataStream.Dataset, ds.Dataset), - Type: merge(tmp.DataStream.Type, ds.Type), - Namespace: merge(tmp.DataStream.Namespace, ds.Namespace), - } - - return ret, nil -} - -// merge returns b if a is an empty string -func merge(a, b string) string { - if a == "" { - return b - } - return a -} - // CreateInputsFromStreams breaks down the raw Expected config into an array of individual inputs/modules from the Streams values // that can later be formatted into the reloader's ConfigWithMetaData and sent to an indvidual beat/ // This also performs the basic task of inserting module-level add_field processors into the inputs/modules. func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) { - ds, err := deDotDataStream(raw) - if err != nil { - return nil, fmt.Errorf("could not read 'data_stream': %w", err) - } - raw.DataStream = ds - // If there are no streams, we fall into the 'simple input config' case, // this means the key configuration values are on the root level instead of // an element in the `streams` array. @@ -177,14 +106,8 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamTyp inputs := make([]map[string]interface{}, len(raw.GetStreams())) for iter, stream := range raw.GetStreams() { - ds, err := deDotDataStream(stream) - if err != nil { - return nil, fmt.Errorf("could not read 'data_stream' from stream ID '%s': %w", - stream.GetId(), err) - } - stream.DataStream = ds streamSource := raw.GetStreams()[iter].GetSource().AsMap() - streamSource, err = createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...) + streamSource, err := createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...) if err != nil { return nil, fmt.Errorf("error creating stream rules: %w", err) } diff --git a/x-pack/libbeat/management/generate_test.go b/x-pack/libbeat/management/generate_test.go index 9c0c7df72a16..fb7f88ff7759 100644 --- a/x-pack/libbeat/management/generate_test.go +++ b/x-pack/libbeat/management/generate_test.go @@ -7,10 +7,8 @@ package management import ( "testing" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -232,85 +230,3 @@ func buildConfigMap(t *testing.T, unitRaw *proto.UnitExpectedConfig, agentInfo * require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config) return cfgMap } - -func TestDeDotDataStream(t *testing.T) { - testCases := map[string]struct { - source map[string]any - dataStream *proto.DataStream - wantError bool - expectedDataStream *proto.DataStream - }{ - "all data is flattened": { - source: map[string]any{ - "data_stream.dataset": "my dataset", - "data_stream.namespace": "my namespace", - "data_stream.type": "my type", - }, - expectedDataStream: &proto.DataStream{ - Dataset: "my dataset", - Namespace: "my namespace", - Type: "my type", - }, - }, - "no data is flattened": { - dataStream: &proto.DataStream{ - Dataset: "my dataset", - Namespace: "my namespace", - Type: "my type", - }, - expectedDataStream: &proto.DataStream{ - Dataset: "my dataset", - Namespace: "my namespace", - Type: "my type", - }, - }, - "mix of flattened and data_stream": { - dataStream: &proto.DataStream{ - Dataset: "my dataset", - Type: "my type", - }, - source: map[string]any{ - "data_stream.namespace": "my namespace", - }, - expectedDataStream: &proto.DataStream{ - Dataset: "my dataset", - Namespace: "my namespace", - Type: "my type", - }, - }, - "duplicated keys generate error": { - dataStream: &proto.DataStream{ - Dataset: "my dataset", - }, - source: map[string]any{ - "data_stream.dataset": "another dataset", - }, - wantError: true, - }, - } - - for name, tc := range testCases { - t.Run(name, func(t *testing.T) { - raw := &proto.UnitExpectedConfig{ - Source: requireNewStruct(t, tc.source), - DataStream: tc.dataStream, - } - - final, err := deDotDataStream(raw) - if tc.wantError { - if err == nil { - t.Error("expecting an error") - } - return - } - if err != nil { - t.Fatalf("deDotDataStream returned an error: %s", err) - } - - if !cmp.Equal(final, tc.expectedDataStream, protocmp.Transform()) { - t.Errorf("expecting a different value: --got/++want\n'%s'", - cmp.Diff(final, tc.expectedDataStream, protocmp.Transform())) - } - }) - } -} diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 217e54c8fe31..9fe238605b49 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -511,136 +511,6 @@ func TestErrorPerUnit(t *testing.T) { }, 10*time.Second, 100*time.Millisecond, "desired state, was not reached") } -func TestFlattenedDataStreams(t *testing.T) { - stateReached := atomic.Bool{} - - expectedDataset := "my-dataset" - expectedNamespace := "my-namespace" - expectedType := "my-type" - expectedIndex := fmt.Sprintf("%s-%s-%s", - expectedType, expectedDataset, expectedNamespace) - - r := reload.NewRegistry() - - output := &mockOutput{ - ReloadFn: func(config *reload.ConfigWithMeta) error { - return nil - }, - } - r.MustRegisterOutput(output) - - inputs := &mockReloadable{ - ReloadFn: func(configs []*reload.ConfigWithMeta) error { - for _, input := range configs { - tmp := struct { - Index string `config:"index" yaml:"index"` - }{} - - if err := input.Config.Unpack(&tmp); err != nil { - t.Fatalf("error unpacking config: %s", err) - } - - if tmp.Index != expectedIndex { - t.Fatalf("expecting index %q, got %q", expectedIndex, tmp.Index) - } - - stateReached.Store(true) - } - return nil - }, - } - r.MustRegisterInput(inputs) - - outputUnit := proto.UnitExpected{ - Id: "output-unit", - Type: proto.UnitType_OUTPUT, - State: proto.State_HEALTHY, - ConfigStateIdx: 1, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "default", - Type: "mock", - Name: "mock", - Source: integration.RequireNewStruct(t, - map[string]interface{}{ - "Is": "this", - "required?": "Yes!", - }), - }, - } - - inputUnit1 := proto.UnitExpected{ - Id: "input-unit1", - Type: proto.UnitType_INPUT, - State: proto.State_HEALTHY, - ConfigStateIdx: 1, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "input-unit-config-id", - Type: "filestream", - Name: "foo", - Source: requireNewStruct(t, map[string]any{ - "data_stream.dataset": expectedDataset, - "data_stream.namespace": expectedNamespace, - "data_stream.type": expectedType, - }), - Streams: []*proto.Stream{ - { - Id: "filestream-id", - Source: integration.RequireNewStruct(t, map[string]interface{}{ - "id": "input-unit1", - }), - }, - }, - }, - } - units := []*proto.UnitExpected{ - &outputUnit, - &inputUnit1, - } - server := &mock.StubServerV2{ - CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { - // Nothing to do here, just keep sending the same units. - return &proto.CheckinExpected{ - Units: units, - } - }, - ActionImpl: func(response *proto.ActionResponse) error { return nil }, - } - - if err := server.Start(); err != nil { - t.Fatalf("could not start mock Elastic-Agent server: %s", err) - } - defer server.Stop() - - client := client.NewV2( - fmt.Sprintf(":%d", server.Port), - "", - client.VersionInfo{}, - grpc.WithTransportCredentials(insecure.NewCredentials())) - - m, err := NewV2AgentManagerWithClient( - &Config{ - Enabled: true, - }, - r, - client, - ) - if err != nil { - t.Fatalf("could not instantiate ManagerV2: %s", err) - } - - if err := m.Start(); err != nil { - t.Fatalf("could not start ManagerV2: %s", err) - } - defer m.Stop() - - require.Eventually(t, func() bool { - return stateReached.Load() - }, 10*time.Second, 100*time.Millisecond, - "did not find expected 'index' field on input final config") -} - type reloadable struct { mx sync.Mutex config *reload.ConfigWithMeta diff --git a/x-pack/metricbeat/module/stan/_meta/Dockerfile b/x-pack/metricbeat/module/stan/_meta/Dockerfile index 3f707023d1a3..2ca6c91c50c5 100644 --- a/x-pack/metricbeat/module/stan/_meta/Dockerfile +++ b/x-pack/metricbeat/module/stan/_meta/Dockerfile @@ -2,7 +2,7 @@ ARG STAN_VERSION=0.15.1 FROM nats-streaming:$STAN_VERSION # build stage -FROM golang:1.20.7 AS build-env +FROM golang:1.20.8 AS build-env RUN apt-get install git mercurial gcc RUN git clone https://github.com/nats-io/stan.go.git /stan-go RUN cd /stan-go/examples/stan-bench && git checkout tags/v0.5.2 && go build .