Skip to content

Commit

Permalink
[libbeat] Add a metrics observer to the queue (#39774)
Browse files Browse the repository at this point in the history
Add a metrics observer to the queue, reporting the metrics:

- `queue.added.{events, bytes}`, the number of events/bytes added to the queue
- `queue.consumed.{events, bytes}`, the number of events/bytes sent to the outputs
- `queue.removed.{events, bytes}`, the number of events/bytes removed from the queue after acknowledgment (`queue.removed.events` is an alias for the existing `queue.acked`).
`queue.filled.{events, bytes}`, the current number of events and bytes in the queue (gauges)

It also fixes the behavior of `queue.filled.pct.events`, renaming it `queue.filled.pct`.

All byte values reported by the memory queue are 0 if the output doesn't support early encoding.

This required some refactoring to the pipeline, which previously used a single custom callback to track its only queue metric (`queue.acked`) from `outputObserver`, and also used that to manage a wait group that was used to drain the queue on pipeline shutdown. The main changes are:
- A new interface type, `queue.Observer`, with an implementation `queueObserver` for standard metrics reporting.
- `queueMaxEvents` and `queueACKed` were removed from `pipeline.outputObserver`, since their logic is now handled by `queue.Observer`.
- A queue factory now takes a `queue.Observer` instead of an ACK callback
- The queue API now includes a `Done()` channel that signals when all events are acked / shutdown is complete, so shutdown handling now waits on that channel in `outputController.Close` instead of the shared waitgroup in `Pipeline.Close`.
- `pipeline.outputObserver` was renamed `pipeline.retryObserver` since its only remaining functions track retries and retry failures. It is now owned by `eventConsumer` (its only caller) instead of `pipeline.outputController`.

The queue previously had a `Metrics()` call that was used in the shipper but didn't integrate with Beats metrics. It had no remaining callers, so I deleted it while adding the new helpers.
  • Loading branch information
faec authored Jun 11, 2024
1 parent 3c9f4d9 commit f8aedce
Show file tree
Hide file tree
Showing 29 changed files with 733 additions and 765 deletions.
4 changes: 2 additions & 2 deletions filebeat/tests/system/test_reload_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ def test_start_stop(self):
inputs=False,
)

proc = self.start_beat()

os.mkdir(self.working_dir + "/logs/")
logfile = self.working_dir + "/logs/test.log"
os.mkdir(self.working_dir + "/configs/")
Expand All @@ -103,6 +101,8 @@ def test_start_stop(self):
with open(logfile, 'w') as f:
f.write("Hello world\n")

proc = self.start_beat()

self.wait_until(lambda: self.output_lines() == 1)

# Remove input by moving the file
Expand Down
49 changes: 44 additions & 5 deletions libbeat/docs/metrics-in-logs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

Every 30 seconds (by default), {beatname_uc} collects a _snapshot_ of metrics about itself. From this snapshot, {beatname_uc} computes a _delta snapshot_; this delta snapshot contains any metrics that have _changed_ since the last snapshot. Note that the values of the metrics are the values when the snapshot is taken, _NOT_ the _difference_ in values from the last snapshot.

