diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index ceab21aa359..4909941b90a 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -272,10 +272,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error { waitEvents := newSignalWait() // count active events for waiting on shutdown + var reg *monitoring.Registry + + if b.Info.Monitoring.Namespace != nil { + reg = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats") + if reg == nil { + reg = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats") + } + } wgEvents := &eventCounter{ - count: monitoring.NewInt(nil, "filebeat.events.active"), // Gauge - added: monitoring.NewUint(nil, "filebeat.events.added"), - done: monitoring.NewUint(nil, "filebeat.events.done"), + count: monitoring.NewInt(reg, "filebeat.events.active"), // Gauge + added: monitoring.NewUint(reg, "filebeat.events.added"), + done: monitoring.NewUint(reg, "filebeat.events.done"), } finishedLogger := newFinishedLogger(wgEvents) diff --git a/heartbeat/monitors/mocks.go b/heartbeat/monitors/mocks.go index c172d24464c..77dee19858a 100644 --- a/heartbeat/monitors/mocks.go +++ b/heartbeat/monitors/mocks.go @@ -60,12 +60,8 @@ func makeMockFactory(pluginsReg *plugin.PluginsReg) (factory *RunnerFactory, sch EphemeralID: eid, FirstStart: time.Now(), StartTime: time.Now(), - Monitoring: struct { - DefaultUsername string - }{ - DefaultUsername: "test", - }, } + info.Monitoring.DefaultUsername = "test" sched = scheduler.Create( 1, @@ -246,7 +242,8 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) { return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, nil }, - Stats: plugin.NewPluginCountersRecorder("test", reg)}, + Stats: plugin.NewPluginCountersRecorder("test", reg), + }, built, closed } diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index 314597abbb5..7c3b5c0d90f 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -22,6 +22,8 @@ import ( "github.com/gofrs/uuid/v5" "go.opentelemetry.io/collector/consumer" + + "github.com/elastic/elastic-agent-libs/monitoring" ) // Info stores a beats instance meta data. @@ -41,9 +43,10 @@ type Info struct { // Monitoring-related fields Monitoring struct { - DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring + DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring + Namespace *monitoring.Namespace // a monitor namespace that is unique per beat instance } - LogConsumer consumer.Logs //otel log consumer + LogConsumer consumer.Logs // otel log consumer } diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 6332ebac39b..2d1eb3a20f0 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -337,6 +337,8 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c config.OverwriteConfigOpts(configOpts(store)) } + b.Beat.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String()) + instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version) if err != nil { return nil, fmt.Errorf("error setting up instrumentation: %w", err) @@ -469,11 +471,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c return nil, fmt.Errorf("error creating processors: %w", err) } - reg := monitoring.Default.GetRegistry(b.Info.Name) - if reg == nil { - reg = monitoring.Default.NewRegistry(b.Info.Name) - } - // This should be replaced with static config for otel consumer // but need to figure out if we want the Queue settings from here. outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled() @@ -485,12 +482,14 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c } } - tel := reg.GetRegistry("state") + uniq_reg := b.Beat.Info.Monitoring.Namespace.GetRegistry() + + tel := uniq_reg.GetRegistry("state") if tel == nil { - tel = reg.NewRegistry("state") + tel = uniq_reg.NewRegistry("state") } monitors := pipeline.Monitors{ - Metrics: reg, + Metrics: uniq_reg, Telemetry: tel, Logger: logp.L().Named("publisher"), Tracer: b.Instrumentation.Tracer(), @@ -510,7 +509,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c b.Publisher = publisher return b, nil - } // InitWithSettings does initialization of things common to all actions (read confs, flags) @@ -831,11 +829,27 @@ func (b *Beat) RegisterHostname(useFQDN bool) { hostname := b.Info.FQDNAwareHostname(useFQDN) // info.hostname - infoRegistry := monitoring.GetNamespace("info").GetRegistry() + var infoRegistry *monitoring.Registry + if b.Info.Monitoring.Namespace != nil { + infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("info") + if infoRegistry == nil { + infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("info") + } + } else { + infoRegistry = monitoring.GetNamespace("info").GetRegistry() + } monitoring.NewString(infoRegistry, "hostname").Set(hostname) // state.host - stateRegistry := monitoring.GetNamespace("state").GetRegistry() + var stateRegistry *monitoring.Registry + if b.Info.Monitoring.Namespace != nil { + stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("state") + if stateRegistry == nil { + stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("state") + } + } else { + stateRegistry = monitoring.GetNamespace("state").GetRegistry() + } monitoring.NewFunc(stateRegistry, "host", host.ReportInfo(hostname), monitoring.Report) } diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go index dd6d198cc40..e098d3e746b 100644 --- a/x-pack/filebeat/input/benchmark/input.go +++ b/x-pack/filebeat/input/benchmark/input.go @@ -60,7 +60,7 @@ func (bi *benchmarkInput) Test(ctx v2.TestContext) error { // Run starts the data generation. func (bi *benchmarkInput) Run(ctx v2.Context, publisher stateless.Publisher) error { var wg sync.WaitGroup - metrics := newInputMetrics(ctx.ID) + metrics := newInputMetrics(ctx) for i := uint8(0); i < bi.cfg.Threads; i++ { wg.Add(1) @@ -103,8 +103,8 @@ func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg ticker.Stop() return case <-ticker.C: - //don't want to block on filling doPublish channel - //so only send as many as it can hold right now + // don't want to block on filling doPublish channel + // so only send as many as it can hold right now numToSend := cap(pubChan) - len(pubChan) for i := 0; i < numToSend; i++ { pubChan <- true @@ -157,8 +157,8 @@ type inputMetrics struct { } // newInputMetrics returns an input metric for the benchmark processor. -func newInputMetrics(id string) *inputMetrics { - reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) +func newInputMetrics(ctx v2.Context) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, ctx.Agent.Monitoring.Namespace.GetRegistry()) out := &inputMetrics{ unregister: unreg, eventsPublished: monitoring.NewUint(reg, "events_published_total"), diff --git a/x-pack/heartbeat/scenarios/framework/framework.go b/x-pack/heartbeat/scenarios/framework/framework.go index a2fb77e6307..6119f549e99 100644 --- a/x-pack/heartbeat/scenarios/framework/framework.go +++ b/x-pack/heartbeat/scenarios/framework/framework.go @@ -30,11 +30,13 @@ import ( beatversion "github.com/elastic/beats/v7/libbeat/version" ) -type ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error) -type ScenarioRunMeta struct { - URL *url.URL - Status monitorstate.StateStatus -} +type ( + ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error) + ScenarioRunMeta struct { + URL *url.URL + Status monitorstate.StateStatus + } +) type Scenario struct { Name string @@ -155,7 +157,6 @@ func NewScenarioDB() *ScenarioDB { ByTag: map[string][]Scenario{}, All: []Scenario{}, } - } func (sdb *ScenarioDB) Init() { @@ -250,7 +251,9 @@ func runMonitorOnce(t *testing.T, monitorConfig mapstr.M, meta ScenarioRunMeta, mIface, err := f.Create(pipe, conf) require.NoError(t, err) - mtr.monitor = mIface.(*monitors.Monitor) + mon, ok := mIface.(*monitors.Monitor) + require.True(t, ok, "type assertion didn't succeed") + mtr.monitor = mon require.NotNil(t, mtr.monitor, "could not convert to monitor %v", mIface) mtr.Events = pipe.PublishedEvents @@ -281,12 +284,8 @@ func setupFactoryAndSched(location *hbconfig.LocationWithID, stateLoader monitor EphemeralID: eid, FirstStart: time.Now(), StartTime: time.Now(), - Monitoring: struct { - DefaultUsername string - }{ - DefaultUsername: "test", - }, } + info.Monitoring.DefaultUsername = "test" sched = scheduler.Create( 1,