diff --git a/libbeat/reader/filter/filter.go b/libbeat/reader/filter/filter.go index 61894981be9..0cc6fdf09e3 100644 --- a/libbeat/reader/filter/filter.go +++ b/libbeat/reader/filter/filter.go @@ -55,15 +55,22 @@ func NewParser(r reader.Reader, c *Config) *FilterParser { } } -func (p *FilterParser) Next() (reader.Message, error) { +func (p *FilterParser) Next() (message reader.Message, err error) { + // discardedOffset accounts for the bytes of discarded messages + var discardedOffset int + defer func() { + message.Bytes += discardedOffset + }() + for p.ctx.Err() == nil { - message, err := p.r.Next() + message, err = p.r.Next() if err != nil { return message, err } if p.matchAny(string(message.Content)) { return message, err } + discardedOffset += message.Bytes p.logger.Debug("dropping message because it does not match any of the provided patterns [%v]: %s", p.matchers, string(message.Content)) } return reader.Message{}, io.EOF