Skip to content

Commit

Permalink
Add unique per beat monitoring namespace (#41939)
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman authored Dec 9, 2024
1 parent 0827467 commit da6822b
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 39 deletions.
14 changes: 11 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
waitEvents := newSignalWait()

// count active events for waiting on shutdown
var reg *monitoring.Registry

if b.Info.Monitoring.Namespace != nil {
reg = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats")
if reg == nil {
reg = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats")
}
}
wgEvents := &eventCounter{
count: monitoring.NewInt(nil, "filebeat.events.active"), // Gauge
added: monitoring.NewUint(nil, "filebeat.events.added"),
done: monitoring.NewUint(nil, "filebeat.events.done"),
count: monitoring.NewInt(reg, "filebeat.events.active"), // Gauge
added: monitoring.NewUint(reg, "filebeat.events.added"),
done: monitoring.NewUint(reg, "filebeat.events.done"),
}
finishedLogger := newFinishedLogger(wgEvents)

Expand Down
9 changes: 3 additions & 6 deletions heartbeat/monitors/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ func makeMockFactory(pluginsReg *plugin.PluginsReg) (factory *RunnerFactory, sch
EphemeralID: eid,
FirstStart: time.Now(),
StartTime: time.Now(),
Monitoring: struct {
DefaultUsername string
}{
DefaultUsername: "test",
},
}
info.Monitoring.DefaultUsername = "test"

sched = scheduler.Create(
1,
Expand Down Expand Up @@ -246,7 +242,8 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {

return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, nil
},
Stats: plugin.NewPluginCountersRecorder("test", reg)},
Stats: plugin.NewPluginCountersRecorder("test", reg),
},
built,
closed
}
Expand Down
7 changes: 5 additions & 2 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"github.com/gofrs/uuid/v5"
"go.opentelemetry.io/collector/consumer"

"github.com/elastic/elastic-agent-libs/monitoring"
)

// Info stores a beats instance meta data.
Expand All @@ -41,9 +43,10 @@ type Info struct {

// Monitoring-related fields
Monitoring struct {
DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring
DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring
Namespace *monitoring.Namespace // a monitor namespace that is unique per beat instance
}
LogConsumer consumer.Logs //otel log consumer
LogConsumer consumer.Logs // otel log consumer

}

Expand Down
36 changes: 25 additions & 11 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
config.OverwriteConfigOpts(configOpts(store))
}

b.Beat.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String())

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version)
if err != nil {
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
Expand Down Expand Up @@ -469,11 +471,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
return nil, fmt.Errorf("error creating processors: %w", err)
}

reg := monitoring.Default.GetRegistry(b.Info.Name)
if reg == nil {
reg = monitoring.Default.NewRegistry(b.Info.Name)
}

// This should be replaced with static config for otel consumer
// but need to figure out if we want the Queue settings from here.
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
Expand All @@ -485,12 +482,14 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
}
}

tel := reg.GetRegistry("state")
uniq_reg := b.Beat.Info.Monitoring.Namespace.GetRegistry()

tel := uniq_reg.GetRegistry("state")
if tel == nil {
tel = reg.NewRegistry("state")
tel = uniq_reg.NewRegistry("state")
}
monitors := pipeline.Monitors{
Metrics: reg,
Metrics: uniq_reg,
Telemetry: tel,
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
Expand All @@ -510,7 +509,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
b.Publisher = publisher

return b, nil

}

// InitWithSettings does initialization of things common to all actions (read confs, flags)
Expand Down Expand Up @@ -831,11 +829,27 @@ func (b *Beat) RegisterHostname(useFQDN bool) {
hostname := b.Info.FQDNAwareHostname(useFQDN)

// info.hostname
infoRegistry := monitoring.GetNamespace("info").GetRegistry()
var infoRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("info")
if infoRegistry == nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("info")
}
} else {
infoRegistry = monitoring.GetNamespace("info").GetRegistry()
}
monitoring.NewString(infoRegistry, "hostname").Set(hostname)

// state.host
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
var stateRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("state")
if stateRegistry == nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("state")
}
} else {
stateRegistry = monitoring.GetNamespace("state").GetRegistry()
}
monitoring.NewFunc(stateRegistry, "host", host.ReportInfo(hostname), monitoring.Report)
}

Expand Down
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/benchmark/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (bi *benchmarkInput) Test(ctx v2.TestContext) error {
// Run starts the data generation.
func (bi *benchmarkInput) Run(ctx v2.Context, publisher stateless.Publisher) error {
var wg sync.WaitGroup
metrics := newInputMetrics(ctx.ID)
metrics := newInputMetrics(ctx)

for i := uint8(0); i < bi.cfg.Threads; i++ {
wg.Add(1)
Expand Down Expand Up @@ -103,8 +103,8 @@ func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg
ticker.Stop()
return
case <-ticker.C:
//don't want to block on filling doPublish channel
//so only send as many as it can hold right now
// don't want to block on filling doPublish channel
// so only send as many as it can hold right now
numToSend := cap(pubChan) - len(pubChan)
for i := 0; i < numToSend; i++ {
pubChan <- true
Expand Down Expand Up @@ -157,8 +157,8 @@ type inputMetrics struct {
}

// newInputMetrics returns an input metric for the benchmark processor.
func newInputMetrics(id string) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)
func newInputMetrics(ctx v2.Context) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, ctx.Agent.Monitoring.Namespace.GetRegistry())
out := &inputMetrics{
unregister: unreg,
eventsPublished: monitoring.NewUint(reg, "events_published_total"),
Expand Down
23 changes: 11 additions & 12 deletions x-pack/heartbeat/scenarios/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
beatversion "github.com/elastic/beats/v7/libbeat/version"
)

type ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error)
type ScenarioRunMeta struct {
URL *url.URL
Status monitorstate.StateStatus
}
type (
ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error)
ScenarioRunMeta struct {
URL *url.URL
Status monitorstate.StateStatus
}
)

type Scenario struct {
Name string
Expand Down Expand Up @@ -155,7 +157,6 @@ func NewScenarioDB() *ScenarioDB {
ByTag: map[string][]Scenario{},
All: []Scenario{},
}

}

func (sdb *ScenarioDB) Init() {
Expand Down Expand Up @@ -250,7 +251,9 @@ func runMonitorOnce(t *testing.T, monitorConfig mapstr.M, meta ScenarioRunMeta,

mIface, err := f.Create(pipe, conf)
require.NoError(t, err)
mtr.monitor = mIface.(*monitors.Monitor)
mon, ok := mIface.(*monitors.Monitor)
require.True(t, ok, "type assertion didn't succeed")
mtr.monitor = mon
require.NotNil(t, mtr.monitor, "could not convert to monitor %v", mIface)
mtr.Events = pipe.PublishedEvents

Expand Down Expand Up @@ -281,12 +284,8 @@ func setupFactoryAndSched(location *hbconfig.LocationWithID, stateLoader monitor
EphemeralID: eid,
FirstStart: time.Now(),
StartTime: time.Now(),
Monitoring: struct {
DefaultUsername string
}{
DefaultUsername: "test",
},
}
info.Monitoring.DefaultUsername = "test"

sched = scheduler.Create(
1,
Expand Down

0 comments on commit da6822b

Please sign in to comment.