From d7f3fff3317a997f6754770ec5f2689a2c4f9b9f Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 13 Dec 2024 22:46:50 +0530 Subject: [PATCH 1/2] fixed a scenario in which CEL would process empty/invalid messages in case of a websocket error --- x-pack/filebeat/input/streaming/websocket.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 8a8cf76fe36..d4523333998 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -116,6 +116,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { return ctx.Err() default: _, message, err := c.ReadMessage() + s.metrics.receivedBytesTotal.Add(uint64(len(message))) if err != nil { s.metrics.errorsTotal.Inc() if isRetryableError(err) { @@ -137,15 +138,15 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { s.log.Errorw("failed to read websocket data", "error", err) return err } - } - s.metrics.receivedBytesTotal.Add(uint64(len(message))) - state["response"] = message - s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message)) - err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) - if err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("failed to process and publish data", "error", err) - return err + } else { + state["response"] = message + s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message)) + err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to process and publish data", "error", err) + return err + } } } } From ba13e77d6d4fafd7a13855783e5f9c93c84d05b3 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 13 Dec 2024 22:56:06 +0530 Subject: [PATCH 2/2] updated changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f0964ce1545..a866cd5fed4 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583] - Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920] - Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] +- Fixed a scenario in the streaming input where CEL would process invalid/empty messages in case of a websocket error. {pull}42036[42036] *Heartbeat*