diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8e63616d80f8..9c6e506a9fc6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553] - Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619] +- Allow metricsets to report their status via control v2 protocol. {pull}40025[40025] - Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation - Add support for Kibana status metricset in v8 format {pull}40275[40275] diff --git a/NOTICE.txt b/NOTICE.txt index 67076b74e924..23714152db4d 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -13180,11 +13180,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-system-metrics -Version: v0.10.8 +Version: v0.11.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.10.8/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.11.0/LICENSE.txt: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index 24c3f6e05d65..49a19fc29fd7 100644 --- a/go.mod +++ b/go.mod @@ -194,7 +194,7 @@ require ( github.com/elastic/ebpfevents v0.6.0 github.com/elastic/elastic-agent-autodiscover v0.8.1 github.com/elastic/elastic-agent-libs v0.9.15 - github.com/elastic/elastic-agent-system-metrics v0.10.8 + github.com/elastic/elastic-agent-system-metrics v0.11.0 github.com/elastic/go-elasticsearch/v8 v8.14.0 
github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff github.com/elastic/mito v1.15.0 diff --git a/go.sum b/go.sum index ea4ec0dee0d2..96eb13a36d23 100644 --- a/go.sum +++ b/go.sum @@ -558,8 +558,8 @@ github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7b github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= github.com/elastic/elastic-agent-libs v0.9.15 h1:WCLtuErafUxczT/rXJa4Vr6mxwC8dgtqMbEq+qWGD4M= github.com/elastic/elastic-agent-libs v0.9.15/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8= -github.com/elastic/elastic-agent-system-metrics v0.10.8 h1:YoX3GfWWDtL5YrBkIbl7jQ/usOxBi+0N9jHke2EzFCk= -github.com/elastic/elastic-agent-system-metrics v0.10.8/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk= +github.com/elastic/elastic-agent-system-metrics v0.11.0 h1:/bWrgTsHZWLUhdT7WPNuQDFkrSfm+A4qf6QDQnZo9d8= +github.com/elastic/elastic-agent-system-metrics v0.11.0/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4= diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 2fbfea0d1ad4..3fcb25578bae 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -19,7 +19,7 @@ package helper import ( "fmt" - "io/ioutil" + "io" "net" "net/http" "net/http/httptest" @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -55,7 +56,7 @@ func TestGetAuthHeaderFromToken(t 
*testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { content := []byte(test.Content) - tmpfile, err := ioutil.TempFile("", "token") + tmpfile, err := os.CreateTemp("", "token") if err != nil { t.Fatal(err) } @@ -236,14 +237,14 @@ func TestOverUnixSocket(t *testing.T) { fmt.Fprintf(w, "ehlo!") }) - go http.Serve(l, mux) + go http.Serve(l, mux) //nolint:errcheck,gosec // Ignore the error, it's a test file return l } for title, c := range cases { t.Run(title, func(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "testsocket") + tmpDir, err := os.MkdirTemp("", "testsocket") require.NoError(t, err) defer os.RemoveAll(tmpDir) @@ -262,7 +263,7 @@ func TestOverUnixSocket(t *testing.T) { r, err := h.FetchResponse() require.NoError(t, err) defer r.Body.Close() - content, err := ioutil.ReadAll(r.Body) + content, err := io.ReadAll(r.Body) require.NoError(t, err) assert.Equal(t, []byte("ehlo!"), content) }) @@ -327,3 +328,5 @@ func (*dummyModule) Config() mb.ModuleConfig { func (*dummyModule) UnpackConfig(interface{}) error { return nil } +func (dummyModule) UpdateStatus(_ status.Status, _ string) {} +func (dummyModule) SetStatusReporter(_ status.StatusReporter) {} diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 7e18dc9029d2..0be1db7cef37 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -27,6 +27,7 @@ import ( "net/url" "time" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -62,9 +63,11 @@ const ( // Module is the common interface for all Module implementations. type Module interface { - Name() string // Name returns the name of the Module. - Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. - UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. 
+ Name() string // Name returns the name of the Module. + Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. + UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. + UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent. + SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status reporter for the given module. } // BaseModule implements the Module interface. @@ -73,9 +76,10 @@ type Module interface { // MetricSets, it can embed this type into another struct to satisfy the // Module interface requirements. type BaseModule struct { - name string - config ModuleConfig - rawConfig *conf.C + name string + config ModuleConfig + rawConfig *conf.C + statusReporter status.StatusReporter } func (m *BaseModule) String() string { @@ -95,6 +99,18 @@ func (m *BaseModule) UnpackConfig(to interface{}) error { return m.rawConfig.Unpack(to) } +// UpdateStatus updates the status of the module. Reflected on elastic-agent. +func (m *BaseModule) UpdateStatus(status status.Status, msg string) { + if m.statusReporter != nil { + m.statusReporter.UpdateStatus(status, msg) + } +} + +// SetStatusReporter sets the status reporter of the module. +func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) { + m.statusReporter = statusReporter +} + // WithConfig re-configures the module with the given raw configuration and returns a // copy of the module. // Intended to be called from module factories. 
Note that if metricsets are specified diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index 1b0a621d7055..aedb443e9a85 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -123,3 +124,7 @@ func (mr *runner) Diagnostics() []diagnostics.DiagnosticSetup { func (mr *runner) String() string { return fmt.Sprintf("%s [metricsets=%d]", mr.mod.Name(), len(mr.mod.metricSets)) } + +func (mr *runner) SetStatusReporter(reporter status.StatusReporter) { + mr.mod.SetStatusReporter(reporter) +} diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index e020cd87d554..b4d92d29f561 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" + "github.com/elastic/beats/v7/libbeat/management/status" ) type runnerGroup struct { @@ -40,6 +41,14 @@ func newRunnerGroup(runners []cfgfile.Runner) cfgfile.Runner { } } +func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) { + for _, runner := range rg.runners { + if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok { + runnerWithStatus.SetStatusReporter(reporter) + } + } +} + func (rg *runnerGroup) Start() { rg.startOnce.Do(func() { for _, runner := range rg.runners { diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index d41bdf014971..5243d956365e 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + 
"github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -146,6 +147,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event { registry.Add(metricsPath, msw.Metrics(), monitoring.Full) monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String()) + msw.module.UpdateStatus(status.Starting, fmt.Sprintf("%s/%s is starting", msw.module.Name(), msw.Name())) msw.run(done, out) }(msw) } @@ -253,14 +255,20 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { err := fetcher.Fetch(reporter.V2()) if err != nil { reporter.V2().Error(err) + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } else { + msw.module.UpdateStatus(status.Running, "") } case mb.ReportingMetricSetV2WithContext: reporter.StartFetchTimer() err := fetcher.Fetch(ctx, reporter.V2()) if err != nil { reporter.V2().Error(err) + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } else { + msw.module.UpdateStatus(status.Running, "") } default: panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 8c6e09df537f..736bb1f40e62 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -53,6 +53,7 @@ that Metricbeat does it and with the same validations. 
} } */ + package testing import ( @@ -60,6 +61,7 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/go-concert/timed" "github.com/elastic/beats/v7/metricbeat/mb" @@ -72,9 +74,11 @@ type TestModule struct { RawConfig *conf.C } -func (m *TestModule) Name() string { return m.ModName } -func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } -func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } +func (m *TestModule) Name() string { return m.ModName } +func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } +func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } +func (m *TestModule) UpdateStatus(_ status.Status, _ string) {} +func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {} func NewTestModule(t testing.TB, config interface{}) *TestModule { c, err := conf.NewConfigFrom(config) diff --git a/metricbeat/module/elasticsearch/node_stats/data_test.go b/metricbeat/module/elasticsearch/node_stats/data_test.go index e6151555701d..2317418eeaf8 100644 --- a/metricbeat/module/elasticsearch/node_stats/data_test.go +++ b/metricbeat/module/elasticsearch/node_stats/data_test.go @@ -22,6 +22,7 @@ package node_stats import ( "testing" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/elasticsearch" ) @@ -60,3 +61,6 @@ func (m mockModule) Config() mb.ModuleConfig { func (m mockModule) UnpackConfig(to interface{}) error { return nil } + +func (m mockModule) UpdateStatus(_ status.Status, _ string) {} +func (m mockModule) SetStatusReporter(_ status.StatusReporter) {} diff --git a/metricbeat/module/system/process/process.go b/metricbeat/module/system/process/process.go index ad9fa8d5ac06..684c87059c91 100644 --- a/metricbeat/module/system/process/process.go +++ b/metricbeat/module/system/process/process.go @@ -20,6 +20,7 @@ 
package process import ( + "errors" "fmt" "os" "runtime" @@ -111,7 +112,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { // monitor either a single PID, or the configured set of processes. if m.setpid == 0 { procs, roots, err := m.stats.Get() - if err != nil { + if err != nil && !errors.Is(err, process.NonFatalErr{}) { + // return only if the error is fatal in nature return fmt.Errorf("process stats: %w", err) } @@ -121,9 +123,10 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { RootFields: roots[evtI], }) if !isOpen { - return nil + return err } } + return err } else { proc, root, err := m.stats.GetOneRootEvent(m.setpid) if err != nil { diff --git a/metricbeat/module/system/process/process_test.go b/metricbeat/module/system/process/process_test.go index 98b48b75d6ea..18841b68c09e 100644 --- a/metricbeat/module/system/process/process_test.go +++ b/metricbeat/module/system/process/process_test.go @@ -37,13 +37,17 @@ func TestFetch(t *testing.T) { f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) events, errs := mbtest.ReportingFetchV2Error(f) - assert.Empty(t, errs) + for _, err := range errs { + assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err) + } assert.NotEmpty(t, events) time.Sleep(2 * time.Second) events, errs = mbtest.ReportingFetchV2Error(f) - assert.Empty(t, errs) + for _, err := range errs { + assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err) + } assert.NotEmpty(t, events) t.Logf("fetched %d events, showing events[0]:", len(events)) diff --git a/metricbeat/module/system/process_summary/process_summary.go b/metricbeat/module/system/process_summary/process_summary.go index c64a0c1d3e12..cbf1c63a2fe5 100644 --- a/metricbeat/module/system/process_summary/process_summary.go +++ b/metricbeat/module/system/process_summary/process_summary.go @@ -20,8 +20,9 @@ package process_summary import ( + "errors" "fmt" - "io/ioutil" + "os" "runtime" "strconv" "strings" @@ -68,9 
+69,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // descriptive error must be returned. func (m *MetricSet) Fetch(r mb.ReporterV2) error { - procList, err := process.ListStates(m.sys) - if err != nil { - return fmt.Errorf("error fetching process list: %w", err) + procList, degradeErr := process.ListStates(m.sys) + if degradeErr != nil && !errors.Is(degradeErr, process.NonFatalErr{}) { + // return only if the error is fatal in nature + return fmt.Errorf("error fetching process list: %w", degradeErr) } procStates := map[string]int{} @@ -83,7 +85,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { } outMap := mapstr.M{} - err = typeconv.Convert(&outMap, procStates) + err := typeconv.Convert(&outMap, procStates) if err != nil { return fmt.Errorf("error formatting process stats: %w", err) } @@ -101,13 +103,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { MetricSetFields: outMap, }) - return nil + return degradeErr } // threadStats returns a map of state counts for running threads on a system func threadStats(sys resolve.Resolver) (mapstr.M, error) { statPath := sys.ResolveHostFS("/proc/stat") - procData, err := ioutil.ReadFile(statPath) + procData, err := os.ReadFile(statPath) if err != nil { return nil, fmt.Errorf("error reading procfs file %s: %w", statPath, err) } diff --git a/metricbeat/module/system/process_summary/process_summary_test.go b/metricbeat/module/system/process_summary/process_summary_test.go index 7ec35634e434..042148f37133 100644 --- a/metricbeat/module/system/process_summary/process_summary_test.go +++ b/metricbeat/module/system/process_summary/process_summary_test.go @@ -46,7 +46,9 @@ func TestFetch(t *testing.T) { f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) events, errs := mbtest.ReportingFetchV2Error(f) - require.Empty(t, errs) + for _, err := range errs { + assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err) + } require.NotEmpty(t, events) event := 
events[0].BeatEvent("system", "process_summary").Fields t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), @@ -62,7 +64,9 @@ func TestStateNames(t *testing.T) { f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) events, errs := mbtest.ReportingFetchV2Error(f) - require.Empty(t, errs) + for _, err := range errs { + assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err) + } require.NotEmpty(t, events) event := events[0].BeatEvent("system", "process_summary").Fields diff --git a/metricbeat/module/system/test_system.py b/metricbeat/module/system/test_system.py index de113e5e4b0e..dda6a0a6fddd 100644 --- a/metricbeat/module/system/test_system.py +++ b/metricbeat/module/system/test_system.py @@ -385,9 +385,18 @@ def test_process_summary(self): output = self.read_output_json() self.assertGreater(len(output), 0) + only_errors_encountered = True for evt in output: self.assert_fields_are_documented(evt) + if evt.get("error", None) is not None: + # Here, we assume that the error is non-fatal and we move forward the test execution. + # If the error is non-fatal, the test should pass with assertions. + # If the error is fatal, the test should fail. + continue + + # we've encountered an event. Turn off the flag + only_errors_encountered = False summary = evt["system"]["process"]["summary"] assert isinstance(summary["total"], int) @@ -396,6 +405,10 @@ def test_process_summary(self): assert isinstance(summary["running"], int) assert isinstance(summary["total"], int) + # If the flag is true, we've only encountered error (fatal errors) + # If the flag is false, we've encountered events and probably some non-fatal errors. 
+ assert not only_errors_encountered + @unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd", sys.platform), "os") def test_process(self): """ @@ -419,7 +432,17 @@ def test_process(self): self.assertGreater(len(output), 0) found_cmdline = False + only_errors_encountered = True for evt in output: + if evt.get("error", None) is not None: + # Here, we assume that the error is non-fatal and we move forward the test execution. + # If the error is non-fatal, the test should pass with assertions. + # If the error is fatal, the test should fail. + continue + + # we've encountered an event. Turn off the flag + only_errors_encountered = False + process = evt["system"]["process"] # Not all process will have 'cmdline' due to permission issues, # especially on Windows. Therefore we ensure at least some of @@ -442,6 +465,10 @@ def test_process(self): self.assertTrue( found_cmdline, "cmdline not found in any process events") + # If the flag is true, we've only encountered error (fatal errors) + # If the flag is false, we've encountered events and probably some non-fatal errors. + assert not only_errors_encountered + @unittest.skipUnless(re.match("(?i)linux|darwin|freebsd", sys.platform), "os") def test_process_unix(self): """ @@ -477,7 +504,16 @@ def test_process_unix(self): found_fd = False found_env = False found_cwd = not sys.platform.startswith("linux") + only_errors_encountered = True for evt in output: + if evt.get("error", None) is not None: + # Here, we assume that the error is non-fatal and we move forward the test execution. + # If the error is non-fatal, the test should pass with assertions. + # If the error is fatal, the test should fail. + continue + # we've encountered an event. 
Turn off the flag + only_errors_encountered = False + found_cwd |= "working_directory" in evt["process"] process = evt["system"]["process"] @@ -499,6 +535,9 @@ def test_process_unix(self): if not sys.platform.startswith("darwin"): self.assertTrue(found_fd, "fd not found in any process events") + # If the flag is true, we've only encountered error (fatal errors) + # If the flag is false, we've encountered events and probably some non-fatal errors. + assert not only_errors_encountered self.assertTrue(found_env, "env not found in any process events") self.assertTrue( found_cwd, "working_directory not found in any process events") diff --git a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go new file mode 100644 index 000000000000..660e95255582 --- /dev/null +++ b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go @@ -0,0 +1,221 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +//go:build integration + +package tests + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/common/reload" + lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" + "github.com/elastic/beats/v7/x-pack/metricbeat/cmd" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +func TestProcessStatusReporter(t *testing.T) { + unitOneID := mock.NewID() + unitOutID := mock.NewID() + token := mock.NewID() + + tests.InitBeatsForTest(t, cmd.RootCmd) + + filename := fmt.Sprintf("test-%d", time.Now().Unix()) + outPath := filepath.Join(t.TempDir(), filename) + t.Logf("writing output to file %s", outPath) + err := os.Mkdir(outPath, 0775) + require.NoError(t, err) + defer func() { + err := os.RemoveAll(outPath) + require.NoError(t, err) + }() + + // process with pid=-1 doesn't exist. This should degrade the input for a while + inputStreamIncorrectPid := getInputStream(unitOneID, -1, 1) + + // process with valid pid. 
This should change state to healthy + inputStreamCorrectPid := getInputStream(unitOneID, os.Getpid(), 2) + + outputExpectedStream := proto.UnitExpected{ + Id: unitOutID, + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Type: "file", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "type": "file", + "enabled": true, + "path": outPath, + "filename": "beat-out", + "number_of_files": 7, + }), + }, + } + + observedStates := make(chan *proto.CheckinObserved) + expectedUnits := make(chan []*proto.UnitExpected) + done := make(chan struct{}) + + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + select { + case observedStates <- observed: + return &proto.CheckinExpected{ + Units: <-expectedUnits, + } + case <-done: + return nil + } + }, + ActionImpl: func(response *proto.ActionResponse) error { + return nil + }, + } + require.NoError(t, server.Start(), "could not start V2 mock server") + defer server.Stop() + + // start the client + client := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{ + Name: "program", + Meta: map[string]string{ + "key": "value", + }, + }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + + lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) { + c := management.DefaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + return management.NewV2AgentManagerWithClient(c, registry, client, management.WithStopOnEmptyUnits) + }) + + go func() { + t.Logf("Running beats...") + err := cmd.RootCmd.Execute() + require.NoError(t, err) + }() + + scenarios := []struct { + expectedStatus proto.State + nextInputunit 
*proto.UnitExpected + }{ + { + proto.State_HEALTHY, + &inputStreamIncorrectPid, + }, + { + proto.State_DEGRADED, + &inputStreamCorrectPid, + }, + { + proto.State_HEALTHY, + &inputStreamCorrectPid, + }, + // wait for one more checkin, just to be sure it's healthy + { + proto.State_HEALTHY, + &inputStreamCorrectPid, + }, + } + + timeout := 2 * time.Minute + timer := time.NewTimer(timeout) + + for id := 0; id < len(scenarios); { + select { + case observed := <-observedStates: + state := extractState(observed.GetUnits(), unitOneID) + expectedUnits <- []*proto.UnitExpected{ + scenarios[id].nextInputunit, + &outputExpectedStream, + } + if state != scenarios[id].expectedStatus { + continue + } + // always ensure that output is healthy + outputState := extractState(observed.GetUnits(), unitOutID) + require.Equal(t, outputState, proto.State_HEALTHY) + + timer.Reset(timeout) + id++ + case <-timer.C: + t.Fatalf("timeout after %s waiting for checkin", timeout) + default: + } + } +} + +func extractState(units []*proto.UnitObserved, idx string) proto.State { + for _, unit := range units { + if unit.Id == idx { + return unit.GetState() + } + } + return -1 +} + +func getInputStream(id string, pid int, stateIdx int) proto.UnitExpected { + return proto.UnitExpected{ + Id: id, + Type: proto.UnitType_INPUT, + ConfigStateIdx: uint64(stateIdx), + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Streams: []*proto.Stream{{ + Id: "system/metrics-system.process-default-system", + DataStream: &proto.DataStream{ + Dataset: "system.process", + Type: "metrics", + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "metricsets": []interface{}{"process"}, + "process.pid": pid, + }), + }}, + Type: "system/metrics", + Id: "system/metrics-system-default-system", + Name: "system-1", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + }, + } +}