Skip to content

Commit

Permalink
Fix vet issues, add metric
Browse files Browse the repository at this point in the history
  • Loading branch information
mauri870 committed Dec 13, 2024
1 parent 4b3da21 commit 4c7fb15
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 46 deletions.
1 change: 1 addition & 0 deletions filebeat/docs/inputs/input-filestream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ observe the activity of the input.
| `files_closed_total` | Total number of files closed.
| `files_active` | Number of files currently open (gauge).
| `messages_read_total` | Total number of messages read.
| `messages_truncated_total` | Total number of messages truncated.
| `bytes_processed_total` | Total number of bytes processed.
| `events_processed_total` | Total number of events processed.
| `processing_errors_total` | Total number of processing errors.
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expectedOffset int) {
entry, err := e.getRegistryState(key)
if err != nil {
e.t.Fatalf(err.Error())
e.t.Fatal(err.Error())
}

require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
Expand Down
10 changes: 10 additions & 0 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"os"
"slices"
"time"

"golang.org/x/text/transform"
Expand Down Expand Up @@ -380,6 +381,15 @@ func (inp *filestream) readFromSource(

s.Offset += int64(message.Bytes) + int64(message.Offset)

flags, err := message.Fields.GetValue("log.flags")
if err == nil {
if flags, ok := flags.([]string); ok {
if slices.Contains(flags, "truncated") {

Check failure on line 387 in filebeat/input/filestream/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot infer S (/opt/hostedtoolcache/go/1.22.9/x64/src/slices/slices.go:115:15) (typecheck)
metrics.MessagesTruncated.Add(1)
}
}
}

metrics.MessagesRead.Inc()
if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) {
continue
Expand Down
36 changes: 19 additions & 17 deletions filebeat/input/filestream/internal/input-logfile/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ import (
type Metrics struct {
unregister func()

FilesOpened *monitoring.Uint // Number of files that have been opened.
FilesClosed *monitoring.Uint // Number of files closed.
FilesActive *monitoring.Uint // Number of files currently open (gauge).
MessagesRead *monitoring.Uint // Number of messages read.
BytesProcessed *monitoring.Uint // Number of bytes processed.
EventsProcessed *monitoring.Uint // Number of events processed.
ProcessingErrors *monitoring.Uint // Number of processing errors.
ProcessingTime metrics.Sample // Histogram of the elapsed time for processing an event.
FilesOpened *monitoring.Uint // Number of files that have been opened.
FilesClosed *monitoring.Uint // Number of files closed.
FilesActive *monitoring.Uint // Number of files currently open (gauge).
MessagesRead *monitoring.Uint // Number of messages read.
MessagesTruncated *monitoring.Uint // Number of messages truncated.
BytesProcessed *monitoring.Uint // Number of bytes processed.
EventsProcessed *monitoring.Uint // Number of events processed.
ProcessingErrors *monitoring.Uint // Number of processing errors.
ProcessingTime metrics.Sample // Histogram of the elapsed time for processing an event.

// Those metrics use the same registry/keys as the log input uses
HarvesterStarted *monitoring.Int
Expand Down Expand Up @@ -65,15 +66,16 @@ func NewMetrics(id string) *Metrics {

reg, unreg := inputmon.NewInputRegistry("filestream", id, nil)
m := Metrics{
unregister: unreg,
FilesOpened: monitoring.NewUint(reg, "files_opened_total"),
FilesClosed: monitoring.NewUint(reg, "files_closed_total"),
FilesActive: monitoring.NewUint(reg, "files_active"),
MessagesRead: monitoring.NewUint(reg, "messages_read_total"),
BytesProcessed: monitoring.NewUint(reg, "bytes_processed_total"),
EventsProcessed: monitoring.NewUint(reg, "events_processed_total"),
ProcessingErrors: monitoring.NewUint(reg, "processing_errors_total"),
ProcessingTime: metrics.NewUniformSample(1024),
unregister: unreg,
FilesOpened: monitoring.NewUint(reg, "files_opened_total"),
FilesClosed: monitoring.NewUint(reg, "files_closed_total"),
FilesActive: monitoring.NewUint(reg, "files_active"),
MessagesRead: monitoring.NewUint(reg, "messages_read_total"),
MessagesTruncated: monitoring.NewUint(reg, "messages_truncated_total"),
BytesProcessed: monitoring.NewUint(reg, "bytes_processed_total"),
EventsProcessed: monitoring.NewUint(reg, "events_processed_total"),
ProcessingErrors: monitoring.NewUint(reg, "processing_errors_total"),
ProcessingTime: metrics.NewUniformSample(1024),

HarvesterStarted: monitoring.NewInt(harvesterMetrics, "started"),
HarvesterClosed: monitoring.NewInt(harvesterMetrics, "closed"),
Expand Down
44 changes: 29 additions & 15 deletions filebeat/input/filestream/metrics_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,20 @@ func TestFilestreamMetrics(t *testing.T) {
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.on_state_change.inactive": "2s",
"parsers": []map[string]interface{}{
{
"multiline": map[string]interface{}{
"type": "pattern",
"pattern": "^multiline",
"negate": true,
"match": "after",
"max_lines": 1,
},
},
},
})

testlines := []byte("first line\nsecond line\nthird line\n")
testlines := []byte("first line\nsecond line\nthird line\nmultiline first line\nmultiline second line\n")
env.mustWriteToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
Expand All @@ -51,27 +62,29 @@ func TestFilestreamMetrics(t *testing.T) {
env.waitUntilHarvesterIsDone()

checkMetrics(t, "fake-ID", expectedMetrics{
FilesOpened: 1,
FilesClosed: 1,
FilesActive: 0,
MessagesRead: 3,
BytesProcessed: 34,
EventsProcessed: 3,
ProcessingErrors: 0,
FilesOpened: 1,
FilesClosed: 1,
FilesActive: 0,
MessagesRead: 3,
MessagesTruncated: 1,
BytesProcessed: 77,
EventsProcessed: 3,
ProcessingErrors: 0,
})

cancelInput()
env.waitUntilInputStops()
}

type expectedMetrics struct {
FilesOpened uint64
FilesClosed uint64
FilesActive uint64
MessagesRead uint64
BytesProcessed uint64
EventsProcessed uint64
ProcessingErrors uint64
FilesOpened uint64
FilesClosed uint64
FilesActive uint64
MessagesRead uint64
MessagesTruncated uint64
BytesProcessed uint64
EventsProcessed uint64
ProcessingErrors uint64
}

func checkMetrics(t *testing.T, id string, expected expectedMetrics) {
Expand All @@ -82,6 +95,7 @@ func checkMetrics(t *testing.T, id string, expected expectedMetrics) {
require.Equal(t, expected.FilesOpened, reg.Get("files_opened_total").(*monitoring.Uint).Get(), "files_opened_total")
require.Equal(t, expected.FilesClosed, reg.Get("files_closed_total").(*monitoring.Uint).Get(), "files_closed_total")
require.Equal(t, expected.MessagesRead, reg.Get("messages_read_total").(*monitoring.Uint).Get(), "messages_read_total")
require.Equal(t, expected.MessagesTruncated, reg.Get("messages_truncated_total").(*monitoring.Uint).Get(), "messages_truncated_total")
require.Equal(t, expected.BytesProcessed, reg.Get("bytes_processed_total").(*monitoring.Uint).Get(), "bytes_processed_total")
require.Equal(t, expected.EventsProcessed, reg.Get("events_processed_total").(*monitoring.Uint).Get(), "events_processed_total")
require.Equal(t, expected.ProcessingErrors, reg.Get("processing_errors_total").(*monitoring.Uint).Get(), "processing_errors_total")
Expand Down
9 changes: 0 additions & 9 deletions libbeat/reader/multiline/message_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,10 @@
package multiline

import (
"golang.org/x/time/rate"

"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/elastic-agent-libs/logp"
)

// truncatedLogRate is a rate limiter for the log message that is
// printed when a multiline message is too large.
var truncatedLogRate = rate.Sometimes{First: 1, Every: 1000}

type messageBuffer struct {
maxBytes int // bytes stored in content
maxLines int
Expand Down Expand Up @@ -129,9 +123,6 @@ func (b *messageBuffer) addLine(m reader.Message) {
// finalize writes the existing content into the returned message and resets all reader variables.
func (b *messageBuffer) finalize() reader.Message {
if b.truncated > 0 {
truncatedLogRate.Do(func() {
b.logger.Warnf("The multiline message is too large and has been truncated to the limit of %d lines or %d bytes. This log is sampled, and the number of truncated messages is likely higher than the number of times this log message appears.", b.maxLines, b.maxBytes)
})
if err := b.message.AddFlagsWithKey("log.flags", "truncated"); err != nil {
b.logger.Errorf("Failed to add truncated flag: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ func initiateReplicaSet(t *testing.T, host string) error {
}

type ReplicaConfig struct {
id string `bson:_id`
members []Host `bson:hosts`
id string `bson:"_id"`
members []Host `bson:"hosts"`
}

type Host struct {
id int `bson:_id`
host string `bson:host`
id int `bson:"_id"`
host string `bson:"host"`
}

0 comments on commit 4c7fb15

Please sign in to comment.