From a85cd029d776993045623fa663121df74921428d Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Tue, 26 Nov 2024 16:19:24 +0000 Subject: [PATCH 1/4] Assert reload count --- x-pack/libbeat/management/managerV2_test.go | 148 +++++++++++++++++++- 1 file changed, 144 insertions(+), 4 deletions(-) diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index f1b32b15d82d..fd4f48d6543f 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/joeshaw/multierror" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "google.golang.org/grpc" @@ -266,6 +267,141 @@ func TestManagerV2(t *testing.T) { }, 15*time.Second, 300*time.Millisecond) } +func TestManagerV2_ReloadCount(t *testing.T) { + r := reload.NewRegistry() + + output := &reloadable{} + r.MustRegisterOutput(output) + inputs := &reloadableList{} + r.MustRegisterInput(inputs) + apm := &reloadable{} + r.MustRegisterAPM(apm) + + inputConfigUpdated := make(chan struct{}) + onObserved := func(observed *proto.CheckinObserved, currentIdx int) { + if currentIdx == 1 { + period, err := inputs.Configs()[0].Config.String("period", -1) + require.NoError(t, err) + if period == "10m" { + select { + case <-inputConfigUpdated: + default: + close(inputConfigUpdated) + } + } + } + } + + agentInfo := &proto.AgentInfo{ + Id: "elastic-agent-id", + Version: version.GetDefaultVersion(), + Snapshot: true, + } + srv := integration.NewMockServer([]*proto.CheckinExpected{ + { + AgentInfo: agentInfo, + Units: []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "system/metrics-system-default-system-1", + Type: "system/metrics", + Name: "system-1", + Streams: []*proto.Stream{ + { + Id: "system/metrics-system.filesystem-default-system-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "metricsets": []interface{}{"filesystem"}, + "period": "1m", + }), + }, + }, + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + }, + Features: nil, + FeaturesIdx: 1, + }, + { + AgentInfo: agentInfo, + Units: []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 2, + Config: &proto.UnitExpectedConfig{ + Id: "system/metrics-system-default-system-1", + Type: "system/metrics", + Name: "system-1", + Streams: []*proto.Stream{ + { + Id: "system/metrics-system.filesystem-default-system-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "metricsets": []interface{}{"filesystem"}, + "period": "10m", + }), + }, + }, + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + }, + Features: nil, + FeaturesIdx: 1, + }, + }, + onObserved, + 500*time.Millisecond, + ) + require.NoError(t, srv.Start()) + defer srv.Stop() + + client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{ + Name: "program", + Meta: map[string]string{ + "key": "value", + }, + }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + + m, err := NewV2AgentManagerWithClient(&Config{ + Enabled: true, + }, r, client) + require.NoError(t, err) + + err = m.Start() + require.NoError(t, err) + defer m.Stop() + + <-inputConfigUpdated + assert.Equal(t, 1, output.reloadCount) + assert.Equal(t, 2, inputs.reloadCount) + assert.Equal(t, 0, apm.reloadCount) +} + func TestOutputError(t *testing.T) { // Uncomment the line below to see the debug logs for this test // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*")) @@ -553,19 +689,22 @@ func TestErrorPerUnit(t *testing.T) { } type reloadable struct { - mx sync.Mutex - config *reload.ConfigWithMeta + mx sync.Mutex + config *reload.ConfigWithMeta + reloadCount int } type reloadableList struct { - mx sync.Mutex - configs []*reload.ConfigWithMeta + mx sync.Mutex + configs []*reload.ConfigWithMeta + reloadCount int } func (r *reloadable) Reload(config *reload.ConfigWithMeta) error { r.mx.Lock() defer r.mx.Unlock() r.config = config + r.reloadCount++ return nil } @@ -579,6 +718,7 @@ func (r *reloadableList) Reload(configs []*reload.ConfigWithMeta) error { r.mx.Lock() defer r.mx.Unlock() r.configs = configs + r.reloadCount++ return nil } From e9cbcc89e6167233335cb4f348f26fd9e2ef5359 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Tue, 26 Nov 2024 16:25:10 +0000 Subject: [PATCH 2/4] Remove extra reload --- x-pack/libbeat/management/managerV2.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index e39b394bf2b3..288f28ae0de5 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -893,6 +893,13 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) { apmConfig = expected.APMConfig } } + + if (cm.lastAPMCfg == nil && apmConfig == nil) || (cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig)) { + // configuration for the APM tracing did not change; do nothing + cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change") + return + } + if apmConfig == nil { // APM tracing is being stopped cm.logger.Debug("Stopping APM tracing") @@ -907,12 +914,6 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) { return } - if cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig) { - // configuration for the APM tracing did not change; do nothing - cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change") - return - } - uconfig, err := conf.NewConfigFrom(apmConfig) if err != nil { cm.logger.Errorf("Failed to create uconfig from APM configuration: %s", err) From e2281a0620906960050f7b5585cc607ae35bb7ec Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Tue, 26 Nov 2024 16:30:00 +0000 Subject: [PATCH 3/4] Add comment --- x-pack/libbeat/management/managerV2_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index fd4f48d6543f..e7515266b0bc 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -397,9 +397,9 @@ func TestManagerV2_ReloadCount(t *testing.T) { defer m.Stop() <-inputConfigUpdated - assert.Equal(t, 1, output.reloadCount) - assert.Equal(t, 2, inputs.reloadCount) - assert.Equal(t, 0, apm.reloadCount) + assert.Equal(t, 1, output.reloadCount) // initial load + assert.Equal(t, 2, inputs.reloadCount) // initial load + config update + assert.Equal(t, 0, apm.reloadCount) // no apm tracing config applied } func TestOutputError(t *testing.T) { From 27a3768acf9075efc4ba953da209db37fd562783 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Tue, 26 Nov 2024 16:32:40 +0000 Subject: [PATCH 4/4] Add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9bf2698243bb..141e7b03fd7f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -112,6 +112,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Ensure Elasticsearch output can always recover from network errors {pull}40794[40794] - Add `translate_ldap_attribute` processor. {pull}41472[41472] - Remove unnecessary debug logs during idle connection teardown {issue}40824[40824] +- Remove unnecessary reload for Elastic Agent managed beats when apm tracing config changes from nil to nil {pull}41794[41794] *Auditbeat*