diff --git a/filebeat/docs/inputs/input-filestream.asciidoc b/filebeat/docs/inputs/input-filestream.asciidoc index 54283d6cce7..62d97cc1245 100644 --- a/filebeat/docs/inputs/input-filestream.asciidoc +++ b/filebeat/docs/inputs/input-filestream.asciidoc @@ -174,6 +174,7 @@ observe the activity of the input. | `files_closed_total` | Total number of files closed. | `files_active` | Number of files currently open (gauge). | `messages_read_total` | Total number of messages read. +| `messages_truncated_total` | Total number of messages truncated. | `bytes_processed_total` | Total number of bytes processed. | `events_processed_total` | Total number of events processed. | `processing_errors_total` | Total number of processing errors. diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index f9804bb16f3..c826f270b4c 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -345,7 +345,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expectedOffset int) { entry, err := e.getRegistryState(key) if err != nil { - e.t.Fatalf(err.Error()) + e.t.Fatal(err.Error()) } require.Equal(e.t, expectedOffset, entry.Cursor.Offset) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index c2efe2c50cd..3230c506163 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "os" + "slices" "time" "golang.org/x/text/transform" @@ -380,6 +381,15 @@ func (inp *filestream) readFromSource( s.Offset += int64(message.Bytes) + int64(message.Offset) + flags, err := message.Fields.GetValue("log.flags") + if err == nil { + if flags, ok := flags.([]string); ok { + if slices.Contains(flags, "truncated") { + metrics.MessagesTruncated.Add(1) + } + } + } + metrics.MessagesRead.Inc() if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) { continue diff --git a/filebeat/input/filestream/internal/input-logfile/metrics.go b/filebeat/input/filestream/internal/input-logfile/metrics.go index 194e72f614d..edc6e915934 100644 --- a/filebeat/input/filestream/internal/input-logfile/metrics.go +++ b/filebeat/input/filestream/internal/input-logfile/metrics.go @@ -29,14 +29,15 @@ import ( type Metrics struct { unregister func() - FilesOpened *monitoring.Uint // Number of files that have been opened. - FilesClosed *monitoring.Uint // Number of files closed. - FilesActive *monitoring.Uint // Number of files currently open (gauge). - MessagesRead *monitoring.Uint // Number of messages read. - BytesProcessed *monitoring.Uint // Number of bytes processed. - EventsProcessed *monitoring.Uint // Number of events processed. - ProcessingErrors *monitoring.Uint // Number of processing errors. - ProcessingTime metrics.Sample // Histogram of the elapsed time for processing an event. + FilesOpened *monitoring.Uint // Number of files that have been opened. + FilesClosed *monitoring.Uint // Number of files closed. + FilesActive *monitoring.Uint // Number of files currently open (gauge). + MessagesRead *monitoring.Uint // Number of messages read. + MessagesTruncated *monitoring.Uint // Number of messages truncated. + BytesProcessed *monitoring.Uint // Number of bytes processed. + EventsProcessed *monitoring.Uint // Number of events processed. + ProcessingErrors *monitoring.Uint // Number of processing errors. + ProcessingTime metrics.Sample // Histogram of the elapsed time for processing an event. // Those metrics use the same registry/keys as the log input uses HarvesterStarted *monitoring.Int @@ -65,15 +66,16 @@ func NewMetrics(id string) *Metrics { reg, unreg := inputmon.NewInputRegistry("filestream", id, nil) m := Metrics{ - unregister: unreg, - FilesOpened: monitoring.NewUint(reg, "files_opened_total"), - FilesClosed: monitoring.NewUint(reg, "files_closed_total"), - FilesActive: monitoring.NewUint(reg, "files_active"), - MessagesRead: monitoring.NewUint(reg, "messages_read_total"), - BytesProcessed: monitoring.NewUint(reg, "bytes_processed_total"), - EventsProcessed: monitoring.NewUint(reg, "events_processed_total"), - ProcessingErrors: monitoring.NewUint(reg, "processing_errors_total"), - ProcessingTime: metrics.NewUniformSample(1024), + unregister: unreg, + FilesOpened: monitoring.NewUint(reg, "files_opened_total"), + FilesClosed: monitoring.NewUint(reg, "files_closed_total"), + FilesActive: monitoring.NewUint(reg, "files_active"), + MessagesRead: monitoring.NewUint(reg, "messages_read_total"), + MessagesTruncated: monitoring.NewUint(reg, "messages_truncated_total"), + BytesProcessed: monitoring.NewUint(reg, "bytes_processed_total"), + EventsProcessed: monitoring.NewUint(reg, "events_processed_total"), + ProcessingErrors: monitoring.NewUint(reg, "processing_errors_total"), + ProcessingTime: metrics.NewUniformSample(1024), HarvesterStarted: monitoring.NewInt(harvesterMetrics, "started"), HarvesterClosed: monitoring.NewInt(harvesterMetrics, "closed"), diff --git a/filebeat/input/filestream/metrics_integration_test.go b/filebeat/input/filestream/metrics_integration_test.go index 3671f076d0e..66a878a0fee 100644 --- a/filebeat/input/filestream/metrics_integration_test.go +++ b/filebeat/input/filestream/metrics_integration_test.go @@ -38,9 +38,20 @@ func TestFilestreamMetrics(t *testing.T) { "prospector.scanner.check_interval": "24h", "close.on_state_change.check_interval": "100ms", "close.on_state_change.inactive": "2s", + "parsers": []map[string]interface{}{ + { + "multiline": map[string]interface{}{ + "type": "pattern", + "pattern": "^multiline", + "negate": true, + "match": "after", + "max_lines": 1, + }, + }, + }, }) - testlines := []byte("first line\nsecond line\nthird line\n") + testlines := []byte("first line\nsecond line\nthird line\nmultiline first line\nmultiline second line\n") env.mustWriteToFile(testlogName, testlines) ctx, cancelInput := context.WithCancel(context.Background()) @@ -51,13 +62,14 @@ func TestFilestreamMetrics(t *testing.T) { env.waitUntilHarvesterIsDone() checkMetrics(t, "fake-ID", expectedMetrics{ - FilesOpened: 1, - FilesClosed: 1, - FilesActive: 0, - MessagesRead: 3, - BytesProcessed: 34, - EventsProcessed: 3, - ProcessingErrors: 0, + FilesOpened: 1, + FilesClosed: 1, + FilesActive: 0, + MessagesRead: 3, + MessagesTruncated: 1, + BytesProcessed: 77, + EventsProcessed: 3, + ProcessingErrors: 0, }) cancelInput() @@ -65,13 +77,14 @@ func TestFilestreamMetrics(t *testing.T) { } type expectedMetrics struct { - FilesOpened uint64 - FilesClosed uint64 - FilesActive uint64 - MessagesRead uint64 - BytesProcessed uint64 - EventsProcessed uint64 - ProcessingErrors uint64 + FilesOpened uint64 + FilesClosed uint64 + FilesActive uint64 + MessagesRead uint64 + MessagesTruncated uint64 + BytesProcessed uint64 + EventsProcessed uint64 + ProcessingErrors uint64 } func checkMetrics(t *testing.T, id string, expected expectedMetrics) { @@ -82,6 +95,7 @@ func checkMetrics(t *testing.T, id string, expected expectedMetrics) { require.Equal(t, expected.FilesOpened, reg.Get("files_opened_total").(*monitoring.Uint).Get(), "files_opened_total") require.Equal(t, expected.FilesClosed, reg.Get("files_closed_total").(*monitoring.Uint).Get(), "files_closed_total") require.Equal(t, expected.MessagesRead, reg.Get("messages_read_total").(*monitoring.Uint).Get(), "messages_read_total") + require.Equal(t, expected.MessagesTruncated, reg.Get("messages_truncated_total").(*monitoring.Uint).Get(), "messages_truncated_total") require.Equal(t, expected.BytesProcessed, reg.Get("bytes_processed_total").(*monitoring.Uint).Get(), "bytes_processed_total") require.Equal(t, expected.EventsProcessed, reg.Get("events_processed_total").(*monitoring.Uint).Get(), "events_processed_total") require.Equal(t, expected.ProcessingErrors, reg.Get("processing_errors_total").(*monitoring.Uint).Get(), "processing_errors_total") diff --git a/libbeat/reader/multiline/message_buffer.go b/libbeat/reader/multiline/message_buffer.go index 7dd1f21e3c7..531ef7481e5 100644 --- a/libbeat/reader/multiline/message_buffer.go +++ b/libbeat/reader/multiline/message_buffer.go @@ -18,16 +18,10 @@ package multiline import ( - "golang.org/x/time/rate" - "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/elastic-agent-libs/logp" ) -// truncatedLogRate is a rate limiter for the log message that is -// printed when a multiline message is too large. -var truncatedLogRate = rate.Sometimes{First: 1, Every: 1000} - type messageBuffer struct { maxBytes int // bytes stored in content maxLines int @@ -129,9 +123,6 @@ func (b *messageBuffer) addLine(m reader.Message) { // finalize writes the existing content into the returned message and resets all reader variables. func (b *messageBuffer) finalize() reader.Message { if b.truncated > 0 { - truncatedLogRate.Do(func() { - b.logger.Warnf("The multiline message is too large and has been truncated to the limit of %d lines or %d bytes. This log is sampled, and the number of truncated messages is likely higher than the number of times this log message appears.", b.maxLines, b.maxBytes) - }) if err := b.message.AddFlagsWithKey("log.flags", "truncated"); err != nil { b.logger.Errorf("Failed to add truncated flag: %v", err) } diff --git a/metricbeat/module/mongodb/replstatus/replstatus_integration_test.go b/metricbeat/module/mongodb/replstatus/replstatus_integration_test.go index 018e62dfcda..c5b00c11ed0 100644 --- a/metricbeat/module/mongodb/replstatus/replstatus_integration_test.go +++ b/metricbeat/module/mongodb/replstatus/replstatus_integration_test.go @@ -133,11 +133,11 @@ func initiateReplicaSet(t *testing.T, host string) error { } type ReplicaConfig struct { - id string `bson:_id` - members []Host `bson:hosts` + id string `bson:"_id"` + members []Host `bson:"hosts"` } type Host struct { - id int `bson:_id` - host string `bson:host` + id int `bson:"_id"` + host string `bson:"host"` }