From 631669fc3f72122a8aa1a44f84fc1fc97a15049b Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 20 Nov 2024 15:26:48 +0530 Subject: [PATCH 1/7] added retry config, refactored the defaultConfig() and updated docs --- .../filebeat/docs/inputs/input-gcs.asciidoc | 56 ++++++++++++++---- x-pack/filebeat/input/gcs/client.go | 4 +- x-pack/filebeat/input/gcs/config.go | 23 ++++++-- x-pack/filebeat/input/gcs/input.go | 57 ++++++++++--------- x-pack/filebeat/input/gcs/input_stateless.go | 9 ++- x-pack/filebeat/input/gcs/input_test.go | 56 +++++++++++++++++- x-pack/filebeat/input/gcs/mock/mock.go | 40 +++++++++++++ x-pack/filebeat/input/gcs/types.go | 1 + 8 files changed, 194 insertions(+), 52 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc index 23ac0e021c6..eb49c25344b 100644 --- a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc @@ -10,9 +10,7 @@ ++++ Use the `google cloud storage input` to read content from files stored in buckets which reside on your Google Cloud. -The input can be configured to work with and without polling, though currently, if polling is disabled it will only -perform a one time passthrough, list the file contents and end the process. Polling is generally recommented for most cases -even though it can get expensive with dealing with a very large number of files. +The input can be configured to work with and without polling, though if polling is disabled, it will only perform a one-time passthrough, list the file contents and end the process. *To mitigate errors and ensure a stable processing environment, this input employs the following features :* @@ -66,12 +64,11 @@ many buckets as we deem fit. We are also able to configure the attributes `max_w then be applied to all buckets which do not specify any of these attributes explicitly. NOTE: If the attributes `max_workers`, `poll`, `poll_interval` and `bucket_timeout` are specified at the root level, these can still be overridden at the bucket level with -different values, thus offering extensive flexibility and customization. Examples <> show this behaviour. +different values, thus offering extensive flexibility and customization. Examples <> show this behavior. On receiving this config the google cloud storage input will connect to the service and retrieve a `Storage Client` using the given `bucket_name` and `auth.credentials_file`, then it will spawn two main go-routines, one for each bucket. After this each of these routines (threads) will initialize a scheduler -which will in turn use the `max_workers` value to initialize an in-memory worker pool (thread pool) with `3` `workers` available. Basically that equates to two instances of a worker pool, -one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file). +which will in turn use the `max_workers` value to initialize an in-memory worker pool (thread pool) with `3` `workers` available. Basically that equates to two instances of a worker pool, one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file). NOTE: The scheduler is responsible for scheduling jobs, and uses the `maximum available workers` in the pool, at each iteration, to decide the number of files to retrieve and process. This keeps work distribution efficient.
The scheduler uses `poll_interval` attribute value to decide how long to wait after each iteration. The `bucket_timeout` value is used to timeout calls to the bucket list api if it exceeds the given value. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value. @@ -167,6 +164,7 @@ Now let's explore the configuration attributes a bit more elaborately. 11. <> 12. <> 13. <> + 14. <> [id="attrib-project-id"] [float] @@ -213,7 +211,7 @@ This is a specific subfield of a bucket. It specifies the bucket name. This attribute defines the maximum amount of time after which a bucket operation will give up and stop if no response is received (example: reading a file / listing a file). It can be defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish. -If no value is specified for this, by default its initialized to `50 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time. +If no value is specified for this, by default it is initialized to `120 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time. [id="attrib-max_workers-gcs"] [float] @@ -228,9 +226,8 @@ NOTE: The value of `max_workers` is tied to the `batch_size` currently to ensure [float] ==== `poll` -This attribute informs the scheduler whether to keep polling for new files or not. Default value of this is `false`, so it will not keep polling if not explicitly -specified. This attribute can be specified both at the root level of the configuration as well at the bucket level.
The bucket level values will always -take priority and override the root level values if both are specified. +This attribute informs the scheduler whether to keep polling for new files or not. The default value of this is `true`. This attribute can be specified both at the +root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. [id="attrib-poll_interval-gcs"] [float] @@ -238,7 +235,7 @@ This attribute defines the maximum amount of time after which the internal scheduler will make the polling call for the next set of objects/files. It can be defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish. -Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default its initialized to `300 seconds`. +Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default it is initialized to `5 minutes`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The `poll_interval` should be set to a value that is equal to the `bucket_timeout` value. This would ensure that another schedule operation is not started before the current buckets have all been processed. If the `poll_interval` is set to a value that is less than the `bucket_timeout`, then the input will start another schedule operation before the current one has finished, which can cause a bottleneck over time. Having a lower `poll_interval` can make the input faster at the cost of more resource utilization. @@ -401,6 +398,43 @@ filebeat.inputs: The GCS APIs don't provide a direct way to filter files based on the timestamp, so the input will download all the files and then filter them based on the timestamp. This can cause a bottleneck in processing if the number of files are very high. It is recommended to use this attribute only when the number of files are limited or ample resources are available. This option scales vertically and not horizontally. +[id="attrib-retry-gcs"] +[float] +==== `retry` + +This attribute can be used to configure a list of sub attributes that directly control how the input should behave when a download for a file/object fails or gets interrupted. The list of sub attributes are as follows :- + 1. `max_attempts`: This attribute defines the maximum number of retries that should be attempted for a failed download. The default value for this is `3`. + 2. `initial_backoff_duration`: This attribute defines the initial backoff time in seconds. The default value for this is `1s`. + 3. `max_backoff_duration`: This attribute defines the maximum backoff time in seconds. The default value for this is `30s`. + 4. `backoff_multiplier`: This attribute defines the backoff multiplication factor. The default value for this is `2`. + +By configuring these attributes, the user is given the flexibility to control how the input should behave when a download fails or gets interrupted. This attribute can only be +specified at the root level of the configuration and not at the bucket level. It applies uniformly to all the buckets.
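Under the hood, these retry settings are handed straight to the GCS SDK's retryer when the bucket handle is created, as the input.go and input_stateless.go changes later in this patch show. Below is a minimal sketch of that mapping, assuming the `retryConfig` type from config.go in this patch; `retryerFor` itself is a hypothetical helper written for illustration, not code from the patch:

[source, go]
----
package gcs

import (
	"cloud.google.com/go/storage"
	gax "github.com/googleapis/gax-go/v2"
)

// retryerFor returns a bucket handle whose operations are retried
// according to the input's retry settings (a sketch mirroring the
// wiring added to input.go in this patch; not part of the patch).
func retryerFor(client *storage.Client, name string, rc retryConfig) *storage.BucketHandle {
	return client.Bucket(name).Retryer(
		// max_attempts
		storage.WithMaxAttempts(rc.MaxAttempts),
		// initial_backoff_duration, max_backoff_duration, backoff_multiplier
		storage.WithBackoff(gax.Backoff{
			Initial:    rc.InitialBackOffDuration,
			Max:        rc.MaxBackOffDuration,
			Multiplier: rc.BackOffMultiplier,
		}),
		// reads are idempotent, so retrying them unconditionally is safe
		storage.WithPolicy(storage.RetryAlways),
	)
}
----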
+ +An example configuration is given below :- + +[source, yml] +---- +filebeat.inputs: +- type: gcs + project_id: my_project_id + auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json + retry: + max_attempts: 3 + initial_backoff_duration: 2s + max_backoff_duration: 60s + backoff_multiplier: 2 + buckets: + - name: obs-bucket + max_workers: 3 + poll: true + poll_interval: 11m + bucket_timeout: 10m +---- + +While configuring the `retry` attribute, the user should take into consideration the `bucket_timeout` value. The `retry` attribute should be configured in such a way that the retries are completed within the `bucket_timeout` window. If the `retry` attribute is configured in such a way that the retries are not completed successfully within the `bucket_timeout` window, then the input will suffer a `context timeout` for that specific object/file which it was retrying. This can cause gaps in ingested data to pile up over time. + [id="bucket-overrides"] *The sample configs below will explain the bucket level overriding of attributes a bit further :-* diff --git a/x-pack/filebeat/input/gcs/client.go b/x-pack/filebeat/input/gcs/client.go index 7fd45d2d0a9..1846e08c5ab 100644 --- a/x-pack/filebeat/input/gcs/client.go +++ b/x-pack/filebeat/input/gcs/client.go @@ -12,11 +12,9 @@ import ( "cloud.google.com/go/storage" "golang.org/x/oauth2/google" "google.golang.org/api/option" - - "github.com/elastic/elastic-agent-libs/logp" ) -func fetchStorageClient(ctx context.Context, cfg config, log *logp.Logger) (*storage.Client, error) { +func fetchStorageClient(ctx context.Context, cfg config) (*storage.Client, error) { if cfg.AlternativeHost != "" { var h *url.URL h, err := url.Parse(cfg.AlternativeHost) diff --git a/x-pack/filebeat/input/gcs/config.go b/x-pack/filebeat/input/gcs/config.go index 6a7b93d5e47..3f0837960c8 100644 --- a/x-pack/filebeat/input/gcs/config.go +++ b/x-pack/filebeat/input/gcs/config.go @@ -28,16 +28,16 @@ type config struct { // Auth - Defines the authentication mechanism to be used for accessing the gcs bucket. Auth authConfig `config:"auth"` // MaxWorkers - Defines the maximum number of go routines that will be spawned. - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` + MaxWorkers int `config:"max_workers,omitempty" validate:"max=5000"` // Poll - Defines if polling should be performed on the input bucket source. - Poll *bool `config:"poll,omitempty"` + Poll bool `config:"poll,omitempty"` // PollInterval - Defines the maximum amount of time to wait before polling for the next batch of objects from the bucket. - PollInterval *time.Duration `config:"poll_interval,omitempty"` + PollInterval time.Duration `config:"poll_interval,omitempty"` // ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to // false, since it can get expensive dealing with highly nested json data. - ParseJSON *bool `config:"parse_json,omitempty"` + ParseJSON bool `config:"parse_json,omitempty"` // BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out. - BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"` + BucketTimeOut time.Duration `config:"bucket_timeout,omitempty"` // Buckets - Defines a list of buckets that will be polled for objects. Buckets []bucket `config:"buckets" validate:"required"` // FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket. 
@@ -50,6 +50,8 @@ type config struct { ExpandEventListFromField string `config:"expand_event_list_from_field"` // This field is only used for system test purposes, to override the HTTP endpoint. AlternativeHost string `config:"alternative_host,omitempty"` + // Retry - Defines the retry configuration for the input. + Retry retryConfig `config:"retry"` } // bucket contains the config for each specific object storage bucket in the root account @@ -90,6 +92,17 @@ type jsonCredentialsConfig struct { AccountKey string `config:"account_key"` } +type retryConfig struct { + // MaxAttempts is the maximum number of retry attempts, defaults to 3. + MaxAttempts int `config:"max_attempts" validate:"min=0"` + // InitialBackOffDuration is the initial value of the retry period, defaults to 1 second. + InitialBackOffDuration time.Duration `config:"initial_backoff_duration" validate:"min=1"` + // MaxBackOffDuration is the maximum value of the retry period, defaults to 30 seconds. + MaxBackOffDuration time.Duration `config:"max_backoff_duration" validate:"min=2"` + // BackOffMultiplier is the factor by which the retry period increases. It should be greater than 1 and defaults to 2. + BackOffMultiplier float64 `config:"backoff_multiplier" validate:"min=2"` +} + func (c authConfig) Validate() error { // credentials_file if c.CredentialsFile != nil { diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index cc0e9ad74bb..0b5532cad30 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -72,6 +72,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, ReaderConfig: bucket.ReaderConfig, + Retry: config.Retry, }) } @@ -83,39 +84,19 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { // are absent, assigns default values func tryOverrideOrDefault(cfg config, b bucket) bucket { if b.MaxWorkers == nil { - maxWorkers := 1 - if cfg.MaxWorkers != nil { - maxWorkers = *cfg.MaxWorkers - } - b.MaxWorkers = &maxWorkers + b.MaxWorkers = &cfg.MaxWorkers } if b.Poll == nil { - var poll bool - if cfg.Poll != nil { - poll = *cfg.Poll - } - b.Poll = &poll + b.Poll = &cfg.Poll } if b.PollInterval == nil { - interval := time.Second * 300 - if cfg.PollInterval != nil { - interval = *cfg.PollInterval - } - b.PollInterval = &interval + b.PollInterval = &cfg.PollInterval } if b.ParseJSON == nil { - parse := false - if cfg.ParseJSON != nil { - parse = *cfg.ParseJSON - } - b.ParseJSON = &parse + b.ParseJSON = &cfg.ParseJSON } if b.BucketTimeOut == nil { - timeOut := time.Second * 50 - if cfg.BucketTimeOut != nil { - timeOut = *cfg.BucketTimeOut - } - b.BucketTimeOut = &timeOut + b.BucketTimeOut = &cfg.BucketTimeOut } if b.TimeStampEpoch == nil { b.TimeStampEpoch = cfg.TimeStampEpoch @@ -131,6 +112,23 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket { return b } +// defaultConfig returns the default configuration for the input +func defaultConfig() config { + return config{ + MaxWorkers: 1, + Poll: true, + PollInterval: 5 * time.Minute, + BucketTimeOut: 120 * time.Second, + ParseJSON: false, + Retry: retryConfig{ + MaxAttempts: 3, + InitialBackOffDuration: 1 * time.Second, + MaxBackOffDuration: 30 * time.Second, + BackOffMultiplier: 2, + }, + } +} + // isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp func isValidUnixTimestamp(timestamp int64) bool { // checks if the timestamp is within the valid range @@ 
-173,15 +171,20 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source, cancel() }() - client, err := fetchStorageClient(ctx, input.config, log) + client, err := fetchStorageClient(ctx, input.config) if err != nil { metrics.errorsTotal.Inc() return err } + bucket := client.Bucket(currentSource.BucketName).Retryer( + // Use WithMaxAttempts to change the maximum number of attempts. + storage.WithMaxAttempts(currentSource.Retry.MaxAttempts), // Use WithBackoff to change the timing of the exponential backoff. storage.WithBackoff(gax.Backoff{ - Initial: 2 * time.Second, + Initial: currentSource.Retry.InitialBackOffDuration, + Max: currentSource.Retry.MaxBackOffDuration, + Multiplier: currentSource.Retry.BackOffMultiplier, }), // RetryAlways will retry the operation even if it is non-idempotent. // Since we are only reading, the operation is always idempotent diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index f56f7f35bc5..b6901f0df25 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -6,7 +6,6 @@ package gcs import ( "context" - "time" "cloud.google.com/go/storage" gax "github.com/googleapis/gax-go/v2" @@ -64,6 +63,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, ReaderConfig: bucket.ReaderConfig, + Retry: in.config.Retry, } st := newState() @@ -80,15 +80,18 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher }() bkt := client.Bucket(currentSource.BucketName).Retryer( + // Use WithMaxAttempts to change the maximum number of attempts. + storage.WithMaxAttempts(currentSource.Retry.MaxAttempts), // Use WithBackoff to change the timing of the exponential backoff. storage.WithBackoff(gax.Backoff{ - Initial: 2 * time.Second, + Initial: currentSource.Retry.InitialBackOffDuration, + Max: currentSource.Retry.MaxBackOffDuration, + Multiplier: currentSource.Retry.BackOffMultiplier, }), // RetryAlways will retry the operation even if it is non-idempotent. 
// Since we are only reading, the operation is always idempotent storage.WithPolicy(storage.RetryAlways), ) - scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, metrics, log) // allows multiple containers to be scheduled concurrently while testing // the stateless input is triggered only while testing and till now it did not mimic diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index 8accb774f38..9f14ec80057 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -520,6 +520,56 @@ func Test_StorageClient(t *testing.T) { mock.Gcs_test_new_object_docs_ata_json: true, }, }, + { + name: "RetryWithDefaultValues", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "testdata/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "1m", + "bucket_timeout": "1m", + "buckets": []map[string]interface{}{ + { + "name": "gcs-test-new", + }, + }, + }, + mockHandler: mock.GCSRetryServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_ata_json: true, + mock.Gcs_test_new_object_data3_json: true, + mock.Gcs_test_new_object_docs_ata_json: true, + }, + }, + { + name: "RetryWithCustomValues", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "testdata/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "10s", + "bucket_timeout": "10s", + "retry": map[string]interface{}{ + "max_attempts": 5, + "initial_backoff_duration": "1s", + "max_backoff_duration": "3s", + "backoff_multiplier": 2, + }, + "buckets": []map[string]interface{}{ + { + "name": "gcs-test-new", + }, + }, + }, + mockHandler: mock.GCSRetryServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_ata_json: true, + mock.Gcs_test_new_object_data3_json: true, + mock.Gcs_test_new_object_docs_ata_json: true, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -535,7 +585,7 @@ func Test_StorageClient(t *testing.T) { client, _ := storage.NewClient(context.Background(), option.WithEndpoint(serv.URL), option.WithoutAuthentication(), option.WithHTTPClient(&httpclient)) cfg := conf.MustNewConfigFrom(tt.baseConfig) - conf := config{} + conf := defaultConfig() err := cfg.Unpack(&conf) if err != nil { assert.EqualError(t, err, fmt.Sprint(tt.isError)) @@ -558,8 +608,8 @@ func Test_StorageClient(t *testing.T) { }) var timeout *time.Timer - if conf.PollInterval != nil { - timeout = time.NewTimer(1*time.Second + *conf.PollInterval) + if conf.PollInterval != 0 { + timeout = time.NewTimer(1*time.Second + conf.PollInterval) } else { timeout = time.NewTimer(5 * time.Second) } diff --git a/x-pack/filebeat/input/gcs/mock/mock.go b/x-pack/filebeat/input/gcs/mock/mock.go index 50d2a431e01..1def436511a 100644 --- a/x-pack/filebeat/input/gcs/mock/mock.go +++ b/x-pack/filebeat/input/gcs/mock/mock.go @@ -98,3 +98,43 @@ func GCSFileServer() http.Handler { w.Write([]byte("resource not found")) }) } + +//nolint:errcheck // We can ignore errors here, as this is just for testing +func GCSRetryServer() http.Handler { + retries := 0 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + retries++ + path := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/") + if r.Method == http.MethodGet && retries >= 3 { + switch len(path) { + case 2: + if path[0] == "b" { + if buckets[path[1]] { + w.Write([]byte(fetchBucket[path[1]])) + return + } + } else if buckets[path[0]] && 
availableObjects[path[0]][path[1]] { + w.Write([]byte(objects[path[0]][path[1]])) + return + } + case 3: + if path[0] == "b" && path[2] == "o" { + if buckets[path[1]] { + w.Write([]byte(objectList[path[1]])) + return + } + } else if buckets[path[0]] { + objName := strings.Join(path[1:], "/") + if availableObjects[path[0]][objName] { + w.Write([]byte(objects[path[0]][objName])) + return + } + } + default: + w.WriteHeader(http.StatusNotFound) + return + } + } + w.WriteHeader(http.StatusInternalServerError) + }) +} diff --git a/x-pack/filebeat/input/gcs/types.go b/x-pack/filebeat/input/gcs/types.go index a34c7f7160f..82fb737cdb6 100644 --- a/x-pack/filebeat/input/gcs/types.go +++ b/x-pack/filebeat/input/gcs/types.go @@ -22,6 +22,7 @@ type Source struct { FileSelectors []fileSelectorConfig ReaderConfig readerConfig ExpandEventListFromField string + Retry retryConfig } func (s *Source) Name() string { From 2884058efcee880f7ad8b4567eb53d30d1a59310 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 20 Nov 2024 15:36:32 +0530 Subject: [PATCH 2/7] updated changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9d2b177ab9f..389b9d68b7a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -333,6 +333,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Journald in the System module. {pull}41555[41555] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] +- Added support for retry config in GCS input. {issue}11580[11580] {pull}41699[41699] *Auditbeat* From ecc815130f495facaa1292151bac384cb1f64eca Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 29 Nov 2024 16:08:15 +0530 Subject: [PATCH 3/7] reworked older PR to contain only refactor/cleanup changes along with changes in default values --- .../filebeat/docs/inputs/input-gcs.asciidoc | 38 -------------- x-pack/filebeat/input/gcs/config.go | 13 ----- x-pack/filebeat/input/gcs/input.go | 13 +---- x-pack/filebeat/input/gcs/input_stateless.go | 8 +-- x-pack/filebeat/input/gcs/input_test.go | 50 ------------------- x-pack/filebeat/input/gcs/mock/mock.go | 40 --------------- x-pack/filebeat/input/gcs/types.go | 1 - 7 files changed, 3 insertions(+), 160 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc index eb49c25344b..a4797801a87 100644 --- a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc @@ -164,7 +164,6 @@ Now let's explore the configuration attributes a bit more elaborately. 11. <> 12. <> 13. <> - 14. <> [id="attrib-project-id"] @@ -398,43 +397,6 @@ filebeat.inputs: The GCS APIs don't provide a direct way to filter files based on the timestamp, so the input will download all the files and then filter them based on the timestamp. This can cause a bottleneck in processing if the number of files are very high. It is recommended to use this attribute only when the number of files are limited or ample resources are available. This option scales vertically and not horizontally. -[id="attrib-retry-gcs"] -[float] -==== `retry` - -This attribute can be used to configure a list of sub attributes that directly control how the input should behave when a download for a file/object fails or gets interrupted. 
The list of sub attributes are as follows :- - - 1. `max_attempts`: This attribute defines the maximum number of retries that should be attempted for a failed download. The default value for this is `3`. - 2. `initial_backoff_duration`: This attribute defines the initial backoff time in seconds. The default value for this is `1s`. - 3. `max_backoff_duration`: This attribute defines the maximum backoff time in seconds. The default value for this is `30s`. - 4. `backoff_multiplier`: This attribute defines the backoff multiplication factor. The default value for this is `2`. - -By configuring these attributes, the user is given the flexibility to control how the input should behave when a download fails or gets interrupted. This attribute can only be -specified at the root level of the configuration and not at the bucket level. It applies uniformly to all the buckets. - -An example configuration is given below :- - -[source, yml] ----- -filebeat.inputs: -- type: gcs - project_id: my_project_id - auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json - retry: - max_attempts: 3 - initial_backoff_duration: 2s - max_backoff_duration: 60s - backoff_multiplier: 2 - buckets: - - name: obs-bucket - max_workers: 3 - poll: true - poll_interval: 11m - bucket_timeout: 10m ----- - -While configuring the `retry` attribute, the user should take into consideration the `bucket_timeout` value. The `retry` attribute should be configured in such a way that the retries are completed within the `bucket_timeout` window. If the `retry` attribute is configured in such a way that the retries are not completed successfully within the `bucket_timeout` window, then the input will suffer a `context timeout` for that specific object/file which it was retrying. This can cause gaps in ingested data to pile up over time. - [id="bucket-overrides"] *The sample configs below will explain the bucket level overriding of attributes a bit further :-* diff --git a/x-pack/filebeat/input/gcs/config.go b/x-pack/filebeat/input/gcs/config.go index 3f0837960c8..5cc1430ed71 100644 --- a/x-pack/filebeat/input/gcs/config.go +++ b/x-pack/filebeat/input/gcs/config.go @@ -50,8 +50,6 @@ type config struct { ExpandEventListFromField string `config:"expand_event_list_from_field"` // This field is only used for system test purposes, to override the HTTP endpoint. AlternativeHost string `config:"alternative_host,omitempty"` - // Retry - Defines the retry configuration for the input. - Retry retryConfig `config:"retry"` } // bucket contains the config for each specific object storage bucket in the root account @@ -92,17 +90,6 @@ type jsonCredentialsConfig struct { AccountKey string `config:"account_key"` } -type retryConfig struct { - // MaxAttempts is the maximum number of retry attempts, defaults to 3. - MaxAttempts int `config:"max_attempts" validate:"min=0"` - // InitialBackOffDuration is the initial value of the retry period, defaults to 1 second. - InitialBackOffDuration time.Duration `config:"initial_backoff_duration" validate:"min=1"` - // MaxBackOffDuration is the maximum value of the retry period, defaults to 30 seconds. - MaxBackOffDuration time.Duration `config:"max_backoff_duration" validate:"min=2"` - // BackOffMultiplier is the factor by which the retry period increases. It should be greater than 1 and defaults to 2. 
- BackOffMultiplier float64 `config:"backoff_multiplier" validate:"min=2"` -} - func (c authConfig) Validate() error { // credentials_file if c.CredentialsFile != nil { diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 0b5532cad30..2e4ed914c36 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -72,7 +72,6 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, ReaderConfig: bucket.ReaderConfig, - Retry: config.Retry, }) } @@ -120,12 +119,6 @@ func defaultConfig() config { PollInterval: 5 * time.Minute, BucketTimeOut: 120 * time.Second, ParseJSON: false, - Retry: retryConfig{ - MaxAttempts: 3, - InitialBackOffDuration: 1 * time.Second, - MaxBackOffDuration: 30 * time.Second, - BackOffMultiplier: 2, - }, } } @@ -178,13 +171,9 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source, } bucket := client.Bucket(currentSource.BucketName).Retryer( - // Use WithMaxAttempts to change the maximum number of attempts. - storage.WithMaxAttempts(currentSource.Retry.MaxAttempts), // Use WithBackoff to change the timing of the exponential backoff. storage.WithBackoff(gax.Backoff{ - Initial: currentSource.Retry.InitialBackOffDuration, - Max: currentSource.Retry.MaxBackOffDuration, - Multiplier: currentSource.Retry.BackOffMultiplier, + Initial: 2 * time.Second, }), // RetryAlways will retry the operation even if it is non-idempotent. // Since we are only reading, the operation is always idempotent diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index b6901f0df25..c0038bf31dc 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -6,6 +6,7 @@ package gcs import ( "context" + "time" "cloud.google.com/go/storage" gax "github.com/googleapis/gax-go/v2" @@ -63,7 +64,6 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, ReaderConfig: bucket.ReaderConfig, - Retry: in.config.Retry, } st := newState() @@ -80,13 +80,9 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher }() bkt := client.Bucket(currentSource.BucketName).Retryer( - // Use WithMaxAttempts to change the maximum number of attempts. - storage.WithMaxAttempts(currentSource.Retry.MaxAttempts), // Use WithBackoff to change the timing of the exponential backoff. storage.WithBackoff(gax.Backoff{ - Initial: currentSource.Retry.InitialBackOffDuration, - Max: currentSource.Retry.MaxBackOffDuration, - Multiplier: currentSource.Retry.BackOffMultiplier, + Initial: 2 * time.Second, }), // RetryAlways will retry the operation even if it is non-idempotent. 
// Since we are only reading, the operation is always idempotent diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index 9f14ec80057..5595622c93e 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -520,56 +520,6 @@ func Test_StorageClient(t *testing.T) { mock.Gcs_test_new_object_docs_ata_json: true, }, }, - { - name: "RetryWithDefaultValues", - baseConfig: map[string]interface{}{ - "project_id": "elastic-sa", - "auth.credentials_file.path": "testdata/gcs_creds.json", - "max_workers": 1, - "poll": true, - "poll_interval": "1m", - "bucket_timeout": "1m", - "buckets": []map[string]interface{}{ - { - "name": "gcs-test-new", - }, - }, - }, - mockHandler: mock.GCSRetryServer, - expected: map[string]bool{ - mock.Gcs_test_new_object_ata_json: true, - mock.Gcs_test_new_object_data3_json: true, - mock.Gcs_test_new_object_docs_ata_json: true, - }, - }, - { - name: "RetryWithCustomValues", - baseConfig: map[string]interface{}{ - "project_id": "elastic-sa", - "auth.credentials_file.path": "testdata/gcs_creds.json", - "max_workers": 1, - "poll": true, - "poll_interval": "10s", - "bucket_timeout": "10s", - "retry": map[string]interface{}{ - "max_attempts": 5, - "initial_backoff_duration": "1s", - "max_backoff_duration": "3s", - "backoff_multiplier": 2, - }, - "buckets": []map[string]interface{}{ - { - "name": "gcs-test-new", - }, - }, - }, - mockHandler: mock.GCSRetryServer, - expected: map[string]bool{ - mock.Gcs_test_new_object_ata_json: true, - mock.Gcs_test_new_object_data3_json: true, - mock.Gcs_test_new_object_docs_ata_json: true, - }, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/x-pack/filebeat/input/gcs/mock/mock.go b/x-pack/filebeat/input/gcs/mock/mock.go index 1def436511a..50d2a431e01 100644 --- a/x-pack/filebeat/input/gcs/mock/mock.go +++ b/x-pack/filebeat/input/gcs/mock/mock.go @@ -98,43 +98,3 @@ func GCSFileServer() http.Handler { w.Write([]byte("resource not found")) }) } - -//nolint:errcheck // We can ignore errors here, as this is just for testing -func GCSRetryServer() http.Handler { - retries := 0 - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - retries++ - path := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/") - if r.Method == http.MethodGet && retries >= 3 { - switch len(path) { - case 2: - if path[0] == "b" { - if buckets[path[1]] { - w.Write([]byte(fetchBucket[path[1]])) - return - } - } else if buckets[path[0]] && availableObjects[path[0]][path[1]] { - w.Write([]byte(objects[path[0]][path[1]])) - return - } - case 3: - if path[0] == "b" && path[2] == "o" { - if buckets[path[1]] { - w.Write([]byte(objectList[path[1]])) - return - } - } else if buckets[path[0]] { - objName := strings.Join(path[1:], "/") - if availableObjects[path[0]][objName] { - w.Write([]byte(objects[path[0]][objName])) - return - } - } - default: - w.WriteHeader(http.StatusNotFound) - return - } - } - w.WriteHeader(http.StatusInternalServerError) - }) -} diff --git a/x-pack/filebeat/input/gcs/types.go b/x-pack/filebeat/input/gcs/types.go index 82fb737cdb6..a34c7f7160f 100644 --- a/x-pack/filebeat/input/gcs/types.go +++ b/x-pack/filebeat/input/gcs/types.go @@ -22,7 +22,6 @@ type Source struct { FileSelectors []fileSelectorConfig ReaderConfig readerConfig ExpandEventListFromField string - Retry retryConfig } func (s *Source) Name() string { From b27d3becb6b0495b4d550fb5c4d7773e0e22cb93 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 29 Nov 
2024 16:20:32 +0530 Subject: [PATCH 4/7] updated changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a8e1f231e71..fd4b8f82400 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -346,7 +346,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Journald in the System module. {pull}41555[41555] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] -- Added support for retry config in GCS input. {issue}11580[11580] {pull}41699[41699] +- Refactor & cleanup with updates to default values and documentation {pull}41834[41834] - Update CEL mito extensions to v1.16.0. {pull}41727[41727] *Auditbeat* From eb4c0d6402a9d2e9c8541dce819998cb0ac85d6a Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 29 Nov 2024 16:21:05 +0530 Subject: [PATCH 5/7] updated changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fd4b8f82400..60c1ff3ef9e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -346,7 +346,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Journald in the System module. {pull}41555[41555] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] -- Refactor & cleanup with updates to default values and documentation {pull}41834[41834] +- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834] - Update CEL mito extensions to v1.16.0. {pull}41727[41727] *Auditbeat* From 28f362eb0add6d238ff9a87f9f65ff67fa5baed3 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 2 Dec 2024 10:09:26 +0530 Subject: [PATCH 6/7] addressed Dan's suggestions and removed omitempty tags and updated defaultConfig() --- .../filebeat/docs/inputs/input-gcs.asciidoc | 2 +- x-pack/filebeat/input/gcs/config.go | 37 +++++++++++++------ x-pack/filebeat/input/gcs/input.go | 13 +------ 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc index a4797801a87..2a762ddec18 100644 --- a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc @@ -10,7 +10,7 @@ ++++ Use the `google cloud storage input` to read content from files stored in buckets which reside on your Google Cloud. -The input can be configured to work with and without polling, though if polling is disabled, it will only perform a one-time passthrough, list the file contents and end the process. +The input can be configured to work with and without polling, though if polling is disabled, it will only perform a single collection of data, list the file contents and end the process.
*To mitigate errors and ensure a stable processing environment, this input employs the following features :* diff --git a/x-pack/filebeat/input/gcs/config.go b/x-pack/filebeat/input/gcs/config.go index 5cc1430ed71..64f64c69bc5 100644 --- a/x-pack/filebeat/input/gcs/config.go +++ b/x-pack/filebeat/input/gcs/config.go @@ -28,16 +28,16 @@ type config struct { // Auth - Defines the authentication mechanism to be used for accessing the gcs bucket. Auth authConfig `config:"auth"` // MaxWorkers - Defines the maximum number of go routines that will be spawned. - MaxWorkers int `config:"max_workers,omitempty" validate:"max=5000"` + MaxWorkers int `config:"max_workers" validate:"max=5000"` // Poll - Defines if polling should be performed on the input bucket source. - Poll bool `config:"poll,omitempty"` + Poll bool `config:"poll"` // PollInterval - Defines the maximum amount of time to wait before polling for the next batch of objects from the bucket. - PollInterval time.Duration `config:"poll_interval,omitempty"` + PollInterval time.Duration `config:"poll_interval"` // ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to // false, since it can get expensive dealing with highly nested json data. - ParseJSON bool `config:"parse_json,omitempty"` + ParseJSON bool `config:"parse_json"` // BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out. - BucketTimeOut time.Duration `config:"bucket_timeout,omitempty"` + BucketTimeOut time.Duration `config:"bucket_timeout"` // Buckets - Defines a list of buckets that will be polled for objects. Buckets []bucket `config:"buckets" validate:"required"` // FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket. @@ -49,17 +49,17 @@ type config struct { // ExpandEventListFromField - Defines the field name that will be used to expand the event into separate events. ExpandEventListFromField string `config:"expand_event_list_from_field"` // This field is only used for system test purposes, to override the HTTP endpoint. - AlternativeHost string `config:"alternative_host,omitempty"` + AlternativeHost string `config:"alternative_host"` } // bucket contains the config for each specific object storage bucket in the root account type bucket struct { Name string `config:"name" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` - ParseJSON *bool `config:"parse_json,omitempty"` + MaxWorkers *int `config:"max_workers" validate:"max=5000"` + BucketTimeOut *time.Duration `config:"bucket_timeout"` + Poll *bool `config:"poll"` + PollInterval *time.Duration `config:"poll_interval"` + ParseJSON *bool `config:"parse_json"` FileSelectors []fileSelectorConfig `config:"file_selectors"` ReaderConfig readerConfig `config:",inline"` TimeStampEpoch *int64 `config:"timestamp_epoch"` @@ -78,13 +78,15 @@ type readerConfig struct { Decoding decoderConfig `config:"decoding"` } +// authConfig defines the authentication mechanism to be used for accessing the gcs bucket. +// If either is configured, the 'omitempty' tag will prevent the other option from being serialized in the config.
type authConfig struct { CredentialsJSON *jsonCredentialsConfig `config:"credentials_json,omitempty"` CredentialsFile *fileCredentialsConfig `config:"credentials_file,omitempty"` } type fileCredentialsConfig struct { - Path string `config:"path,omitempty"` + Path string `config:"path"` } type jsonCredentialsConfig struct { AccountKey string `config:"account_key"` @@ -115,3 +117,14 @@ func (c authConfig) Validate() error { return fmt.Errorf("no authentication credentials were configured or detected " + "(credentials_file, credentials_json, and application default credentials (ADC))") } + +// defaultConfig returns the default configuration for the input +func defaultConfig() config { + return config{ + MaxWorkers: 1, + Poll: true, + PollInterval: 5 * time.Minute, + BucketTimeOut: 120 * time.Second, + ParseJSON: false, + } +} diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 2e4ed914c36..a538d8b7333 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -50,7 +50,7 @@ func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { } func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { - config := config{} + config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, nil, err } @@ -111,17 +111,6 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket { return b } -// defaultConfig returns the default configuration for the input -func defaultConfig() config { - return config{ - MaxWorkers: 1, - Poll: true, - PollInterval: 5 * time.Minute, - BucketTimeOut: 120 * time.Second, - ParseJSON: false, - } -} - // isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp func isValidUnixTimestamp(timestamp int64) bool { // checks if the timestamp is within the valid range From 76fee04ade28395df591f0b03119ebc9189b7798 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 3 Dec 2024 09:10:45 +0530 Subject: [PATCH 7/7] updated tryOverrideOrDefault comment --- x-pack/filebeat/input/gcs/input.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index a538d8b7333..33e46d034d7 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -78,9 +78,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { return sources, &gcsInput{config: config}, nil } -// tryOverrideOrDefault, overrides global values with local -// bucket level values if present. If both global & local values -// are absent, assigns default values +// tryOverrideOrDefault overrides the bucket level values with global values if the bucket fields are not set func tryOverrideOrDefault(cfg config, b bucket) bucket { if b.MaxWorkers == nil { b.MaxWorkers = &cfg.MaxWorkers
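The net effect of this series is that defaultConfig() seeds the root-level values (max_workers: 1, poll: true, poll_interval: 5m, bucket_timeout: 120s, parse_json: false) before cfg.Unpack runs, and tryOverrideOrDefault then copies each root-level value into any bucket that did not set its own. A self-contained sketch of that fallback behavior, using simplified stand-in types rather than the input's real config and bucket structs:

[source, go]
----
package main

import (
	"fmt"
	"time"
)

// rootConfig and bucketConfig are simplified stand-ins for the input's
// config and bucket types; only a few representative fields are modeled.
type rootConfig struct {
	MaxWorkers   int
	Poll         bool
	PollInterval time.Duration
}

type bucketConfig struct {
	MaxWorkers   *int
	Poll         *bool
	PollInterval *time.Duration
}

// overrideOrDefault mirrors tryOverrideOrDefault: a nil (unset) bucket
// field falls back to the root-level value, which in turn starts out as
// the default unless the user overrode it.
func overrideOrDefault(cfg rootConfig, b bucketConfig) bucketConfig {
	if b.MaxWorkers == nil {
		b.MaxWorkers = &cfg.MaxWorkers
	}
	if b.Poll == nil {
		b.Poll = &cfg.Poll
	}
	if b.PollInterval == nil {
		b.PollInterval = &cfg.PollInterval
	}
	return b
}

func main() {
	root := rootConfig{MaxWorkers: 1, Poll: true, PollInterval: 5 * time.Minute}
	workers := 3
	b := overrideOrDefault(root, bucketConfig{MaxWorkers: &workers})
	// the bucket keeps its own max_workers but inherits poll and poll_interval
	fmt.Println(*b.MaxWorkers, *b.Poll, *b.PollInterval) // 3 true 5m0s
}
----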