From 5e054215654d1c8d7c92fde6c5f9ea46fdfddc7f Mon Sep 17 00:00:00 2001 From: ShourieG Date: Wed, 4 Dec 2024 13:01:01 +0530 Subject: [PATCH 1/2] [filebeat][gcs] - Added support for retry config (#41862) * Added support for retry config along with necessary documentation and tests (cherry picked from commit 3f51793e33da83d62bb9260d9957df62eaf69fb7) --- CHANGELOG.next.asciidoc | 4 + .../filebeat/docs/inputs/input-gcs.asciidoc | 40 ++++++++++ x-pack/filebeat/input/gcs/config.go | 21 ++++++ x-pack/filebeat/input/gcs/input.go | 7 +- x-pack/filebeat/input/gcs/input_stateless.go | 8 +- x-pack/filebeat/input/gcs/input_test.go | 75 +++++++++++++++++++ x-pack/filebeat/input/gcs/mock/mock.go | 40 ++++++++++ x-pack/filebeat/input/gcs/types.go | 1 + 8 files changed, 193 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e77db13d36cc..ea8c41c9264d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -320,6 +320,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Journald in the System module. {pull}41555[41555] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] +- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834] +- Update CEL mito extensions to v1.16.0. {pull}41727[41727] +- Add evaluation state dump debugging option to CEL input. {pull}41335[41335] +- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862] - Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869] - AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694] - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932] diff --git a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc index 2a762ddec18a..d85a7393440e 100644 --- a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc @@ -164,6 +164,7 @@ Now let's explore the configuration attributes a bit more elaborately. 11. <> 12. <> 13. <> + 14. <> [id="attrib-project-id"] @@ -397,6 +398,45 @@ filebeat.inputs: The GCS APIs don't provide a direct way to filter files based on the timestamp, so the input will download all the files and then filter them based on the timestamp. This can cause a bottleneck in processing if the number of files are very high. It is recommended to use this attribute only when the number of files are limited or ample resources are available. This option scales vertically and not horizontally. +[id="attrib-retry-gcs"] +[float] +==== `retry` + +This attribute can be used to configure a list of sub attributes that directly control how the input should behave when a download for a file/object fails or gets interrupted. + + - `max_attempts`: This attribute defines the maximum number of retry attempts(including the initial api call) that should be attempted for a retryable error. The default value for this is `3`. + - `initial_backoff_duration`: This attribute defines the initial backoff time. The default value for this is `1s`. + - `max_backoff_duration`: This attribute defines the maximum backoff time. The default value for this is `30s`. + - `backoff_multiplier`: This attribute defines the backoff multiplication factor. The default value for this is `2`. + +NOTE: The `initial_backoff_duration` and `max_backoff_duration` attributes must have time units. Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. + +By configuring these attributes, the user is given the flexibility to control how the input should behave when a download fails or gets interrupted. This attribute can only be +specified at the root level of the configuration and not at the bucket level. It applies uniformly to all the buckets. + +An example configuration is given below :- + +[source, yml] +---- +filebeat.inputs: +- type: gcs + project_id: my_project_id + auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json + retry: + max_attempts: 3 + initial_backoff_duration: 2s + max_backoff_duration: 60s + backoff_multiplier: 2 + buckets: + - name: obs-bucket + max_workers: 3 + poll: true + poll_interval: 11m + bucket_timeout: 10m +---- + +When configuring the `retry` attribute, the user should consider the `bucket_timeout` value. The `retry` attribute should be configured in such a way that the retries are completed within the `bucket_timeout` window. If the `retry` attribute is configured in such a way that the retries are not completed successfully within the `bucket_timeout` window, the input will suffer a `context timeout` for that specific object/file which it was retrying. This can cause gaps in ingested data to pile up over time. + [id="bucket-overrides"] *The sample configs below will explain the bucket level overriding of attributes a bit further :-* diff --git a/x-pack/filebeat/input/gcs/config.go b/x-pack/filebeat/input/gcs/config.go index 64f64c69bc5f..9a9f6dceadd4 100644 --- a/x-pack/filebeat/input/gcs/config.go +++ b/x-pack/filebeat/input/gcs/config.go @@ -50,6 +50,8 @@ type config struct { ExpandEventListFromField string `config:"expand_event_list_from_field"` // This field is only used for system test purposes, to override the HTTP endpoint. AlternativeHost string `config:"alternative_host"` + // Retry - Defines the retry configuration for the input. + Retry retryConfig `config:"retry"` } // bucket contains the config for each specific object storage bucket in the root account @@ -92,6 +94,19 @@ type jsonCredentialsConfig struct { AccountKey string `config:"account_key"` } +type retryConfig struct { + // MaxAttempts configures the maximum number of times an API call can be made in the case of retryable errors. + // For example, if you set MaxAttempts(5), the operation will be attempted up to 5 times total (initial call plus 4 retries). + // If you set MaxAttempts(1), the operation will be attempted only once and there will be no retries. This setting defaults to 3. + MaxAttempts int `config:"max_attempts" validate:"min=1"` + // InitialBackOffDuration is the initial value of the retry period, defaults to 1 second. + InitialBackOffDuration time.Duration `config:"initial_backoff_duration" validate:"min=1"` + // MaxBackOffDuration is the maximum value of the retry period, defaults to 30 seconds. + MaxBackOffDuration time.Duration `config:"max_backoff_duration" validate:"min=2"` + // BackOffMultiplier is the factor by which the retry period increases. It should be greater than 1 and defaults to 2. + BackOffMultiplier float64 `config:"backoff_multiplier" validate:"min=1.1"` +} + func (c authConfig) Validate() error { // credentials_file if c.CredentialsFile != nil { @@ -126,5 +141,11 @@ func defaultConfig() config { PollInterval: 5 * time.Minute, BucketTimeOut: 120 * time.Second, ParseJSON: false, + Retry: retryConfig{ + MaxAttempts: 3, + InitialBackOffDuration: time.Second, + MaxBackOffDuration: 30 * time.Second, + BackOffMultiplier: 2, + }, } } diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 33e46d034d76..1d6bff8b857a 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -72,6 +72,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, ReaderConfig: bucket.ReaderConfig, + Retry: config.Retry, }) } @@ -158,9 +159,13 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source, } bucket := client.Bucket(currentSource.BucketName).Retryer( + // Use WithMaxAttempts to change the maximum number of attempts. + storage.WithMaxAttempts(currentSource.Retry.MaxAttempts), // Use WithBackoff to change the timing of the exponential backoff. storage.WithBackoff(gax.Backoff{ - Initial: 2 * time.Second, + Initial: currentSource.Retry.InitialBackOffDuration, + Max: currentSource.Retry.MaxBackOffDuration, + Multiplier: currentSource.Retry.BackOffMultiplier, }), // RetryAlways will retry the operation even if it is non-idempotent. // Since we are only reading, the operation is always idempotent diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index c0038bf31dce..b6901f0df25f 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -6,7 +6,6 @@ package gcs import ( "context" - "time" "cloud.google.com/go/storage" gax "github.com/googleapis/gax-go/v2" @@ -64,6 +63,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, ReaderConfig: bucket.ReaderConfig, + Retry: in.config.Retry, } st := newState() @@ -80,9 +80,13 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher }() bkt := client.Bucket(currentSource.BucketName).Retryer( + // Use WithMaxAttempts to change the maximum number of attempts. + storage.WithMaxAttempts(currentSource.Retry.MaxAttempts), // Use WithBackoff to change the timing of the exponential backoff. storage.WithBackoff(gax.Backoff{ - Initial: 2 * time.Second, + Initial: currentSource.Retry.InitialBackOffDuration, + Max: currentSource.Retry.MaxBackOffDuration, + Multiplier: currentSource.Retry.BackOffMultiplier, }), // RetryAlways will retry the operation even if it is non-idempotent. // Since we are only reading, the operation is always idempotent diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index 5595622c93e5..345c7c579096 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -520,6 +520,81 @@ func Test_StorageClient(t *testing.T) { mock.Gcs_test_new_object_docs_ata_json: true, }, }, + { + name: "RetryWithDefaultValues", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "testdata/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "1m", + "bucket_timeout": "1m", + "buckets": []map[string]interface{}{ + { + "name": "gcs-test-new", + }, + }, + }, + mockHandler: mock.GCSRetryServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_ata_json: true, + mock.Gcs_test_new_object_data3_json: true, + mock.Gcs_test_new_object_docs_ata_json: true, + }, + }, + { + name: "RetryWithCustomValues", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "testdata/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "10s", + "bucket_timeout": "10s", + "retry": map[string]interface{}{ + "max_attempts": 5, + "initial_backoff_duration": "1s", + "max_backoff_duration": "3s", + "backoff_multiplier": 1.4, + }, + "buckets": []map[string]interface{}{ + { + "name": "gcs-test-new", + }, + }, + }, + mockHandler: mock.GCSRetryServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_ata_json: true, + mock.Gcs_test_new_object_data3_json: true, + mock.Gcs_test_new_object_docs_ata_json: true, + }, + }, + { + name: "RetryMinimumValueCheck", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "testdata/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "10s", + "bucket_timeout": "10s", + "retry": map[string]interface{}{ + "max_attempts": 5, + "initial_backoff_duration": "1s", + "max_backoff_duration": "3s", + "backoff_multiplier": 1, + }, + "buckets": []map[string]interface{}{ + { + "name": "gcs-test-new", + }, + }, + }, + mockHandler: mock.GCSRetryServer, + expected: map[string]bool{}, + isError: errors.New("requires value >= 1.1 accessing 'retry.backoff_multiplier'"), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/x-pack/filebeat/input/gcs/mock/mock.go b/x-pack/filebeat/input/gcs/mock/mock.go index 50d2a431e018..1def436511a2 100644 --- a/x-pack/filebeat/input/gcs/mock/mock.go +++ b/x-pack/filebeat/input/gcs/mock/mock.go @@ -98,3 +98,43 @@ func GCSFileServer() http.Handler { w.Write([]byte("resource not found")) }) } + +//nolint:errcheck // We can ignore errors here, as this is just for testing +func GCSRetryServer() http.Handler { + retries := 0 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + retries++ + path := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/") + if r.Method == http.MethodGet && retries >= 3 { + switch len(path) { + case 2: + if path[0] == "b" { + if buckets[path[1]] { + w.Write([]byte(fetchBucket[path[1]])) + return + } + } else if buckets[path[0]] && availableObjects[path[0]][path[1]] { + w.Write([]byte(objects[path[0]][path[1]])) + return + } + case 3: + if path[0] == "b" && path[2] == "o" { + if buckets[path[1]] { + w.Write([]byte(objectList[path[1]])) + return + } + } else if buckets[path[0]] { + objName := strings.Join(path[1:], "/") + if availableObjects[path[0]][objName] { + w.Write([]byte(objects[path[0]][objName])) + return + } + } + default: + w.WriteHeader(http.StatusNotFound) + return + } + } + w.WriteHeader(http.StatusInternalServerError) + }) +} diff --git a/x-pack/filebeat/input/gcs/types.go b/x-pack/filebeat/input/gcs/types.go index a34c7f7160ff..82fb737cdb65 100644 --- a/x-pack/filebeat/input/gcs/types.go +++ b/x-pack/filebeat/input/gcs/types.go @@ -22,6 +22,7 @@ type Source struct { FileSelectors []fileSelectorConfig ReaderConfig readerConfig ExpandEventListFromField string + Retry retryConfig } func (s *Source) Name() string { From a8f60067b3f761001bfa55ab2aeca3aa77e83248 Mon Sep 17 00:00:00 2001 From: ShourieG Date: Wed, 11 Dec 2024 16:34:56 +0530 Subject: [PATCH 2/2] Update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ea8c41c9264d..8c2da31dc991 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -320,14 +320,11 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Journald in the System module. {pull}41555[41555] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] -- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834] -- Update CEL mito extensions to v1.16.0. {pull}41727[41727] -- Add evaluation state dump debugging option to CEL input. {pull}41335[41335] -- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862] - Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869] - AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694] - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932] - Refactor & cleanup with updates to default values and documentation. {pull}41834[41834] +- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862] *Auditbeat*