Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/awss3: fix document ID construction when using csv decoder #42019

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]
- Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019]

*Heartbeat*

Expand Down
5 changes: 4 additions & 1 deletion x-pack/filebeat/input/awss3/decoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ type decoder interface {
type valueDecoder interface {
decoder

decodeValue() (any, error)
// decodeValue returns the current value, and its offset
// in the stream. If the receiver is unable to provide
// a unique offset for the value, offset will be negative.
decodeValue() (offset int64, val any, _ error)
}

// newDecoder creates a new decoder based on the codec type.
Expand Down
12 changes: 7 additions & 5 deletions x-pack/filebeat/input/awss3/decoding_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
r *csv.Reader

header []string
offset int64
current []string

err error
Expand Down Expand Up @@ -51,14 +52,15 @@
if d.err != nil {
return false
}
d.offset = d.r.InputOffset()
d.current, d.err = d.r.Read()
return d.err == nil
}

// decode returns the JSON encoded value of the current CSV line. next must
// have been called before any calls to decode.
func (d *csvDecoder) decode() ([]byte, error) {
v, err := d.decodeValue()
_, v, err := d.decodeValue()
if err != nil {
return nil, err
}
Expand All @@ -68,12 +70,12 @@
// decodeValue returns the value of the current CSV line interpreted as
// an object with fields based on the header held by the receiver. next must
// have been called before any calls to decode.
func (d *csvDecoder) decodeValue() (any, error) {
func (d *csvDecoder) decodeValue() (offset int64, val any, _ error) {
if d.err != nil {
return nil, d.err
return d.offset, nil, d.err
}
if len(d.current) == 0 {
return nil, fmt.Errorf("decode called before next")
return d.offset, nil, fmt.Errorf("decode called before next")
}
m := make(map[string]string, len(d.header))
// By the time we are here, current must be the same
Expand All @@ -83,12 +85,12 @@
for i, n := range d.header {
m[n] = d.current[i]
}
return m, nil
return d.offset, m, nil
}

// close closes the parquet decoder and releases the resources.
func (d *csvDecoder) close() error {
if d.err == io.EOF {

Check failure on line 93 in x-pack/filebeat/input/awss3/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)

Check failure on line 93 in x-pack/filebeat/input/awss3/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
return d.err
Expand Down
15 changes: 11 additions & 4 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,12 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
if err != nil {
return err
}
var evtOffset int64
switch dec := dec.(type) {
case valueDecoder:
defer dec.close()

for dec.next() {
val, err := dec.decodeValue()
evtOffset, val, err := dec.decodeValue()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
Expand All @@ -183,6 +182,7 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
case decoder:
defer dec.close()

var evtOffset int64
for dec.next() {
data, err := dec.decode()
if err != nil {
Expand Down Expand Up @@ -420,13 +420,17 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error {
return nil
}

// createEvent constructs a beat.Event from message and offset. The value of
// message populates the event message field, and offset is used to set the
// log.offset field and, with the object's ARN and key, the @metadata._id field.
// If offset is negative, it is ignored. No @metadata._id field is added to
// the event and the log.offset field is not set.
func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event {
event := beat.Event{
Timestamp: time.Now().UTC(),
Fields: mapstr.M{
"message": message,
"log": mapstr.M{
"offset": offset,
"file": mapstr.M{
"path": p.s3RequestURL,
},
Expand All @@ -448,7 +452,10 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event
},
},
}
event.SetID(objectID(p.s3ObjHash, offset))
if offset >= 0 {
event.Fields.Put("log.offset", offset)
event.SetID(objectID(p.s3ObjHash, offset))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we also skip setting the event ID? Is there something that uses this for idempotency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was the stimulus for the change; in the case that there is no way to know the offset, we would end up making an @metadata._id for the document that is shared by all documents from the same object, which would result in invalid document duplicates being handled by the index, and loss of data. By allowing a way to signal to the input that there is no way to differentiate between documents from the data source, we allow downstream to know that it needs to fill the gap.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification. In my first pass I missed part of the context.

}

if len(p.s3Metadata) > 0 {
_, _ = event.Fields.Put("aws.s3.metadata", p.s3Metadata)
Expand Down
Loading