[8.17](backport #41862) [filebeat][gcs] - Added support for retry config #41990

Merged: 2 commits, Dec 11, 2024
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -324,6 +324,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]
- The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]

*Auditbeat*

40 changes: 40 additions & 0 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
@@ -164,6 +164,7 @@ Now let's explore the configuration attributes a bit more elaborately.
11. <<attrib-file_selectors-gcs,file_selectors>>
12. <<attrib-expand_event_list_from_field-gcs,expand_event_list_from_field>>
13. <<attrib-timestamp_epoch-gcs,timestamp_epoch>>
14. <<attrib-retry-gcs,retry>>


[id="attrib-project-id"]
@@ -397,6 +398,45 @@ filebeat.inputs:

The GCS APIs don't provide a direct way to filter files based on the timestamp, so the input will download all the files and then filter them by timestamp. This can cause a processing bottleneck if the number of files is very high. It is recommended to use this attribute only when the number of files is limited or ample resources are available. This option scales vertically and not horizontally.

[id="attrib-retry-gcs"]
[float]
==== `retry`

This attribute configures a set of sub-attributes that directly control how the input behaves when the download of a file/object fails or is interrupted.

- `max_attempts`: Defines the maximum number of attempts (including the initial API call) made for a retryable error. The default value is `3`.
- `initial_backoff_duration`: Defines the initial backoff time. The default value is `1s`.
- `max_backoff_duration`: Defines the maximum backoff time. The default value is `30s`.
- `backoff_multiplier`: Defines the factor by which the backoff time grows. The default value is `2`.

NOTE: The `initial_backoff_duration` and `max_backoff_duration` attributes must have time units. Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.

By configuring these attributes, the user can control how the input behaves when a download fails or is interrupted. For example, with the defaults (`max_attempts: 3`, `initial_backoff_duration: 1s`, `backoff_multiplier: 2`), a failing download is attempted up to three times, with pauses of at most roughly `1s` and `2s` between the attempts. The `retry` attribute can only be specified at the root level of the configuration, not at the bucket level, and applies uniformly to all buckets.

An example configuration is given below:

[source, yml]
----
filebeat.inputs:
- type: gcs
project_id: my_project_id
auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
retry:
max_attempts: 3
initial_backoff_duration: 2s
max_backoff_duration: 60s
backoff_multiplier: 2
buckets:
- name: obs-bucket
max_workers: 3
poll: true
poll_interval: 11m
bucket_timeout: 10m
----

When configuring the `retry` attribute, keep the `bucket_timeout` value in mind: the retries should complete within the `bucket_timeout` window. If they do not, the input hits a context timeout for the specific object/file it was retrying, which can cause gaps in the ingested data to accumulate over time.
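As a rough sanity check, the worst-case total backoff can be estimated from the retry settings and compared against `bucket_timeout`. The sketch below is illustrative only and is not part of the input; it assumes no jitter, whereas `gax.Backoff` actually randomizes each pause up to the computed ceiling, so the real pauses are at most these values:

[source,go]
----
package main

import (
	"fmt"
	"time"
)

// worstCaseBackoff returns the maximum time spent sleeping between attempts,
// ignoring the time taken by the download attempts themselves.
func worstCaseBackoff(maxAttempts int, initial, max time.Duration, multiplier float64) time.Duration {
	var total time.Duration
	delay := initial
	for i := 1; i < maxAttempts; i++ { // one pause before each retry
		total += delay
		delay = time.Duration(float64(delay) * multiplier)
		if delay > max {
			delay = max
		}
	}
	return total
}

func main() {
	// Values from the example above: 3 attempts, 2s initial, 60s cap, multiplier 2.
	// Worst case: 2s + 4s = 6s of backoff, comfortably inside a 10m bucket_timeout.
	fmt.Println(worstCaseBackoff(3, 2*time.Second, 60*time.Second, 2))
}
----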

[id="bucket-overrides"]
*The sample configs below further illustrate bucket-level overriding of attributes:*

21 changes: 21 additions & 0 deletions x-pack/filebeat/input/gcs/config.go
@@ -50,6 +50,8 @@ type config struct {
ExpandEventListFromField string `config:"expand_event_list_from_field"`
// This field is only used for system test purposes, to override the HTTP endpoint.
AlternativeHost string `config:"alternative_host"`
// Retry - Defines the retry configuration for the input.
Retry retryConfig `config:"retry"`
}

// bucket contains the config for each specific object storage bucket in the root account
@@ -92,6 +94,19 @@ type jsonCredentialsConfig struct {
AccountKey string `config:"account_key"`
}

type retryConfig struct {
// MaxAttempts configures the maximum number of times an API call can be made in the case of retryable errors.
// For example, if you set MaxAttempts(5), the operation will be attempted up to 5 times total (initial call plus 4 retries).
// If you set MaxAttempts(1), the operation will be attempted only once and there will be no retries. This setting defaults to 3.
MaxAttempts int `config:"max_attempts" validate:"min=1"`
// InitialBackOffDuration is the initial value of the retry period, defaults to 1 second.
InitialBackOffDuration time.Duration `config:"initial_backoff_duration" validate:"min=1"`
// MaxBackOffDuration is the maximum value of the retry period, defaults to 30 seconds.
MaxBackOffDuration time.Duration `config:"max_backoff_duration" validate:"min=2"`
// BackOffMultiplier is the factor by which the retry period increases. It should be greater than 1 and defaults to 2.
BackOffMultiplier float64 `config:"backoff_multiplier" validate:"min=1.1"`
}

func (c authConfig) Validate() error {
// credentials_file
if c.CredentialsFile != nil {
@@ -126,5 +141,11 @@ func defaultConfig() config {
PollInterval: 5 * time.Minute,
BucketTimeOut: 120 * time.Second,
ParseJSON: false,
Retry: retryConfig{
MaxAttempts: 3,
InitialBackOffDuration: time.Second,
MaxBackOffDuration: 30 * time.Second,
BackOffMultiplier: 2,
},
}
}
7 changes: 6 additions & 1 deletion x-pack/filebeat/input/gcs/input.go
@@ -72,6 +72,7 @@
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
ReaderConfig: bucket.ReaderConfig,
Retry: config.Retry,
})
}

@@ -126,7 +127,7 @@
func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
cursor cursor.Cursor, publisher cursor.Publisher) error {
st := newState()
currentSource := src.(*Source)

Check failure on line 130 in x-pack/filebeat/input/gcs/input.go (GitHub Actions / lint (linux)): Error return value is not checked (errcheck)

log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)
@@ -158,9 +159,13 @@
}

bucket := client.Bucket(currentSource.BucketName).Retryer(
// Use WithMaxAttempts to change the maximum number of attempts.
storage.WithMaxAttempts(currentSource.Retry.MaxAttempts),
// Use WithBackoff to change the timing of the exponential backoff.
storage.WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Initial: currentSource.Retry.InitialBackOffDuration,
Max: currentSource.Retry.MaxBackOffDuration,
Multiplier: currentSource.Retry.BackOffMultiplier,
}),
// RetryAlways will retry the operation even if it is non-idempotent.
// Since we are only reading, the operation is always idempotent
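For reference, once this hunk is applied the retryer setup reads roughly as follows. This is a reconstruction for readability, not a copy of the file; the final `storage.WithPolicy(storage.RetryAlways)` option is assumed from the trailing comment, since that line sits outside the visible diff:

[source,go]
----
bucket := client.Bucket(currentSource.BucketName).Retryer(
	// Maximum number of attempts, taken from the user-facing retry config.
	storage.WithMaxAttempts(currentSource.Retry.MaxAttempts),
	// Exponential backoff parameters, also taken from the retry config.
	storage.WithBackoff(gax.Backoff{
		Initial:    currentSource.Retry.InitialBackOffDuration,
		Max:        currentSource.Retry.MaxBackOffDuration,
		Multiplier: currentSource.Retry.BackOffMultiplier,
	}),
	// Assumed from the comment above: always retry, since reads are idempotent.
	storage.WithPolicy(storage.RetryAlways),
)
----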
8 changes: 6 additions & 2 deletions x-pack/filebeat/input/gcs/input_stateless.go
@@ -6,7 +6,6 @@

import (
"context"
"time"

"cloud.google.com/go/storage"
gax "github.com/googleapis/gax-go/v2"
@@ -64,10 +63,11 @@
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
ReaderConfig: bucket.ReaderConfig,
Retry: in.config.Retry,
}

st := newState()
currentSource := source.(*Source)

Check failure on line 70 in x-pack/filebeat/input/gcs/input_stateless.go (GitHub Actions / lint (linux)): Error return value is not checked (errcheck)
log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
metrics := newInputMetrics(inputCtx.ID+":"+currentSource.BucketName, nil)
defer metrics.Close()
@@ -80,9 +80,13 @@
}()

bkt := client.Bucket(currentSource.BucketName).Retryer(
// Use WithMaxAttempts to change the maximum number of attempts.
storage.WithMaxAttempts(currentSource.Retry.MaxAttempts),
// Use WithBackoff to change the timing of the exponential backoff.
storage.WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Initial: currentSource.Retry.InitialBackOffDuration,
Max: currentSource.Retry.MaxBackOffDuration,
Multiplier: currentSource.Retry.BackOffMultiplier,
}),
// RetryAlways will retry the operation even if it is non-idempotent.
// Since we are only reading, the operation is always idempotent
75 changes: 75 additions & 0 deletions x-pack/filebeat/input/gcs/input_test.go
@@ -520,6 +520,81 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_docs_ata_json: true,
},
},
{
name: "RetryWithDefaultValues",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "1m",
"bucket_timeout": "1m",
"buckets": []map[string]interface{}{
{
"name": "gcs-test-new",
},
},
},
mockHandler: mock.GCSRetryServer,
expected: map[string]bool{
mock.Gcs_test_new_object_ata_json: true,
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
},
{
name: "RetryWithCustomValues",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "10s",
"bucket_timeout": "10s",
"retry": map[string]interface{}{
"max_attempts": 5,
"initial_backoff_duration": "1s",
"max_backoff_duration": "3s",
"backoff_multiplier": 1.4,
},
"buckets": []map[string]interface{}{
{
"name": "gcs-test-new",
},
},
},
mockHandler: mock.GCSRetryServer,
expected: map[string]bool{
mock.Gcs_test_new_object_ata_json: true,
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
},
{
name: "RetryMinimumValueCheck",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "10s",
"bucket_timeout": "10s",
"retry": map[string]interface{}{
"max_attempts": 5,
"initial_backoff_duration": "1s",
"max_backoff_duration": "3s",
"backoff_multiplier": 1,
},
"buckets": []map[string]interface{}{
{
"name": "gcs-test-new",
},
},
},
mockHandler: mock.GCSRetryServer,
expected: map[string]bool{},
isError: errors.New("requires value >= 1.1 accessing 'retry.backoff_multiplier'"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
40 changes: 40 additions & 0 deletions x-pack/filebeat/input/gcs/mock/mock.go
@@ -98,3 +98,43 @@ func GCSFileServer() http.Handler {
w.Write([]byte("resource not found"))
})
}

//nolint:errcheck // We can ignore errors here, as this is just for testing
func GCSRetryServer() http.Handler {
retries := 0
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
retries++
path := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/")
if r.Method == http.MethodGet && retries >= 3 {
switch len(path) {
case 2:
if path[0] == "b" {
if buckets[path[1]] {
w.Write([]byte(fetchBucket[path[1]]))
return
}
} else if buckets[path[0]] && availableObjects[path[0]][path[1]] {
w.Write([]byte(objects[path[0]][path[1]]))
return
}
case 3:
if path[0] == "b" && path[2] == "o" {
if buckets[path[1]] {
w.Write([]byte(objectList[path[1]]))
return
}
} else if buckets[path[0]] {
objName := strings.Join(path[1:], "/")
if availableObjects[path[0]][objName] {
w.Write([]byte(objects[path[0]][objName]))
return
}
}
default:
w.WriteHeader(http.StatusNotFound)
return
}
}
w.WriteHeader(http.StatusInternalServerError)
})
}
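A hypothetical sketch of how a test might wire up this mock, assuming the `alternative_host` override from `config.go` is used to point the input at the local server (the test name, config keys, and import path here are illustrative, not the actual harness in `input_test.go`):

[source,go]
----
package gcs_test

import (
	"net/http/httptest"
	"testing"

	"github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/mock"
)

func TestRetryAgainstMockSketch(t *testing.T) {
	// Serve the retry mock locally: the first two requests return HTTP 500,
	// so the input only succeeds if its retry policy is working.
	srv := httptest.NewServer(mock.GCSRetryServer())
	defer srv.Close()

	cfg := map[string]interface{}{
		"project_id":                 "elastic-sa",
		"auth.credentials_file.path": "testdata/gcs_creds.json",
		"alternative_host":           srv.URL, // override the GCS endpoint
		"retry": map[string]interface{}{
			"max_attempts": 5,
		},
		"buckets": []map[string]interface{}{
			{"name": "gcs-test-new"},
		},
	}
	_ = cfg // the real test unpacks this map into the input configuration
}
----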
1 change: 1 addition & 0 deletions x-pack/filebeat/input/gcs/types.go
@@ -22,6 +22,7 @@ type Source struct {
FileSelectors []fileSelectorConfig
ReaderConfig readerConfig
ExpandEventListFromField string
Retry retryConfig
}

func (s *Source) Name() string {