From 47f15027a1e9e0d9b3d32d9bd16675cb99703922 Mon Sep 17 00:00:00 2001
From: Yehor Shvedov <146825775+ev1yehor@users.noreply.github.com>
Date: Thu, 28 Nov 2024 23:58:58 +0200
Subject: [PATCH 1/2] Update catalog-info file with correct system property (#41618)

---
 catalog-info.yaml | 54 +++++++++++++++++++++++------------------------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/catalog-info.yaml b/catalog-info.yaml
index a0eca0c2c9f6..16d4bd7e0d0b 100644
--- a/catalog-info.yaml
+++ b/catalog-info.yaml
@@ -31,7 +31,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -81,7 +81,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -128,7 +128,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -175,7 +175,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -222,7 +222,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -269,7 +269,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -316,7 +316,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -363,7 +363,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -410,7 +410,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -456,7 +456,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -503,7 +503,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -550,7 +550,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -597,7 +597,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -644,7 +644,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -690,7 +690,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -725,7 +725,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -761,7 +761,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -808,7 +808,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -855,7 +855,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -902,7 +902,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -949,7 +949,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -996,7 +996,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -1043,7 +1043,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -1092,7 +1092,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -1128,7 +1128,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -1162,7 +1162,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline
@@ -1205,7 +1205,7 @@ metadata:
 spec:
   type: buildkite-pipeline
   owner: group:ingest-fp
-  system: buildkite
+  system: platform-ingest
   implementation:
     apiVersion: buildkite.elastic.dev/v1
     kind: Pipeline

From e42589d7cc2f04d015fbf6e2a48a99b4fcd31ab4 Mon Sep 17 00:00:00 2001
From: Craig MacKenzie
Date: Thu, 28 Nov 2024 18:31:52 -0500
Subject: [PATCH 2/2] Fix Kafka output panic at startup (#41824)

* Make Kafka output satisfy NetworkClient interface.

* Make Redis output satisfy network client.

* Add initial regression integration test.

* Add an integration test to ensure connectivity.

* Fix build error in old integration test.

* Fix redis lint error.

* Fix typo in comment.

* Fix another typo.
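The bullet points above describe aligning the Kafka and Redis outputs with the publisher pipeline's connection contract: Connect now takes a context.Context, and an output whose Connect signature does not match is never treated as a network client, so it is never connected before the first Publish. A minimal sketch of that contract is shown below; the interface name Connectable and the package layout are assumptions based on libbeat/outputs (NetworkClient is the name used in the commit message), not a verbatim copy of the library code.

// Illustrative sketch only; the real definitions live in libbeat/outputs.
package outputs

import "context"

// Connectable is the context-aware connection hook the pipeline looks for.
// Outputs that implement it alongside the regular Client interface (together
// forming NetworkClient) have Connect called before the first Publish.
type Connectable interface {
	// Connect establishes the network connection; the context lets the
	// pipeline cancel a connection attempt, for example during shutdown.
	Connect(context.Context) error
}
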
---
 libbeat/outputs/kafka/client.go               |  2 +-
 .../outputs/kafka/kafka_integration_test.go   |  2 +-
 libbeat/outputs/redis/backoff.go              |  6 +-
 libbeat/outputs/redis/client.go               |  2 +-
 libbeat/tests/integration/kafka_test.go       | 89 +++++++++++++++++++
 5 files changed, 96 insertions(+), 5 deletions(-)
 create mode 100644 libbeat/tests/integration/kafka_test.go

diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go
index 1780f1392b31..1e110f8ca3e9 100644
--- a/libbeat/outputs/kafka/client.go
+++ b/libbeat/outputs/kafka/client.go
@@ -113,7 +113,7 @@ func newKafkaClient(
     return c, nil
 }
 
-func (c *client) Connect() error {
+func (c *client) Connect(_ context.Context) error {
     c.mux.Lock()
     defer c.mux.Unlock()
 
diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go
index e9abc559774d..b187b5d57ffa 100644
--- a/libbeat/outputs/kafka/kafka_integration_test.go
+++ b/libbeat/outputs/kafka/kafka_integration_test.go
@@ -280,7 +280,7 @@ func TestKafkaPublish(t *testing.T) {
             output, ok := grp.Clients[0].(*client)
             assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client")
 
-            if err := output.Connect(); err != nil {
+            if err := output.Connect(context.Background()); err != nil {
                 t.Fatal(err)
             }
             assert.Equal(t, output.index, "testbeat")
diff --git a/libbeat/outputs/redis/backoff.go b/libbeat/outputs/redis/backoff.go
index 2abc1f846f0a..42f9db1c2854 100644
--- a/libbeat/outputs/redis/backoff.go
+++ b/libbeat/outputs/redis/backoff.go
@@ -19,6 +19,7 @@ package redis
 
 import (
     "context"
+    "errors"
     "time"
 
     "github.com/gomodule/redigo/redis"
@@ -61,7 +62,7 @@ func newBackoffClient(client *client, init, max time.Duration) *backoffClient {
 }
 
 func (b *backoffClient) Connect(ctx context.Context) error {
-    err := b.client.Connect()
+    err := b.client.Connect(ctx)
     if err != nil {
         // give the client a chance to promote an internal error to a network error.
         b.updateFailReason(err)
@@ -102,7 +103,8 @@ func (b *backoffClient) updateFailReason(err error) {
         return
     }
 
-    if _, ok := err.(redis.Error); ok {
+    var redisErr *redis.Error
+    if errors.As(err, &redisErr) {
         b.reason = failRedis
     } else {
         b.reason = failOther
diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go
index 9f5c9812dd10..db3ec5a3b433 100644
--- a/libbeat/outputs/redis/client.go
+++ b/libbeat/outputs/redis/client.go
@@ -90,7 +90,7 @@ func newClient(
     }
 }
 
-func (c *client) Connect() error {
+func (c *client) Connect(_ context.Context) error {
     c.log.Debug("connect")
     err := c.Client.Connect()
     if err != nil {
diff --git a/libbeat/tests/integration/kafka_test.go b/libbeat/tests/integration/kafka_test.go
new file mode 100644
index 000000000000..72e5b37e49d6
--- /dev/null
+++ b/libbeat/tests/integration/kafka_test.go
@@ -0,0 +1,89 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build integration
+
+package integration
+
+import (
+    "fmt"
+    "testing"
+    "time"
+
+    "github.com/Shopify/sarama"
+)
+
+var (
+    // https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/config_test.go#L14-L17
+    // The version of MockBroker used when this test was written only supports the lowest protocol version by default.
+    // Version incompatibilities will result in message decoding errors between the mock and the beat.
+    kafkaVersion = sarama.MinVersion
+    kafkaTopic   = "test_topic"
+    kafkaCfg     = `
+mockbeat:
+logging:
+  level: debug
+  selectors:
+    - publisher_pipeline_output
+    - kafka
+queue.mem:
+  events: 4096
+  flush.timeout: 0s
+output.kafka:
+  topic: %s
+  version: %s
+  hosts:
+    - %s
+  backoff:
+    init: 0.1s
+    max: 0.2s
+`
+)
+
+// TestKafkaOutputCanConnectAndPublish ensures the Beat's Kafka output can successfully produce messages to Kafka.
+// Regression test for https://github.com/elastic/beats/issues/41823 where the Kafka output would
+// panic on the first Publish because its Connect method was no longer called.
+func TestKafkaOutputCanConnectAndPublish(t *testing.T) {
+    // Create a mock Kafka broker that will listen on localhost on a random unallocated port.
+    // The reference configuration was taken from https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/async_producer_test.go#L141.
+    leader := sarama.NewMockBroker(t, 1)
+    defer leader.Close()
+
+    // The mock broker must respond to a single metadata request.
+    metadataResponse := new(sarama.MetadataResponse)
+    metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+    metadataResponse.AddTopicPartition(kafkaTopic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
+    leader.Returns(metadataResponse)
+
+    // The mock broker must return a single produce response. If no produce request is received, the test will fail.
+    // This guarantees that mockbeat successfully produced a message to Kafka and that connectivity is established.
+    prodSuccess := new(sarama.ProduceResponse)
+    prodSuccess.AddTopicPartition(kafkaTopic, 0, sarama.ErrNoError)
+    leader.Returns(prodSuccess)
+
+    // Start mockbeat with the appropriate configuration.
+    mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
+    mockbeat.WriteConfigFile(fmt.Sprintf(kafkaCfg, kafkaTopic, kafkaVersion, leader.Addr()))
+    mockbeat.Start()
+
+    // Wait for mockbeat to log that it successfully published a batch to Kafka.
+    // This ensures that mockbeat received the expected produce response configured above.
+    mockbeat.WaitForLogs(
+        `finished kafka batch`,
+        10*time.Second,
+        "did not find finished batch log")
+}
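
A regression of this kind can also be caught at build time rather than at the first Publish. As a hypothetical follow-up (not part of this patch), a compile-time assertion placed next to the Kafka client would fail the build whenever *client stops satisfying outputs.NetworkClient, for instance if the Connect signature drifts again. The sketch below assumes the unexported client type from libbeat/outputs/kafka, the outputs.NetworkClient interface named in the commit message, and the file name is invented for illustration.

// Hypothetical file libbeat/outputs/kafka/compile_check.go; illustrative only.
package kafka

import "github.com/elastic/beats/v7/libbeat/outputs"

// Compile-time guard: the build breaks if *client no longer implements
// outputs.NetworkClient (for example, if Connect's signature changes again).
var _ outputs.NetworkClient = (*client)(nil)

Because the new test carries the //go:build integration constraint, it only compiles and runs when that build tag is set, for example via go test -tags integration from libbeat/tests/integration.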