Skip to content

Commit

Permalink
Merge branch '8.x' into mergify/bp/8.x/pr-41889
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelKatsoulis authored Dec 11, 2024
2 parents 48e9eb5 + ce624b1 commit 3a4cd4a
Show file tree
Hide file tree
Showing 12 changed files with 403 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2894,6 +2894,7 @@ https://github.com/elastic/beats/compare/v7.17.0\...v8.0.0[View commits]
- Add `while_pattern` type to multiline reader. {pull}19662[19662]
- auditd dataset: Use `process.args` to store program arguments instead of `auditd.log.aNNN` fields. {pull}29601[29601]
- Remove deprecated old `awscloudwatch` input name. {pull}29844[29844]
- Remove `docker` input. Please use `filestream` input with `container` parser or `container` input. {pull}28817[28817]

*Metricbeat*

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
- Add support for SSL and Proxy configurations for websocket type in streaming input. {pull}41934[41934]
- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]
- The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]

Expand Down
25 changes: 0 additions & 25 deletions libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package otelconsumer
import (
"context"
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -103,30 +102,6 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)

err := out.logsConsumer.ConsumeLogs(ctx, pLogs)
if err != nil {
// If the batch is too large, the elasticsearchexporter will
// return a 413 error.
//
// At the moment, the exporter does not support batch splitting
// on error so we do it here.
//
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36163.
if strings.Contains(err.Error(), "Request Entity Too Large") {
// Try and split the batch into smaller batches and retry
if batch.SplitRetry() {
st.BatchSplit()
st.RetryableErrors(len(events))
} else {
// If the batch could not be split, there is no option left but
// to drop it and log the error state.
batch.Drop()
st.PermanentErrors(len(events))
out.log.Errorf("the batch is too large to be sent: %v", err)
}

// Don't propagate the error, the batch was split and retried.
return nil
}

	// Permanent errors shouldn't be retried. This typically means
// the data cannot be serialized by the exporter that is attached
// to the pipeline or when the destination refuses the data because
Expand Down
27 changes: 0 additions & 27 deletions libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,33 +90,6 @@ func TestPublish(t *testing.T) {
assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag)
})

t.Run("split batch on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
})

t.Run("drop batch if can't split on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 2)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
assert.Equal(t, outest.BatchDrop, batch.Signals[1].Tag)
})

t.Run("drop batch on permanent consumer error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

Expand Down
22 changes: 22 additions & 0 deletions x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,28 @@ The minimum time to wait between retries. This ensures that retries are spaced o

The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying.

[float]
==== `timeout`
Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds.

[float]
==== `proxy_url`
This specifies the forward proxy URL to use for the connection. The `proxy_url` configuration is optional and can be used to configure the proxy settings for the connection. The `proxy_url` default value is set by `http.ProxyFromEnvironment` which reads the `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` environment variables.

[float]
==== `proxy_headers`
This specifies the headers to be sent to the proxy server. The `proxy_headers` configuration is optional and can be used to configure the headers to be sent to the proxy server.

[float]
==== `ssl`
This specifies the SSL configuration for the connection. The `ssl` configuration is optional and can be used to configure the SSL settings for the connection. The `ssl` configuration has the following subfields:

- `certificate_authorities`: A list of root certificates to use for verifying the server's certificate.
- `certificate`: The (PEM encoded) certificate to use for client authentication.
- `key`: The (PEM encoded) private key to use for client authentication.

If the server presents a self-signed certificate, set the `certificate_authorities` field to that certificate itself.

[float]
=== Metrics

Expand Down
11 changes: 9 additions & 2 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ type config struct {
Redact *redact `config:"redact"`
// Retry is the configuration for retrying failed connections.
Retry *retry `config:"retry"`

// Transport is the common the transport config.
Transport httpcommon.HTTPTransportSettings `config:",inline"`

// CrowdstrikeAppID is the value used to set the
// appId request parameter in the FalconHose stream
// discovery request.
Expand Down Expand Up @@ -166,3 +165,11 @@ func checkURLScheme(c config) error {
return fmt.Errorf("unknown stream type: %s", c.Type)
}
}

// defaultConfig returns the configuration used as the starting point
// before user settings are unpacked on top of it. It seeds the shared
// HTTP transport settings with a 180-second dial/connection timeout,
// matching the documented default for the streaming input.
func defaultConfig() config {
	var c config
	c.Transport = httpcommon.HTTPTransportSettings{Timeout: 180 * time.Second}
	return c
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/streaming/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage
}

func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) {
src := &source{cfg: config{}}
src := &source{cfg: defaultConfig()}
if err := cfg.Unpack(&src.cfg); err != nil {
return nil, nil, err
}
Expand Down
Loading

0 comments on commit 3a4cd4a

Please sign in to comment.