From 3b49a936b6933cec15e64ebc23b1fd73cf65293b Mon Sep 17 00:00:00 2001 From: Lee E Hinman <57081003+leehinman@users.noreply.github.com> Date: Thu, 29 Aug 2024 07:48:39 -0500 Subject: [PATCH] refactor libbeat/common/reload to not use global variable (#40622) * refactor libbeat/common/reload to not use global variable --- filebeat/beater/filebeat.go | 3 +-- heartbeat/beater/heartbeat.go | 2 +- libbeat/beat/beat.go | 3 ++- libbeat/cmd/instance/beat.go | 7 ++++--- libbeat/common/reload/reload.go | 3 --- metricbeat/beater/metricbeat.go | 3 +-- packetbeat/beater/packetbeat.go | 3 +-- x-pack/osquerybeat/beater/osquerybeat.go | 2 +- x-pack/osquerybeat/internal/config/watcher.go | 4 ++-- 9 files changed, 13 insertions(+), 17 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 94f50127faaa..9d9cb220d4eb 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -39,7 +39,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" - "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" @@ -405,7 +404,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // Register reloadable list of inputs and modules inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline) - reload.RegisterV2.MustRegisterInput(inputs) + b.Registry.MustRegisterInput(inputs) modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline) diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index e4e5a5129a8c..9a849f6bc7e7 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -286,7 +286,7 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) { }) inputs := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher) - reload.RegisterV2.MustRegisterInput(inputs) + b.Registry.MustRegisterInput(inputs) } // RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present. diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 96da5556f96c..bb7e09dde257 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -84,7 +84,8 @@ type Beat struct { Instrumentation instrumentation.Instrumentation // instrumentation holds an APM agent for capturing and reporting traces - API *api.Server // API server. This is nil unless the http endpoint is enabled. + API *api.Server // API server. This is nil unless the http endpoint is enabled. + Registry *reload.Registry // input, & output registry for configuration manager, should be instantiated in NewBeat } // GenerateUserAgent populates the UserAgent field on the beat.Info struct diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 936ecf0caaa0..42f377a76e74 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -283,7 +283,8 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool, initFuncs []func StartTime: time.Now(), EphemeralID: eid, }, - Fields: fields, + Fields: fields, + Registry: reload.NewRegistry(), } return &Beat{Beat: b}, nil @@ -405,7 +406,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { return nil, fmt.Errorf("error initializing publisher: %w", err) } - reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) + b.Registry.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) b.Publisher = publisher beater, err := bt(&b.Beat, sub) @@ -861,7 +862,7 @@ func (b *Beat) configure(settings Settings) error { } // initialize config manager - m, err := management.NewManager(b.Config.Management, reload.RegisterV2) + m, err := management.NewManager(b.Config.Management, b.Registry) if err != nil { return err } diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go index d029f473f17c..66c716a5ddae 100644 --- a/libbeat/common/reload/reload.go +++ b/libbeat/common/reload/reload.go @@ -26,9 +26,6 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -// RegisterV2 is the special registry used for the V2 controller -var RegisterV2 = NewRegistry() - // InputRegName is the registation name for V2 inputs const InputRegName = "input" diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index ec307e2ff5bd..bcb9a893a873 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -24,7 +24,6 @@ import ( "github.com/elastic/beats/v7/libbeat/autodiscover" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" - "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/beats/v7/metricbeat/mb" @@ -248,7 +247,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { // Centrally managed modules factory := module.NewFactory(b.Info, bt.registry, bt.moduleOptions...) modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher) - reload.RegisterV2.MustRegisterInput(modules) + b.Registry.MustRegisterInput(modules) wg.Add(1) go func() { defer wg.Done() diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index d8c223f17892..6495a7333797 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -24,7 +24,6 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" @@ -209,7 +208,7 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { // the runner by starting the beat's manager. It returns on the first fatal error. func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { runner := newReloader(management.DebugK, factory, b.Publisher) - reload.RegisterV2.MustRegisterInput(runner) + b.Registry.MustRegisterInput(runner) logp.Debug("main", "Waiting for the runner to finish") // Start the manager after all the hooks are registered and terminates when diff --git a/x-pack/osquerybeat/beater/osquerybeat.go b/x-pack/osquerybeat/beater/osquerybeat.go index b4fe30a47d88..119d72cdd5af 100644 --- a/x-pack/osquerybeat/beater/osquerybeat.go +++ b/x-pack/osquerybeat/beater/osquerybeat.go @@ -142,7 +142,7 @@ func (bt *osquerybeat) Run(b *beat.Beat) error { defer bt.close() // Watch input configuration updates - inputConfigCh := config.WatchInputs(ctx, bt.log) + inputConfigCh := config.WatchInputs(ctx, bt.log, b.Registry) // Install osqueryd if needed err = installOsquery(ctx) diff --git a/x-pack/osquerybeat/internal/config/watcher.go b/x-pack/osquerybeat/internal/config/watcher.go index 4b4aebdcd316..59c720693189 100644 --- a/x-pack/osquerybeat/internal/config/watcher.go +++ b/x-pack/osquerybeat/internal/config/watcher.go @@ -58,14 +58,14 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { return nil } -func WatchInputs(ctx context.Context, log *logp.Logger) <-chan []InputConfig { +func WatchInputs(ctx context.Context, log *logp.Logger, registry *reload.Registry) <-chan []InputConfig { ch := make(chan []InputConfig) r := &reloader{ ctx: ctx, log: log, ch: ch, } - reload.RegisterV2.MustRegisterInput(r) + registry.MustRegisterInput(r) return ch }