If this delta snapshot contains _any_ metrics (indicating at least one metric that has changed since the last snapshot), this delta snapshot is serialized as JSON and emitted in {beatname_uc}'s logs at the `INFO` log level. Here is an example of such a log entry:
If this delta snapshot contains _any_ metrics (indicating at least one metric that has changed since the last snapshot), this delta snapshot is serialized as JSON and emitted in {beatname_uc}'s logs at the `INFO` log level. Most snapshot fields report the change in the metric since the last snapshot, however some fields are _gauges_, which always report the current value. Here is an example of such a log entry:
[source,json]
----
{"log.level":"info","@timestamp":"2023-07-14T12:50:36.811Z","log.logger":"monitoring","log.origin":{"file.name":"log/log.go","file.line":187},"message":"Non-zero metrics in the last 30s","service.name":"filebeat","monitoring":{"metrics":{"beat":{"cgroup":{"memory":{"mem":{"usage":{"bytes":0}}}},"cpu":{"system":{"ticks":692690,"time":{"ms":60}},"total":{"ticks":3167250,"time":{"ms":150},"value":3167250},"user":{"ticks":2474560,"time":{"ms":90}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":32},"info":{"ephemeral_id":"2bab8688-34c0-4522-80af-db86948d547d","uptime":{"ms":617670096},"version":"8.6.2"},"memstats":{"gc_next":57189272,"memory_alloc":43589824,"memory_total":275281335792,"rss":183574528},"runtime":{"goroutines":212}},"filebeat":{"events":{"active":5,"added":52,"done":49},"harvester":{"open_files":6,"running":6,"started":1}},"libbeat":{"config":{"module":{"running":15}},"output":{"events":{"acked":48,"active":0,"batches":6,"total":48},"read":{"bytes":210},"write":{"bytes":26923}},"pipeline":{"clients":15,"events":{"active":5,"filtered":1,"published":51,"total":52},"queue":{"acked":48}}},"registrar":{"states":{"current":14,"update":49},"writes":{"success":6,"total":6}},"system":{"load":{"1":0.91,"15":0.37,"5":0.4,"norm":{"1":0.1138,"15":0.0463,"5":0.05}}}},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2023-07-14T12:50:36.811Z","log.logger":"monitoring","log.origin":{"file.name":"log/log.go","file.line":187},"message":"Non-zero metrics in the last 30s","service.name":"filebeat","monitoring":{"metrics":{"beat":{"cgroup":{"memory":{"mem":{"usage":{"bytes":0}}}},"cpu":{"system":{"ticks":692690,"time":{"ms":60}},"total":{"ticks":3167250,"time":{"ms":150},"value":3167250},"user":{"ticks":2474560,"time":{"ms":90}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":32},"info":{"ephemeral_id":"2bab8688-34c0-4522-80af-db86948d547d","uptime":{"ms":617670096},"version":"8.6.2"},"memstats":{"gc_next":57189272,"memory_alloc":43589824,"memory_total":275281335792,"rss":183574528},"runtime":{"goroutines":212}},"filebeat":{"events":{"active":5,"added":52,"done":49},"harvester":{"open_files":6,"running":6,"started":1}},"libbeat":{"config":{"module":{"running":15}},"output":{"events":{"acked":48,"active":0,"batches":6,"total":48},"read":{"bytes":210},"write":{"bytes":26923}},"pipeline":{"clients":15,"events":{"active":5,"filtered":1,"published":51,"total":52},"queue":{"max_events":3500,"filled":{"events":5,"bytes":6425,"pct":0.0014},"added":{"events":52,"bytes":65702},"consumed":{"events":52,"bytes":65702},"removed":{"events":48,"bytes":59277},"acked":48}}},"registrar":{"states":{"current":14,"update":49},"writes":{"success":6,"total":6}},"system":{"load":{"1":0.91,"15":0.37,"5":0.4,"norm":{"1":0.1138,"15":0.0463,"5":0.05}}}},"ecs.version":"1.6.0"}}
----
[discrete]
Expand Down Expand Up @@ -113,6 +113,24 @@ Focussing on the `.monitoring.metrics` field, and formatting the JSON, it's valu
"total": 52
},
"queue": {
"max_events": 3500,
"filled": {
"events": 5,
"bytes": 6425,
"pct": 0.0014
},
"added": {
"events": 52,
"bytes": 65702
},
"consumed": {
"events": 52,
"bytes": 65702
},
"removed": {
"events": 48,
"bytes": 59277
},
"acked": 48
}
}
Expand All @@ -130,12 +148,12 @@ Focussing on the `.monitoring.metrics` field, and formatting the JSON, it's valu
"system": {
"load": {
"1": 0.91,
"5": 0.4,
"15": 0.37,
"5": 0.4,
"norm": {
"1": 0.1138,
"5": 0.05,
"15": 0.0463
"15": 0.0463,
"5": 0.05
}
}
}
Expand Down Expand Up @@ -170,9 +188,30 @@ endif::[]
| `.output.events.total` | Integer | Number of events currently being processed by the output. | If this number grows over time, it may indicate that the output destination (e.g. {ls} pipeline or {es} cluster) is not able to accept events at the same or faster rate than what {beatname_uc} is sending to it.
| `.output.events.acked` | Integer | Number of events acknowledged by the output destination. | Generally, we want this number to be the same as `.output.events.total` as this indicates that the output destination has reliably received all the events sent to it.
| `.output.events.failed` | Integer | Number of events that {beatname_uc} tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it's useful to check {beatname_uc}'s logs right before this log entry's `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later.
| `.output.events.dropped` | Integer | Number of events that {beatname_uc} gave up sending to the output destination because of a permanent (non-retryable) error.
| `.output.events.dead_letter` | Integer | Number of events that {beatname_uc} successfully sent to a configured dead letter index after they failed to ingest in the primary index.
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs.
|===
[cols="1,1,2,2"]
|===
| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints
| `.queue.max_events` | Integer (gauge) | The queue's maximum event count if it has one, otherwise zero.
| `.queue.max_bytes` | Integer (gauge) | The queue's maximum byte count if it has one, otherwise zero.
| `.queue.filled.events` | Integer (gauge) | Number of events currently stored by the queue. |
| `.queue.filled.bytes` | Integer (gauge) | Number of bytes currently stored by the queue. |
| `.queue.filled.pct` | Float (gauge) | How full the queue is relative to its maximum size, as a fraction from 0 to 1. | Low throughput while `queue.filled.pct` is low means congestion in the input. Low throughput while `queue.filled.pct` is high means congestion in the output.
| `.queue.added.events` | Integer | Number of events added to the queue by input workers. |
| `.queue.added.bytes` | Integer | Number of bytes added to the queue by input workers. |
| `.queue.consumed.events` | Integer | Number of events sent to output workers. |
| `.queue.consumed.bytes` | Integer | Number of bytes sent to output workers. |
| `.queue.removed.events` | Integer | Number of events removed from the queue after being processed by output workers. |
| `.queue.removed.bytes` | Integer | Number of bytes removed from the queue after being processed by output workers. |
|===
When using the memory queue, byte metrics are only set if the output supports them. Currently only the Elasticsearch output supports byte metrics.
ifeval::["{beatname_lc}"=="filebeat"]
[cols="1,1,2,2"]
|===
Expand Down
63 changes: 33 additions & 30 deletions libbeat/monitoring/report/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,36 +37,39 @@ import (
// TODO: Replace this with a proper solution that uses the metric type from
// where it is defined. See: https://github.com/elastic/beats/issues/5433
var gauges = map[string]bool{
"libbeat.output.events.active": true,
"libbeat.pipeline.events.active": true,
"libbeat.pipeline.clients": true,
"libbeat.pipeline.queue.max_events": true,
"libbeat.pipeline.queue.filled.pct.events": true,
"libbeat.config.module.running": true,
"registrar.states.current": true,
"filebeat.events.active": true,
"filebeat.harvester.running": true,
"filebeat.harvester.open_files": true,
"beat.memstats.memory_total": true,
"beat.memstats.memory_alloc": true,
"beat.memstats.rss": true,
"beat.memstats.gc_next": true,
"beat.info.uptime.ms": true,
"beat.cgroup.memory.mem.usage.bytes": true,
"beat.cpu.user.ticks": true,
"beat.cpu.system.ticks": true,
"beat.cpu.total.value": true,
"beat.cpu.total.ticks": true,
"beat.handles.open": true,
"beat.handles.limit.hard": true,
"beat.handles.limit.soft": true,
"beat.runtime.goroutines": true,
"system.load.1": true,
"system.load.5": true,
"system.load.15": true,
"system.load.norm.1": true,
"system.load.norm.5": true,
"system.load.norm.15": true,
"libbeat.output.events.active": true,
"libbeat.pipeline.events.active": true,
"libbeat.pipeline.clients": true,
"libbeat.pipeline.queue.max_events": true,
"libbeat.pipeline.queue.max_bytes": true,
"libbeat.pipeline.queue.filled.events": true,
"libbeat.pipeline.queue.filled.bytes": true,
"libbeat.pipeline.queue.filled.pct": true,
"libbeat.config.module.running": true,
"registrar.states.current": true,
"filebeat.events.active": true,
"filebeat.harvester.running": true,
"filebeat.harvester.open_files": true,
"beat.memstats.memory_total": true,
"beat.memstats.memory_alloc": true,
"beat.memstats.rss": true,
"beat.memstats.gc_next": true,
"beat.info.uptime.ms": true,
"beat.cgroup.memory.mem.usage.bytes": true,
"beat.cpu.user.ticks": true,
"beat.cpu.system.ticks": true,
"beat.cpu.total.value": true,
"beat.cpu.total.ticks": true,
"beat.handles.open": true,
"beat.handles.limit.hard": true,
"beat.handles.limit.soft": true,
"beat.runtime.goroutines": true,
"system.load.1": true,
"system.load.5": true,
"system.load.15": true,
"system.load.norm.1": true,
"system.load.norm.5": true,
"system.load.norm.15": true,
}

// isGauge returns true when the given metric key name represents a gauge value.
Expand Down
16 changes: 5 additions & 11 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ type client struct {
mutex sync.Mutex
waiter *clientCloseWaiter

eventFlags publisher.EventFlags
canDrop bool
eventWaitGroup *sync.WaitGroup
eventFlags publisher.EventFlags
canDrop bool

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
Expand Down Expand Up @@ -132,10 +131,8 @@ func (c *client) publish(e beat.Event) {
}

func (c *client) Close() error {
// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.closeOnce.Do(func() {
c.isOpen.Store(false)
if c.isOpen.Swap(false) {
// Only do shutdown handling the first time Close is called
c.onClosing()

c.logger.Debug("client: closing acker")
Expand All @@ -158,7 +155,7 @@ func (c *client) Close() error {
}
c.logger.Debug("client: done closing processors")
}
})
}
return nil
}

Expand All @@ -180,9 +177,6 @@ func (c *client) onNewEvent() {
}

func (c *client) onPublished() {
if c.eventWaitGroup != nil {
c.eventWaitGroup.Add(1)
}
c.observer.publishedEvent()
if c.clientListener != nil {
c.clientListener.Published()
Expand Down
75 changes: 21 additions & 54 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,59 +60,27 @@ func TestClient(t *testing.T) {
// Note: no asserts. If closing fails we have a deadlock, because Publish
// would block forever

cases := map[string]struct {
context bool
close func(client beat.Client, cancel func())
}{
"close unblocks client without context": {
context: false,
close: func(client beat.Client, _ func()) {
client.Close()
},
},
"close unblocks client with context": {
context: true,
close: func(client beat.Client, _ func()) {
client.Close()
},
},
"context cancel unblocks client": {
context: true,
close: func(client beat.Client, cancel func()) {
cancel()
},
},
}

logp.TestingSetup()
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

for name, test := range cases {
t.Run(name, func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)
pipeline := makePipeline(t, Settings{}, makeTestQueue())
defer pipeline.Close()

pipeline := makePipeline(t, Settings{}, makeTestQueue())
defer pipeline.Close()

client, err := pipeline.ConnectWith(beat.ClientConfig{})
if err != nil {
t.Fatal(err)
}
defer client.Close()
client, err := pipeline.ConnectWith(beat.ClientConfig{})
if err != nil {
t.Fatal(err)
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
client.Publish(beat.Event{})
}()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
client.Publish(beat.Event{})
}()

test.close(client, func() {
client.Close()
})
wg.Wait()
})
}
client.Close()
wg.Wait()
})

t.Run("no infinite loop when processing fails", func(t *testing.T) {
Expand Down Expand Up @@ -216,9 +184,6 @@ func TestClient(t *testing.T) {
}

func TestClientWaitClose(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
p, err := New(beat.Info{},
Monitors{},
Expand All @@ -241,6 +206,9 @@ func TestClientWaitClose(t *testing.T) {
defer pipeline.Close()

t.Run("WaitClose blocks", func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: 500 * time.Millisecond,
})
Expand Down Expand Up @@ -272,6 +240,8 @@ func TestClientWaitClose(t *testing.T) {
})

t.Run("ACKing events unblocks WaitClose", func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: time.Minute,
})
Expand Down Expand Up @@ -344,9 +314,6 @@ func TestMonitoring(t *testing.T) {
require.NoError(t, err)
defer pipeline.Close()

metricsSnapshot := monitoring.CollectFlatSnapshot(metrics, monitoring.Full, true)
assert.Equal(t, int64(maxEvents), metricsSnapshot.Ints["pipeline.queue.max_events"])

telemetrySnapshot := monitoring.CollectFlatSnapshot(telemetry, monitoring.Full, true)
assert.Equal(t, "output_name", telemetrySnapshot.Strings["output.name"])
assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"])
Expand Down
16 changes: 8 additions & 8 deletions libbeat/publisher/pipeline/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
type eventConsumer struct {
logger *logp.Logger

// eventConsumer calls the observer methods eventsRetry and eventsDropped.
observer outputObserver
// eventConsumer calls the retryObserver methods eventsRetry and eventsDropped.
retryObserver retryObserver

// When the output changes, the new target is sent to the worker routine
// on this channel. Clients should call eventConsumer.setTarget().
Expand Down Expand Up @@ -73,12 +73,12 @@ type retryRequest struct {

func newEventConsumer(
log *logp.Logger,
observer outputObserver,
observer retryObserver,
) *eventConsumer {
c := &eventConsumer{
logger: log,
observer: observer,
queueReader: makeQueueReader(),
logger: log,
retryObserver: observer,
queueReader: makeQueueReader(),

targetChan: make(chan consumerTarget),
retryChan: make(chan retryRequest),
Expand Down Expand Up @@ -163,7 +163,7 @@ outerLoop:
// Successfully sent a batch to the output workers
if len(retryBatches) > 0 {
// This was a retry, report it to the observer
c.observer.eventsRetry(len(active.Events()))
c.retryObserver.eventsRetry(len(active.Events()))
retryBatches = retryBatches[1:]
} else {
// This was directly from the queue, clear the value so we can
Expand All @@ -183,7 +183,7 @@ outerLoop:
alive := req.batch.reduceTTL()

countDropped := countFailed - len(req.batch.Events())
c.observer.eventsDropped(countDropped)
c.retryObserver.eventsDropped(countDropped)

if !alive {
log.Info("Drop batch")
Expand Down
Loading

0 comments on commit f8aedce

Please sign in to comment.