Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][streaming] - Fixed a scenario where CEL would process invalid/empty messages in case of a websocket error #42036

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583]
- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Fixed a scenario in the streaming input where CEL would process invalid/empty messages in case of a websocket error. {pull}42036[42036]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Fixed a scenario in the streaming input where CEL would process invalid/empty messages in case of a websocket error. {pull}42036[42036]
- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]


*Heartbeat*

Expand Down
19 changes: 10 additions & 9 deletions x-pack/filebeat/input/streaming/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
return ctx.Err()
default:
_, message, err := c.ReadMessage()
s.metrics.receivedBytesTotal.Add(uint64(len(message)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move this up? Is a message valid when err is non-nil?

Copy link
Contributor Author

@ShourieG ShourieG Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we still count the received bytes irrespective of whether it's a valid message or not, since this is a metric ?

if err != nil {
s.metrics.errorsTotal.Inc()
if isRetryableError(err) {
Expand All @@ -137,15 +138,15 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
s.log.Errorw("failed to read websocket data", "error", err)
return err
}
}
s.metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to process and publish data", "error", err)
return err
} else {
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to process and publish data", "error", err)
return err
}
Comment on lines +141 to +149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else {
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to process and publish data", "error", err)
return err
}
continue
}
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to process and publish data", "error", err)
return err
}

but why does the error case not return if !isRetryableError(err)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@efd6, It does return, the issue is when it is a RetryableError. In this scenario the message is still passed to CEL during the retry attempts. So if the 3rd attempt is successful the 1st 2 invalid messages are still processed by CEL leading to downstream errors in integration pipelines because the event itself might be malformed or not present. Also this is just unnecessary processing that can be avoided.

Copy link
Contributor Author

@ShourieG ShourieG Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I felt that an else block had better readability in this case since we already have a bunch of returns within the "if".

}
}
}
Expand Down
Loading