[filebeat][streaming] - Fixed a scenario where CEL would process invalid/empty messages in case of a websocket error #42036

Open: wants to merge 2 commits into base: main

ShourieG (Contributor)

Type of change

  • Bug

Proposed commit message

Fixed a scenario where CEL would process invalid/empty messages in case of a websocket connection/read error. This is especially problematic after the addition of retry logic, where these errors could cause false positives downstream even though the connection is re-established successfully.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Disruptive User Impact

None


ShourieG requested a review from a team as a code owner December 13, 2024
botelastic bot added the needs_team label Dec 13, 2024
mergify bot commented Dec 13, 2024

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @ShourieG? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit

mergify bot commented Dec 13, 2024

backport-8.x has been added to help with the transition to the new branch 8.x.
If you don't need it please use backport-skip label and remove the backport-8.x label.

mergify bot added the backport-8.x label Dec 13, 2024
ShourieG added the Team:Security-Service Integrations label Dec 13, 2024
botelastic bot removed the needs_team label Dec 13, 2024
elasticmachine (Collaborator)

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

ShourieG added the needs_team and bugfix labels Dec 13, 2024
botelastic bot removed the needs_team label Dec 13, 2024
ShourieG added the needs_team and input:streaming labels Dec 13, 2024
botelastic bot removed the needs_team label Dec 13, 2024
ShourieG added the needs_team, backport-8.16 and backport-8.17 labels Dec 13, 2024
botelastic bot removed the needs_team label Dec 13, 2024
botelastic bot commented Dec 13, 2024

This pull request doesn't have a Team:<team> label.

efd6 (Contributor) left a comment

I think the proposed commit message needs a clearer explanation of what is being fixed; it's not the situation that's being fixed, it's the response to the situation, so describe that and say what the behaviour is and then how that is rectified.

@@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583]
- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Fixed a scenario in the streaming input where CEL would process invalid/empty messages in case of a websocket error. {pull}42036[42036]

Suggested change
- Fixed a scenario in the streaming input where CEL would process invalid/empty messages in case of a websocket error. {pull}42036[42036]
- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]

@@ -116,6 +116,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
return ctx.Err()
default:
_, message, err := c.ReadMessage()
s.metrics.receivedBytesTotal.Add(uint64(len(message)))

Why move this up? Is a message valid when err is non-nil?

ShourieG (Contributor Author) Dec 14, 2024

Shouldn't we still count the received bytes regardless of whether the message is valid, since this is a metric?

Comment on lines +141 to +149
	} else {
		state["response"] = message
		s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
		err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
		if err != nil {
			s.metrics.errorsTotal.Inc()
			s.log.Errorw("failed to process and publish data", "error", err)
			return err
		}

Suggested change
-	} else {
-		state["response"] = message
-		s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
-		err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
-		if err != nil {
-			s.metrics.errorsTotal.Inc()
-			s.log.Errorw("failed to process and publish data", "error", err)
-			return err
-		}
+		continue
+	}
+	state["response"] = message
+	s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
+	err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
+	if err != nil {
+		s.metrics.errorsTotal.Inc()
+		s.log.Errorw("failed to process and publish data", "error", err)
+		return err
+	}

but why does the error case not return if !isRetryableError(err)?

ShourieG (Contributor Author)

@efd6, it does return; the issue is when the error is retryable. In that scenario the message is still passed to CEL during the retry attempts. So if the third attempt succeeds, the first two invalid messages are still processed by CEL, leading to downstream errors in integration pipelines because the event itself might be malformed or absent. It is also unnecessary processing that can be avoided.

ShourieG (Contributor Author) Dec 14, 2024

Also, I felt that an else block was more readable in this case, since we already have a bunch of returns within the "if".

Labels: backport-8.x, backport-8.16, backport-8.17, bugfix, input:streaming, Team:Security-Service Integrations
3 participants