Skip to content

Commit

Permalink
Merge branch 'main' into servicelabel_beats
Browse files Browse the repository at this point in the history
  • Loading branch information
ishleenk17 authored Nov 29, 2024
2 parents e3a55d3 + e42589d commit bdda7ad
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 32 deletions.
54 changes: 27 additions & 27 deletions catalog-info.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -81,7 +81,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -128,7 +128,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -175,7 +175,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -222,7 +222,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -269,7 +269,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -316,7 +316,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -363,7 +363,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -410,7 +410,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -456,7 +456,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -503,7 +503,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -550,7 +550,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -597,7 +597,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -644,7 +644,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -690,7 +690,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -725,7 +725,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -761,7 +761,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -808,7 +808,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -855,7 +855,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -902,7 +902,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -949,7 +949,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -996,7 +996,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -1043,7 +1043,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -1092,7 +1092,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -1128,7 +1128,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -1162,7 +1162,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down Expand Up @@ -1205,7 +1205,7 @@ metadata:
spec:
type: buildkite-pipeline
owner: group:ingest-fp
system: buildkite
system: platform-ingest
implementation:
apiVersion: buildkite.elastic.dev/v1
kind: Pipeline
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func newKafkaClient(
return c, nil
}

func (c *client) Connect() error {
func (c *client) Connect(_ context.Context) error {
c.mux.Lock()
defer c.mux.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func TestKafkaPublish(t *testing.T) {

output, ok := grp.Clients[0].(*client)
assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client")
if err := output.Connect(); err != nil {
if err := output.Connect(context.Background()); err != nil {
t.Fatal(err)
}
assert.Equal(t, output.index, "testbeat")
Expand Down
6 changes: 4 additions & 2 deletions libbeat/outputs/redis/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package redis

import (
"context"
"errors"
"time"

"github.com/gomodule/redigo/redis"
Expand Down Expand Up @@ -61,7 +62,7 @@ func newBackoffClient(client *client, init, max time.Duration) *backoffClient {
}

func (b *backoffClient) Connect(ctx context.Context) error {
err := b.client.Connect()
err := b.client.Connect(ctx)
if err != nil {
// give the client a chance to promote an internal error to a network error.
b.updateFailReason(err)
Expand Down Expand Up @@ -102,7 +103,8 @@ func (b *backoffClient) updateFailReason(err error) {
return
}

if _, ok := err.(redis.Error); ok {
var redisErr *redis.Error
if errors.As(err, &redisErr) {
b.reason = failRedis
} else {
b.reason = failOther
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func newClient(
}
}

func (c *client) Connect() error {
func (c *client) Connect(_ context.Context) error {
c.log.Debug("connect")
err := c.Client.Connect()
if err != nil {
Expand Down
89 changes: 89 additions & 0 deletions libbeat/tests/integration/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build integration

package integration

import (
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
)

var (
// https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/config_test.go#L14-L17
// The version of MockBroker used when this test was written only supports the lowest protocol version by default.
// Version incompatibilities will result in message decoding errors between the mock and the beat.
kafkaVersion = sarama.MinVersion
kafkaTopic = "test_topic"
kafkaCfg = `
mockbeat:
logging:
level: debug
selectors:
- publisher_pipeline_output
- kafka
queue.mem:
events: 4096
flush.timeout: 0s
output.kafka:
topic: %s
version: %s
hosts:
- %s
backoff:
init: 0.1s
max: 0.2s
`
)

// TestKafkaOutputCanConnectAndPublish ensures the beat Kafka output can successfully produce messages to Kafka.
// Regression test for https://github.com/elastic/beats/issues/41823 where the Kafka output would
// panic on the first Publish because it's Connect method was no longer called.
func TestKafkaOutputCanConnectAndPublish(t *testing.T) {
// Create a Mock Kafka broker that will listen on localhost on a random unallocated port.
// The reference configuration was taken from https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/async_producer_test.go#L141.
leader := sarama.NewMockBroker(t, 1)
defer leader.Close()

// The mock broker must respond to a single metadata request.
metadataResponse := new(sarama.MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition(kafkaTopic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
leader.Returns(metadataResponse)

// The mock broker must return a single produce response. If no produce request is received, the test will fail.
// This guarantees that mockbeat successfully produced a message to Kafka and connectivity is established.
prodSuccess := new(sarama.ProduceResponse)
prodSuccess.AddTopicPartition(kafkaTopic, 0, sarama.ErrNoError)
leader.Returns(prodSuccess)

// Start mockbeat with the appropriate configuration.
mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
mockbeat.WriteConfigFile(fmt.Sprintf(kafkaCfg, kafkaTopic, kafkaVersion, leader.Addr()))
mockbeat.Start()

// Wait for mockbeat to log that it successfully published a batch to Kafka.
// This ensures that mockbeat received the expected produce response configured above.
mockbeat.WaitForLogs(
`finished kafka batch`,
10*time.Second,
"did not find finished batch log")
}

0 comments on commit bdda7ad

Please sign in to comment.