diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e77db13d36c..8c2da31dc99 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -324,6 +324,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694] - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932] - Refactor & cleanup with updates to default values and documentation. {pull}41834[41834] +- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc index 2a762ddec18..d85a7393440 100644 --- a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc @@ -164,6 +164,7 @@ Now let's explore the configuration attributes a bit more elaborately. 11. <> 12. <> 13. <> + 14. <<attrib-retry-gcs,retry>> [id="attrib-project-id"] @@ -397,6 +398,45 @@ filebeat.inputs: The GCS APIs don't provide a direct way to filter files based on the timestamp, so the input will download all the files and then filter them based on the timestamp. This can cause a bottleneck in processing if the number of files are very high. It is recommended to use this attribute only when the number of files are limited or ample resources are available. This option scales vertically and not horizontally. +[id="attrib-retry-gcs"] +[float] +==== `retry` + +This attribute can be used to configure a list of sub attributes that directly control how the input should behave when a download for a file/object fails or gets interrupted. + + - `max_attempts`: This attribute defines the maximum number of attempts (including the initial API call) that will be made for a retryable error. A value of `1` means the call is made only once, with no retries. The default value for this is `3`.
+ - `initial_backoff_duration`: This attribute defines the initial backoff time. The default value for this is `1s`. + - `max_backoff_duration`: This attribute defines the maximum backoff time. The default value for this is `30s`. + - `backoff_multiplier`: This attribute defines the backoff multiplication factor. The default value for this is `2` and the minimum allowed value is `1.1`. + +NOTE: The `initial_backoff_duration` and `max_backoff_duration` attributes must have time units. Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. + +By configuring these attributes, the user is given the flexibility to control how the input should behave when a download fails or gets interrupted. The `retry` attribute can only be +specified at the root level of the configuration and not at the bucket level. It applies uniformly to all the buckets. + +An example configuration is given below :- + +[source, yml] +---- +filebeat.inputs: +- type: gcs + project_id: my_project_id + auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json + retry: + max_attempts: 3 + initial_backoff_duration: 2s + max_backoff_duration: 60s + backoff_multiplier: 2 + buckets: + - name: obs-bucket + max_workers: 3 + poll: true + poll_interval: 11m + bucket_timeout: 10m +---- + +When configuring the `retry` attribute, the user should consider the `bucket_timeout` value. The `retry` attribute should be configured in such a way that the retries are completed within the `bucket_timeout` window. If the `retry` attribute is configured in such a way that the retries are not completed successfully within the `bucket_timeout` window, the input will suffer a `context timeout` for that specific object/file which it was retrying. This can cause gaps in ingested data to pile up over time.
+ [id="bucket-overrides"] *The sample configs below will explain the bucket level overriding of attributes a bit further :-* diff --git a/x-pack/filebeat/input/gcs/config.go b/x-pack/filebeat/input/gcs/config.go index 64f64c69bc5..9a9f6dceadd 100644 --- a/x-pack/filebeat/input/gcs/config.go +++ b/x-pack/filebeat/input/gcs/config.go @@ -50,6 +50,8 @@ type config struct { ExpandEventListFromField string `config:"expand_event_list_from_field"` // This field is only used for system test purposes, to override the HTTP endpoint. AlternativeHost string `config:"alternative_host"` + // Retry - Defines the retry configuration for the input. + Retry retryConfig `config:"retry"` } // bucket contains the config for each specific object storage bucket in the root account @@ -92,6 +94,19 @@ type jsonCredentialsConfig struct { AccountKey string `config:"account_key"` } +type retryConfig struct { + // MaxAttempts configures the maximum number of times an API call can be made in the case of retryable errors. + // For example, if you set MaxAttempts(5), the operation will be attempted up to 5 times total (initial call plus 4 retries). + // If you set MaxAttempts(1), the operation will be attempted only once and there will be no retries. This setting defaults to 3. + MaxAttempts int `config:"max_attempts" validate:"min=1"` + // InitialBackOffDuration is the initial value of the retry period, defaults to 1 second. + InitialBackOffDuration time.Duration `config:"initial_backoff_duration" validate:"min=1"` + // MaxBackOffDuration is the maximum value of the retry period, defaults to 30 seconds. + MaxBackOffDuration time.Duration `config:"max_backoff_duration" validate:"min=2"` + // BackOffMultiplier is the factor by which the retry period increases. It must be at least 1.1 (enforced by the validate tag) and defaults to 2.
+ BackOffMultiplier float64 `config:"backoff_multiplier" validate:"min=1.1"` +} + func (c authConfig) Validate() error { // credentials_file if c.CredentialsFile != nil { @@ -126,5 +141,11 @@ func defaultConfig() config { PollInterval: 5 * time.Minute, BucketTimeOut: 120 * time.Second, ParseJSON: false, + Retry: retryConfig{ + MaxAttempts: 3, + InitialBackOffDuration: time.Second, + MaxBackOffDuration: 30 * time.Second, + BackOffMultiplier: 2, + }, } } diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 33e46d034d7..1d6bff8b857 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -72,6 +72,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, ReaderConfig: bucket.ReaderConfig, + Retry: config.Retry, }) } @@ -158,9 +159,13 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source, } bucket := client.Bucket(currentSource.BucketName).Retryer( + // Use WithMaxAttempts to change the maximum number of attempts. + storage.WithMaxAttempts(currentSource.Retry.MaxAttempts), // Use WithBackoff to change the timing of the exponential backoff. storage.WithBackoff(gax.Backoff{ - Initial: 2 * time.Second, + Initial: currentSource.Retry.InitialBackOffDuration, + Max: currentSource.Retry.MaxBackOffDuration, + Multiplier: currentSource.Retry.BackOffMultiplier, }), // RetryAlways will retry the operation even if it is non-idempotent. 
// Since we are only reading, the operation is always idempotent diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index c0038bf31dc..b6901f0df25 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -6,7 +6,6 @@ package gcs import ( "context" - "time" "cloud.google.com/go/storage" gax "github.com/googleapis/gax-go/v2" @@ -64,6 +63,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, ReaderConfig: bucket.ReaderConfig, + Retry: in.config.Retry, } st := newState() @@ -80,9 +80,13 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher }() bkt := client.Bucket(currentSource.BucketName).Retryer( + // Use WithMaxAttempts to change the maximum number of attempts. + storage.WithMaxAttempts(currentSource.Retry.MaxAttempts), // Use WithBackoff to change the timing of the exponential backoff. storage.WithBackoff(gax.Backoff{ - Initial: 2 * time.Second, + Initial: currentSource.Retry.InitialBackOffDuration, + Max: currentSource.Retry.MaxBackOffDuration, + Multiplier: currentSource.Retry.BackOffMultiplier, }), // RetryAlways will retry the operation even if it is non-idempotent. 
// Since we are only reading, the operation is always idempotent diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index 5595622c93e..345c7c57909 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -520,6 +520,81 @@ func Test_StorageClient(t *testing.T) { mock.Gcs_test_new_object_docs_ata_json: true, }, }, + { + name: "RetryWithDefaultValues", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "testdata/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "1m", + "bucket_timeout": "1m", + "buckets": []map[string]interface{}{ + { + "name": "gcs-test-new", + }, + }, + }, + mockHandler: mock.GCSRetryServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_ata_json: true, + mock.Gcs_test_new_object_data3_json: true, + mock.Gcs_test_new_object_docs_ata_json: true, + }, + }, + { + name: "RetryWithCustomValues", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "testdata/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "10s", + "bucket_timeout": "10s", + "retry": map[string]interface{}{ + "max_attempts": 5, + "initial_backoff_duration": "1s", + "max_backoff_duration": "3s", + "backoff_multiplier": 1.4, + }, + "buckets": []map[string]interface{}{ + { + "name": "gcs-test-new", + }, + }, + }, + mockHandler: mock.GCSRetryServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_ata_json: true, + mock.Gcs_test_new_object_data3_json: true, + mock.Gcs_test_new_object_docs_ata_json: true, + }, + }, + { + name: "RetryMinimumValueCheck", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "testdata/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "10s", + "bucket_timeout": "10s", + "retry": map[string]interface{}{ + "max_attempts": 5, + "initial_backoff_duration": 
"1s", + "max_backoff_duration": "3s", + "backoff_multiplier": 1, + }, + "buckets": []map[string]interface{}{ + { + "name": "gcs-test-new", + }, + }, + }, + mockHandler: mock.GCSRetryServer, + expected: map[string]bool{}, + isError: errors.New("requires value >= 1.1 accessing 'retry.backoff_multiplier'"), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/x-pack/filebeat/input/gcs/mock/mock.go b/x-pack/filebeat/input/gcs/mock/mock.go index 50d2a431e01..1def436511a 100644 --- a/x-pack/filebeat/input/gcs/mock/mock.go +++ b/x-pack/filebeat/input/gcs/mock/mock.go @@ -98,3 +98,43 @@ func GCSFileServer() http.Handler { w.Write([]byte("resource not found")) }) } + +//nolint:errcheck // We can ignore errors here, as this is just for testing +func GCSRetryServer() http.Handler { + retries := 0 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + retries++ + path := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/") + if r.Method == http.MethodGet && retries >= 3 { + switch len(path) { + case 2: + if path[0] == "b" { + if buckets[path[1]] { + w.Write([]byte(fetchBucket[path[1]])) + return + } + } else if buckets[path[0]] && availableObjects[path[0]][path[1]] { + w.Write([]byte(objects[path[0]][path[1]])) + return + } + case 3: + if path[0] == "b" && path[2] == "o" { + if buckets[path[1]] { + w.Write([]byte(objectList[path[1]])) + return + } + } else if buckets[path[0]] { + objName := strings.Join(path[1:], "/") + if availableObjects[path[0]][objName] { + w.Write([]byte(objects[path[0]][objName])) + return + } + } + default: + w.WriteHeader(http.StatusNotFound) + return + } + } + w.WriteHeader(http.StatusInternalServerError) + }) +} diff --git a/x-pack/filebeat/input/gcs/types.go b/x-pack/filebeat/input/gcs/types.go index a34c7f7160f..82fb737cdb6 100644 --- a/x-pack/filebeat/input/gcs/types.go +++ b/x-pack/filebeat/input/gcs/types.go @@ -22,6 +22,7 @@ type Source struct { FileSelectors []fileSelectorConfig ReaderConfig 
readerConfig ExpandEventListFromField string + Retry retryConfig } func (s *Source) Name() string {