Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/awss3: fix document ID construction when using csv decoder #42019

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583]
- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019]

*Heartbeat*

Expand Down
5 changes: 4 additions & 1 deletion x-pack/filebeat/input/awss3/decoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ type decoder interface {
type valueDecoder interface {
decoder

decodeValue() (any, error)
// decodeValue returns the current value, and its offset
// in the stream. If the receiver is unable to provide
// a unique offset for the value, offset will be negative.
decodeValue() (offset int64, val any, _ error)
}

// newDecoder creates a new decoder based on the codec type.
Expand Down
12 changes: 7 additions & 5 deletions x-pack/filebeat/input/awss3/decoding_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
r *csv.Reader

header []string
offset int64
current []string

err error
Expand Down Expand Up @@ -51,14 +52,15 @@
if d.err != nil {
return false
}
d.offset = d.r.InputOffset()
d.current, d.err = d.r.Read()
return d.err == nil
}

// decode returns the JSON encoded value of the current CSV line. next must
// have been called before any calls to decode.
func (d *csvDecoder) decode() ([]byte, error) {
v, err := d.decodeValue()
_, v, err := d.decodeValue()
if err != nil {
return nil, err
}
Expand All @@ -68,12 +70,12 @@
// decodeValue returns the offset of the current CSV line and its value
// interpreted as an object with fields based on the header held by the
// receiver. next must have been called before any calls to decodeValue.
func (d *csvDecoder) decodeValue() (any, error) {
func (d *csvDecoder) decodeValue() (offset int64, val any, _ error) {
if d.err != nil {
return nil, d.err
return d.offset, nil, d.err
}
if len(d.current) == 0 {
return nil, fmt.Errorf("decode called before next")
return d.offset, nil, fmt.Errorf("decode called before next")
}
m := make(map[string]string, len(d.header))
// By the time we are here, current must be the same
Expand All @@ -83,12 +85,12 @@
for i, n := range d.header {
m[n] = d.current[i]
}
return m, nil
return d.offset, m, nil
}

// close closes the CSV decoder and releases the resources.
func (d *csvDecoder) close() error {
if d.err == io.EOF {

Check failure on line 93 in x-pack/filebeat/input/awss3/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
return d.err
Expand Down
12 changes: 9 additions & 3 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,12 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
if err != nil {
return err
}
var evtOffset int64
switch dec := dec.(type) {
case valueDecoder:
defer dec.close()

for dec.next() {
val, err := dec.decodeValue()
evtOffset, val, err := dec.decodeValue()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
Expand All @@ -183,6 +182,7 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
case decoder:
defer dec.close()

var evtOffset int64
for dec.next() {
data, err := dec.decode()
if err != nil {
Expand Down Expand Up @@ -420,6 +420,10 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error {
return nil
}

// createEvent constructs a beat.Event from message and offset. The value of
// message populates the event message field, and offset is used to set the
// log.offset field. If offset is non-negative, it is also combined with the
// object's ARN and key to set the @metadata._id field.
func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event {
event := beat.Event{
Timestamp: time.Now().UTC(),
Expand Down Expand Up @@ -448,7 +452,9 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event
},
},
}
event.SetID(objectID(p.s3ObjHash, offset))
if offset >= 0 {
andrewkroh marked this conversation as resolved.
Show resolved Hide resolved
event.SetID(objectID(p.s3ObjHash, offset))
}

if len(p.s3Metadata) > 0 {
_, _ = event.Fields.Put("aws.s3.metadata", p.s3Metadata)
Expand Down
Loading