From 2dbb60663c8e27698156faa7f8563662ce84a3aa Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Dec 2024 14:55:58 +0000 Subject: [PATCH 1/4] Remove ManagerV2 unnecessary reload when apm tracing config is nil (#41794) Remove unnecessary reload due to apm tracing config change in ManagerV2 when apm tracing config changes from nil to nil. --- CHANGELOG.next.asciidoc | 1 + x-pack/libbeat/management/managerV2.go | 13 +- x-pack/libbeat/management/managerV2_test.go | 148 +++++++++++++++++++- 3 files changed, 152 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index aacf702a4db3..76bc04ede5dd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -115,6 +115,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Ensure Elasticsearch output can always recover from network errors {pull}40794[40794] - Add `translate_ldap_attribute` processor. {pull}41472[41472] - Remove unnecessary debug logs during idle connection teardown {issue}40824[40824] +- Remove unnecessary reload for Elastic Agent managed beats when apm tracing config changes from nil to nil {pull}41794[41794] - Fix incorrect cloud provider identification in add_cloud_metadata processor using provider priority mechanism {pull}41636[41636] - Prevent panic if libbeat processors are loaded more than once. {issue}41475[41475] {pull}41857[51857] - Fix a bug where log files are rotated on startup when interval is configured and rotateonstartup is disabled {issue}41894[41894] {pull}41895[41895] diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index e39b394bf2b3..288f28ae0de5 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -893,6 +893,13 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) { apmConfig = expected.APMConfig } } + + if (cm.lastAPMCfg == nil && apmConfig == nil) || (cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig)) { + // configuration for the APM tracing did not change; do nothing + cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change") + return + } + if apmConfig == nil { // APM tracing is being stopped cm.logger.Debug("Stopping APM tracing") @@ -907,12 +914,6 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) { return } - if cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig) { - // configuration for the APM tracing did not change; do nothing - cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change") - return - } - uconfig, err := conf.NewConfigFrom(apmConfig) if err != nil { cm.logger.Errorf("Failed to create uconfig from APM configuration: %s", err) diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index f1b32b15d82d..e7515266b0bc 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/joeshaw/multierror" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "google.golang.org/grpc" @@ -266,6 +267,141 @@ func TestManagerV2(t *testing.T) { }, 15*time.Second, 300*time.Millisecond) } +func TestManagerV2_ReloadCount(t *testing.T) { + r := reload.NewRegistry() + + output := &reloadable{} + r.MustRegisterOutput(output) + inputs := &reloadableList{} + r.MustRegisterInput(inputs) + apm := &reloadable{} + r.MustRegisterAPM(apm) + + inputConfigUpdated := make(chan struct{}) + onObserved := func(observed *proto.CheckinObserved, currentIdx int) { + if currentIdx == 1 { + period, err := inputs.Configs()[0].Config.String("period", -1) + require.NoError(t, err) + if period == "10m" { + select { + case <-inputConfigUpdated: + default: + close(inputConfigUpdated) + } + } + } + } + + agentInfo := &proto.AgentInfo{ + Id: "elastic-agent-id", + Version: version.GetDefaultVersion(), + Snapshot: true, + } + srv := integration.NewMockServer([]*proto.CheckinExpected{ + { + AgentInfo: agentInfo, + Units: []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "system/metrics-system-default-system-1", + Type: "system/metrics", + Name: "system-1", + Streams: []*proto.Stream{ + { + Id: "system/metrics-system.filesystem-default-system-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "metricsets": []interface{}{"filesystem"}, + "period": "1m", + }), + }, + }, + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + }, + Features: nil, + FeaturesIdx: 1, + }, + { + AgentInfo: agentInfo, + Units: []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 2, + Config: &proto.UnitExpectedConfig{ + Id: "system/metrics-system-default-system-1", + Type: "system/metrics", + Name: "system-1", + Streams: []*proto.Stream{ + { + Id: "system/metrics-system.filesystem-default-system-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "metricsets": []interface{}{"filesystem"}, + "period": "10m", + }), + }, + }, + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + }, + Features: nil, + FeaturesIdx: 1, + }, + }, + onObserved, + 500*time.Millisecond, + ) + require.NoError(t, srv.Start()) + defer srv.Stop() + + client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{ + Name: "program", + Meta: map[string]string{ + "key": "value", + }, + }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + + m, err := NewV2AgentManagerWithClient(&Config{ + Enabled: true, + }, r, client) + require.NoError(t, err) + + err = m.Start() + require.NoError(t, err) + defer m.Stop() + + <-inputConfigUpdated + assert.Equal(t, 1, output.reloadCount) // initial load + assert.Equal(t, 2, inputs.reloadCount) // initial load + config update + assert.Equal(t, 0, apm.reloadCount) // no apm tracing config applied +} + func TestOutputError(t *testing.T) { // Uncomment the line below to see the debug logs for this test // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*")) @@ -553,19 +689,22 @@ func TestErrorPerUnit(t *testing.T) { } type reloadable struct { - mx sync.Mutex - config *reload.ConfigWithMeta + mx sync.Mutex + config *reload.ConfigWithMeta + reloadCount int } type reloadableList struct { - mx sync.Mutex - configs []*reload.ConfigWithMeta + mx sync.Mutex + configs []*reload.ConfigWithMeta + reloadCount int } func (r *reloadable) Reload(config *reload.ConfigWithMeta) error { r.mx.Lock() defer r.mx.Unlock() r.config = config + r.reloadCount++ return nil } @@ -579,6 +718,7 @@ func (r *reloadableList) Reload(configs []*reload.ConfigWithMeta) error { r.mx.Lock() defer r.mx.Unlock() r.configs = configs + r.reloadCount++ return nil } From 0827467636d1b22b4118968c15acdeff2937456f Mon Sep 17 00:00:00 2001 From: "Alex K." <8418476+fearful-symmetry@users.noreply.github.com> Date: Mon, 9 Dec 2024 07:31:17 -0800 Subject: [PATCH 2/4] Let network processor handle multiple IPs (#41918) * let network processor handle multiple IPs * add changelog * linter... * fix linter, logs, changelog * linter... * linter... * linter... * update docs * whoops * simplify logic * docs, cleanup --- CHANGELOG.next.asciidoc | 1 + libbeat/conditions/conditions_test.go | 23 ++++++++++ libbeat/conditions/network.go | 61 +++++++++++++++----------- libbeat/conditions/network_test.go | 52 ++++++++++++++++++++++ libbeat/docs/processors-using.asciidoc | 13 ++++-- 5 files changed, 121 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 76bc04ede5dd..bb6c4df0f627 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -118,6 +118,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Remove unnecessary reload for Elastic Agent managed beats when apm tracing config changes from nil to nil {pull}41794[41794] - Fix incorrect cloud provider identification in add_cloud_metadata processor using provider priority mechanism {pull}41636[41636] - Prevent panic if libbeat processors are loaded more than once. {issue}41475[41475] {pull}41857[51857] +- Allow network condition to handle field values that are arrays of IP addresses. {pull}41918[41918] - Fix a bug where log files are rotated on startup when interval is configured and rotateonstartup is disabled {issue}41894[41894] {pull}41895[41895] *Auditbeat* diff --git a/libbeat/conditions/conditions_test.go b/libbeat/conditions/conditions_test.go index be74f0a7caa9..8abbb0694e73 100644 --- a/libbeat/conditions/conditions_test.go +++ b/libbeat/conditions/conditions_test.go @@ -105,6 +105,29 @@ var httpResponseTestEvent = &beat.Event{ }, } +var httpResponseEventIPList = &beat.Event{ + Timestamp: time.Now(), + Fields: mapstr.M{ + "@timestamp": "2024-12-05T09:51:23.642Z", + "ecs": mapstr.M{ + "version": "8.11.0", + }, + "host": mapstr.M{ + "hostname": "testhost", + "os": mapstr.M{ + "type": "linux", + "family": "debian", + "version": "11 (bullseye)", + "platform": "debian", + }, + "ip": []string{ + "10.1.0.55", + "fe80::4001:aff:fe9a:55", + }, + }, + }, +} + func testConfig(t *testing.T, expected bool, event *beat.Event, config *Config) { t.Helper() logp.TestingSetup() diff --git a/libbeat/conditions/network.go b/libbeat/conditions/network.go index 76ab2d08423c..25c5fb86e701 100644 --- a/libbeat/conditions/network.go +++ b/libbeat/conditions/network.go @@ -20,6 +20,7 @@ package conditions import ( "fmt" "net" + "slices" "strings" "github.com/elastic/elastic-agent-libs/logp" @@ -94,6 +95,24 @@ func (m multiNetworkMatcher) String() string { return strings.Join(names, " OR ") } +func makeMatcher(network string) (networkMatcher, error) { + m := singleNetworkMatcher{name: network, netContainsFunc: namedNetworks[network]} + if m.netContainsFunc == nil { + subnet, err := parseCIDR(network) + if err != nil { + return nil, err + } + m.netContainsFunc = subnet.Contains + } + return m, nil +} + +func invalidTypeError(field string, value interface{}) error { + return fmt.Errorf("network condition attempted to set "+ + "'%v' -> '%v' and encountered unexpected type '%T', only "+ + "strings or []strings are allowed", field, value, value) +} + // NewNetworkCondition builds a new Network using the given configuration. func NewNetworkCondition(fields map[string]interface{}) (*Network, error) { cond := &Network{ @@ -101,24 +120,6 @@ func NewNetworkCondition(fields map[string]interface{}) (*Network, error) { log: logp.NewLogger(logName), } - makeMatcher := func(network string) (networkMatcher, error) { - m := singleNetworkMatcher{name: network, netContainsFunc: namedNetworks[network]} - if m.netContainsFunc == nil { - subnet, err := parseCIDR(network) - if err != nil { - return nil, err - } - m.netContainsFunc = subnet.Contains - } - return m, nil - } - - invalidTypeError := func(field string, value interface{}) error { - return fmt.Errorf("network condition attempted to set "+ - "'%v' -> '%v' and encountered unexpected type '%T', only "+ - "strings or []strings are allowed", field, value, value) - } - for field, value := range mapstr.M(fields).Flatten() { switch v := value.(type) { case string: @@ -157,15 +158,17 @@ func (c *Network) Check(event ValuesMap) bool { return false } - ip := extractIP(value) - if ip == nil { + ipList := extractIP(value) + if len(ipList) == 0 { c.log.Debugf("Invalid IP address in field=%v for network condition", field) return false } - - if !network.Contains(ip) { + // match on an "any" basis when we find multiple IPs in the event; + // if the network matcher returns true for any seen IP, consider it a match + if !slices.ContainsFunc(ipList, network.Contains) { return false } + } return true @@ -202,12 +205,20 @@ func parseCIDR(value string) (*net.IPNet, error) { // extractIP return an IP address if unk is an IP address string or a net.IP. // Otherwise it returns nil. -func extractIP(unk interface{}) net.IP { +func extractIP(unk interface{}) []net.IP { switch v := unk.(type) { case string: - return net.ParseIP(v) - case net.IP: + return []net.IP{net.ParseIP(v)} + case []net.IP: return v + case net.IP: + return []net.IP{v} + case []string: + parsed := make([]net.IP, len(v)) + for i, rawIP := range v { + parsed[i] = net.ParseIP(rawIP) + } + return parsed default: return nil } diff --git a/libbeat/conditions/network_test.go b/libbeat/conditions/network_test.go index b79568098e42..71ab0918e675 100644 --- a/libbeat/conditions/network_test.go +++ b/libbeat/conditions/network_test.go @@ -79,6 +79,26 @@ network: testYAMLConfig(t, true, evt, yaml) }) + + t.Run("IP list", func(t *testing.T) { + const yaml = ` +network: + ip: + client: [loopback] + server: [loopback] + host: 10.10.0.0/8 +` + + evt := &beat.Event{Fields: mapstr.M{ + "ip": mapstr.M{ + "client": "127.0.0.1", + "server": "127.0.0.1", + "host": []string{"10.10.0.83", "fe80::4001:aff:fe9a:53"}, + }, + }} + + testYAMLConfig(t, true, evt, yaml) + }) } func TestNetworkCreate(t *testing.T) { @@ -166,6 +186,22 @@ func TestNetworkCheck(t *testing.T) { }) }) + t.Run("multiple IPs field single match", func(t *testing.T) { + testConfig(t, true, httpResponseEventIPList, &Config{ + Network: map[string]interface{}{ + "host.ip": "10.1.0.0/24", + }, + }) + }) + + t.Run("multiple IPs field negative match", func(t *testing.T) { + testConfig(t, false, httpResponseEventIPList, &Config{ + Network: map[string]interface{}{ + "host.ip": "127.0.0.0/24", + }, + }) + }) + // Multiple conditions are treated as an implicit AND. t.Run("multiple fields negative match", func(t *testing.T) { testConfig(t, false, httpResponseTestEvent, &Config{ @@ -191,6 +227,22 @@ func TestNetworkCheck(t *testing.T) { }, }) }) + + t.Run("multiple values multiple IPs match", func(t *testing.T) { + testConfig(t, true, httpResponseEventIPList, &Config{ + Network: map[string]interface{}{ + "host.ip": []interface{}{"10.1.0.0/24", "127.0.0.0/24"}, + }, + }) + }) + + t.Run("multiple values multiple IPs no match", func(t *testing.T) { + testConfig(t, false, httpResponseEventIPList, &Config{ + Network: map[string]interface{}{ + "host.ip": []interface{}{"12.1.0.0/24", "127.0.0.0/24"}, + }, + }) + }) } func TestNetworkPrivate(t *testing.T) { diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index dd91ea8d5db6..a029f5f2ea8b 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -311,10 +311,15 @@ range: [[condition-network]] ===== `network` -The `network` condition checks if the field is in a certain IP network range. -Both IPv4 and IPv6 addresses are supported. The network range may be specified -using CIDR notation, like "192.0.2.0/24" or "2001:db8::/32", or by using one of -these named ranges: +The `network` condition checks whether a field's value falls within a specified +IP network range. If multiple fields are provided, each field value must match +its corresponding network range. You can specify multiple network ranges for a +single field, and a match occurs if any one of the ranges matches. If the field +value is an array of IPs, it will match if any of the IPs fall within any of the +given ranges. Both IPv4 and IPv6 addresses are supported. + +The network range may be specified using CIDR notation, like "192.0.2.0/24" or +"2001:db8::/32", or by using one of these named ranges: - `loopback` - Matches loopback addresses in the range of `127.0.0.0/8` or `::1/128`. From da6822b047abc974b153b5f9190345dd4983617d Mon Sep 17 00:00:00 2001 From: Lee E Hinman <57081003+leehinman@users.noreply.github.com> Date: Mon, 9 Dec 2024 09:58:17 -0600 Subject: [PATCH 3/4] Add unique per beat monitoring namespace (#41939) --- filebeat/beater/filebeat.go | 14 ++++++-- heartbeat/monitors/mocks.go | 9 ++--- libbeat/beat/info.go | 7 ++-- libbeat/cmd/instance/beat.go | 36 +++++++++++++------ x-pack/filebeat/input/benchmark/input.go | 10 +++--- .../scenarios/framework/framework.go | 23 ++++++------ 6 files changed, 60 insertions(+), 39 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index ceab21aa3590..4909941b90ae 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -272,10 +272,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error { waitEvents := newSignalWait() // count active events for waiting on shutdown + var reg *monitoring.Registry + + if b.Info.Monitoring.Namespace != nil { + reg = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats") + if reg == nil { + reg = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats") + } + } wgEvents := &eventCounter{ - count: monitoring.NewInt(nil, "filebeat.events.active"), // Gauge - added: monitoring.NewUint(nil, "filebeat.events.added"), - done: monitoring.NewUint(nil, "filebeat.events.done"), + count: monitoring.NewInt(reg, "filebeat.events.active"), // Gauge + added: monitoring.NewUint(reg, "filebeat.events.added"), + done: monitoring.NewUint(reg, "filebeat.events.done"), } finishedLogger := newFinishedLogger(wgEvents) diff --git a/heartbeat/monitors/mocks.go b/heartbeat/monitors/mocks.go index c172d24464c8..77dee19858a7 100644 --- a/heartbeat/monitors/mocks.go +++ b/heartbeat/monitors/mocks.go @@ -60,12 +60,8 @@ func makeMockFactory(pluginsReg *plugin.PluginsReg) (factory *RunnerFactory, sch EphemeralID: eid, FirstStart: time.Now(), StartTime: time.Now(), - Monitoring: struct { - DefaultUsername string - }{ - DefaultUsername: "test", - }, } + info.Monitoring.DefaultUsername = "test" sched = scheduler.Create( 1, @@ -246,7 +242,8 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) { return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, nil }, - Stats: plugin.NewPluginCountersRecorder("test", reg)}, + Stats: plugin.NewPluginCountersRecorder("test", reg), + }, built, closed } diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index 314597abbb56..7c3b5c0d90f8 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -22,6 +22,8 @@ import ( "github.com/gofrs/uuid/v5" "go.opentelemetry.io/collector/consumer" + + "github.com/elastic/elastic-agent-libs/monitoring" ) // Info stores a beats instance meta data. @@ -41,9 +43,10 @@ type Info struct { // Monitoring-related fields Monitoring struct { - DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring + DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring + Namespace *monitoring.Namespace // a monitor namespace that is unique per beat instance } - LogConsumer consumer.Logs //otel log consumer + LogConsumer consumer.Logs // otel log consumer } diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 6332ebac39b5..2d1eb3a20f02 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -337,6 +337,8 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c config.OverwriteConfigOpts(configOpts(store)) } + b.Beat.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String()) + instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version) if err != nil { return nil, fmt.Errorf("error setting up instrumentation: %w", err) @@ -469,11 +471,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c return nil, fmt.Errorf("error creating processors: %w", err) } - reg := monitoring.Default.GetRegistry(b.Info.Name) - if reg == nil { - reg = monitoring.Default.NewRegistry(b.Info.Name) - } - // This should be replaced with static config for otel consumer // but need to figure out if we want the Queue settings from here. outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled() @@ -485,12 +482,14 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c } } - tel := reg.GetRegistry("state") + uniq_reg := b.Beat.Info.Monitoring.Namespace.GetRegistry() + + tel := uniq_reg.GetRegistry("state") if tel == nil { - tel = reg.NewRegistry("state") + tel = uniq_reg.NewRegistry("state") } monitors := pipeline.Monitors{ - Metrics: reg, + Metrics: uniq_reg, Telemetry: tel, Logger: logp.L().Named("publisher"), Tracer: b.Instrumentation.Tracer(), @@ -510,7 +509,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c b.Publisher = publisher return b, nil - } // InitWithSettings does initialization of things common to all actions (read confs, flags) @@ -831,11 +829,27 @@ func (b *Beat) RegisterHostname(useFQDN bool) { hostname := b.Info.FQDNAwareHostname(useFQDN) // info.hostname - infoRegistry := monitoring.GetNamespace("info").GetRegistry() + var infoRegistry *monitoring.Registry + if b.Info.Monitoring.Namespace != nil { + infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("info") + if infoRegistry == nil { + infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("info") + } + } else { + infoRegistry = monitoring.GetNamespace("info").GetRegistry() + } monitoring.NewString(infoRegistry, "hostname").Set(hostname) // state.host - stateRegistry := monitoring.GetNamespace("state").GetRegistry() + var stateRegistry *monitoring.Registry + if b.Info.Monitoring.Namespace != nil { + stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("state") + if stateRegistry == nil { + stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("state") + } + } else { + stateRegistry = monitoring.GetNamespace("state").GetRegistry() + } monitoring.NewFunc(stateRegistry, "host", host.ReportInfo(hostname), monitoring.Report) } diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go index dd6d198cc409..e098d3e746bc 100644 --- a/x-pack/filebeat/input/benchmark/input.go +++ b/x-pack/filebeat/input/benchmark/input.go @@ -60,7 +60,7 @@ func (bi *benchmarkInput) Test(ctx v2.TestContext) error { // Run starts the data generation. func (bi *benchmarkInput) Run(ctx v2.Context, publisher stateless.Publisher) error { var wg sync.WaitGroup - metrics := newInputMetrics(ctx.ID) + metrics := newInputMetrics(ctx) for i := uint8(0); i < bi.cfg.Threads; i++ { wg.Add(1) @@ -103,8 +103,8 @@ func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg ticker.Stop() return case <-ticker.C: - //don't want to block on filling doPublish channel - //so only send as many as it can hold right now + // don't want to block on filling doPublish channel + // so only send as many as it can hold right now numToSend := cap(pubChan) - len(pubChan) for i := 0; i < numToSend; i++ { pubChan <- true @@ -157,8 +157,8 @@ type inputMetrics struct { } // newInputMetrics returns an input metric for the benchmark processor. -func newInputMetrics(id string) *inputMetrics { - reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) +func newInputMetrics(ctx v2.Context) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, ctx.Agent.Monitoring.Namespace.GetRegistry()) out := &inputMetrics{ unregister: unreg, eventsPublished: monitoring.NewUint(reg, "events_published_total"), diff --git a/x-pack/heartbeat/scenarios/framework/framework.go b/x-pack/heartbeat/scenarios/framework/framework.go index a2fb77e63070..6119f549e997 100644 --- a/x-pack/heartbeat/scenarios/framework/framework.go +++ b/x-pack/heartbeat/scenarios/framework/framework.go @@ -30,11 +30,13 @@ import ( beatversion "github.com/elastic/beats/v7/libbeat/version" ) -type ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error) -type ScenarioRunMeta struct { - URL *url.URL - Status monitorstate.StateStatus -} +type ( + ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error) + ScenarioRunMeta struct { + URL *url.URL + Status monitorstate.StateStatus + } +) type Scenario struct { Name string @@ -155,7 +157,6 @@ func NewScenarioDB() *ScenarioDB { ByTag: map[string][]Scenario{}, All: []Scenario{}, } - } func (sdb *ScenarioDB) Init() { @@ -250,7 +251,9 @@ func runMonitorOnce(t *testing.T, monitorConfig mapstr.M, meta ScenarioRunMeta, mIface, err := f.Create(pipe, conf) require.NoError(t, err) - mtr.monitor = mIface.(*monitors.Monitor) + mon, ok := mIface.(*monitors.Monitor) + require.True(t, ok, "type assertion didn't succeed") + mtr.monitor = mon require.NotNil(t, mtr.monitor, "could not convert to monitor %v", mIface) mtr.Events = pipe.PublishedEvents @@ -281,12 +284,8 @@ func setupFactoryAndSched(location *hbconfig.LocationWithID, stateLoader monitor EphemeralID: eid, FirstStart: time.Now(), StartTime: time.Now(), - Monitoring: struct { - DefaultUsername string - }{ - DefaultUsername: "test", - }, } + info.Monitoring.DefaultUsername = "test" sched = scheduler.Create( 1, From f86a5f07fbeab811f6e8c80f0350c2b7be873603 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Mon, 9 Dec 2024 14:37:33 -0300 Subject: [PATCH 4/4] metricbeat: handle nf_conntrack module not loaded in linux integration (#41930) * metricbeat: handle nf_conntrack module not loaded in linux integration * ci lint fixes * fix error check to prevent windows failure --- .../module/linux/conntrack/conntrack.go | 9 +++++++- .../module/linux/conntrack/conntrack_test.go | 22 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/metricbeat/module/linux/conntrack/conntrack.go b/metricbeat/module/linux/conntrack/conntrack.go index 602a786b5742..42c2114f92ce 100644 --- a/metricbeat/module/linux/conntrack/conntrack.go +++ b/metricbeat/module/linux/conntrack/conntrack.go @@ -19,6 +19,7 @@ package conntrack import ( "fmt" + "os" "github.com/prometheus/procfs" @@ -50,7 +51,10 @@ type MetricSet struct { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The linux conntrack metricset is beta.") - sys := base.Module().(resolve.Resolver) + sys, ok := base.Module().(resolve.Resolver) + if !ok { + return nil, fmt.Errorf("unexpected module type: %T", base.Module()) + } return &MetricSet{ BaseMetricSet: base, @@ -68,6 +72,9 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { } conntrackStats, err := newFS.ConntrackStat() if err != nil { + if os.IsNotExist(err) { + err = mb.PartialMetricsError{Err: fmt.Errorf("nf_conntrack kernel module not loaded: %w", err)} + } return fmt.Errorf("error fetching conntrack stats: %w", err) } diff --git a/metricbeat/module/linux/conntrack/conntrack_test.go b/metricbeat/module/linux/conntrack/conntrack_test.go index cc3b2a052306..0d1ad2a46775 100644 --- a/metricbeat/module/linux/conntrack/conntrack_test.go +++ b/metricbeat/module/linux/conntrack/conntrack_test.go @@ -18,10 +18,15 @@ package conntrack import ( + "errors" + "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" _ "github.com/elastic/beats/v7/metricbeat/module/linux" "github.com/elastic/elastic-agent-libs/mapstr" @@ -60,6 +65,23 @@ func TestFetch(t *testing.T) { assert.Equal(t, testConn, rawEvent) } +func TestFetchConntrackModuleNotLoaded(t *testing.T) { + // Create a temporary directory to simulate a missing /proc/net/stat/nf_conntrack file + tmpDir := t.TempDir() + assert.NoError(t, os.MkdirAll(filepath.Join(tmpDir, "proc/net/stat"), 0755)) + c := getConfig() + c["hostfs"] = tmpDir + + f := mbtest.NewReportingMetricSetV2Error(t, c) + events, errs := mbtest.ReportingFetchV2Error(f) + + require.Len(t, errs, 1) + err := errors.Join(errs...) + assert.ErrorAs(t, err, &mb.PartialMetricsError{}) + assert.Contains(t, err.Error(), "error fetching conntrack stats: nf_conntrack kernel module not loaded") + require.Empty(t, events) +} + func getConfig() map[string]interface{} { return map[string]interface{}{ "module": "linux",