From 08b7d84d4ab66ff988652a607a837bf93dc35e4f Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Thu, 12 Dec 2024 10:14:46 +0100 Subject: [PATCH] .../input/entityanalytics/provider/okta: Rate limiting fix, improvements (#41977) - Fix a bug in the stopping of requests when `x-rate-limit-remaining: 0`. - Add a deadline so long waits return immediately as errors. - Add an option to set a fixed request rate. --- CHANGELOG.next.asciidoc | 2 + .../inputs/input-entity-analytics.asciidoc | 18 ++- .../entityanalytics/provider/okta/conf.go | 4 + .../provider/okta/internal/okta/okta.go | 36 ++--- .../provider/okta/internal/okta/okta_test.go | 54 +++---- .../okta/internal/okta/ratelimiter.go | 114 +++++++++++--- .../okta/internal/okta/ratelimiter_test.go | 143 ++++++++++++++---- .../entityanalytics/provider/okta/okta.go | 16 +- .../provider/okta/okta_test.go | 2 +- 9 files changed, 282 insertions(+), 107 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 22148d5a4f4..6840ef4ed53 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -193,6 +193,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765] - Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583] - Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920] +- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] *Heartbeat* @@ -365,6 +366,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934] - AWS S3 input registry cleanup for untracked s3 objects. 
{pull}41694[41694] - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932] +- Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc index b4b701d3919..faa50cf732d 100644 --- a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc @@ -1015,7 +1015,21 @@ shorter than the full synchronization interval (`sync_interval`). Expressed as a duration string (e.g., 1m, 3h, 24h). Defaults to `15m` (15 minutes). [float] -==== `tracer.enabled` +===== `limit_window` + +The time between Okta API rate limit resets. +Expressed as a duration string (e.g., 1m, 3h, 24h). Defaults to `1m` (1 minute). + +[float] +===== `limit_fixed` + +The number of requests to allow in each limit window, if set. +This parameter should only be set in exceptional cases. When it is set, rate +limit information in API responses will be ignored in favor of the fixed limit. +The limit is applied separately to each endpoint. Defaults to unset. + +[float] +===== `tracer.enabled` It is possible to log HTTP requests and responses to the Okta API to a local file-system for debugging configurations. This option is enabled by setting `tracer.enabled` to true and setting the `tracer.filename` value. @@ -1025,7 +1039,7 @@ to false without unsetting the filename option. Enabling this option compromises security and should only be used for debugging. [float] -==== `tracer.filename` +===== `tracer.filename` To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id. 
For Example, `http-request-trace-*.ndjson`. diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go b/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go index 41a3895a70d..61a61aed25e 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go @@ -68,6 +68,10 @@ type conf struct { // API limit resets. LimitWindow time.Duration `config:"limit_window"` + // LimitFixed is a number of requests to allow in each LimitWindow, + // overriding the guidance in API responses. + LimitFixed *int `config:"limit_fixed"` + // Request is the configuration for establishing // HTTP requests to the API. Request *requestConfig `config:"request"` diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go index 42ffc060178..b2bacba89a8 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go @@ -185,13 +185,13 @@ func (o Response) String() string { // Parts of the response may be omitted using the omit parameter. // // The provided rate limiter must allow at least request and will be updated with the -// response's X-Rate-Limit headers considering the rate limit window time. Details -// for rate limits are available at https://help.okta.com/en-us/Content/Topics/Security/API-rate-limits.htm +// response's X-Rate-Limit headers. Details for rate limits are available at +// https://help.okta.com/en-us/Content/Topics/Security/API-rate-limits.htm // and account rate limits and windows can be seen on the Okta admin dashboard at // https://${yourOktaDomain}/reports/rate-limit. // // See https://developer.okta.com/docs/reference/api/users/#list-users for details. 
-func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) { +func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim *RateLimiter, log *logp.Logger) ([]User, http.Header, error) { var endpoint, path string if user == "" { endpoint = "/api/v1/users" @@ -207,7 +207,7 @@ func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user strin Path: path, RawQuery: query.Encode(), } - return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, window, log) + return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, log) } // GetUserFactors returns Okta group roles using the groups API endpoint. host is the @@ -216,7 +216,7 @@ func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user strin // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/UserFactor/#tag/UserFactor/operation/listFactors. -func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Factor, http.Header, error) { +func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim *RateLimiter, log *logp.Logger) ([]Factor, http.Header, error) { if user == "" { return nil, nil, errors.New("no user specified") } @@ -229,7 +229,7 @@ func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user strin Host: host, Path: path, } - return getDetails[Factor](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) + return getDetails[Factor](ctx, cli, u, endpoint, key, true, OmitNone, lim, log) } // GetUserRoles returns Okta group roles using the groups API endpoint. 
host is the @@ -238,7 +238,7 @@ func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user strin // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles. -func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) { +func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim *RateLimiter, log *logp.Logger) ([]Role, http.Header, error) { if user == "" { return nil, nil, errors.New("no user specified") } @@ -251,7 +251,7 @@ func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, Host: host, Path: path, } - return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) + return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, log) } // GetUserGroupDetails returns Okta group details using the users API endpoint. host is the @@ -260,7 +260,7 @@ func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/reference/api/users/#request-parameters-8 (no anchor exists on the page for this endpoint) for details. 
-func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) { +func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim *RateLimiter, log *logp.Logger) ([]Group, http.Header, error) { if user == "" { return nil, nil, errors.New("no user specified") } @@ -273,7 +273,7 @@ func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user Host: host, Path: path, } - return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) + return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, log) } // GetGroupRoles returns Okta group roles using the groups API endpoint. host is the @@ -282,7 +282,7 @@ func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles. -func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) { +func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim *RateLimiter, log *logp.Logger) ([]Role, http.Header, error) { if group == "" { return nil, nil, errors.New("no group specified") } @@ -295,7 +295,7 @@ func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group strin Host: host, Path: path, } - return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) + return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, log) } // GetDeviceDetails returns Okta device details using the list devices API endpoint. 
host is the @@ -305,7 +305,7 @@ func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group strin // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices for details. -func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) { +func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *RateLimiter, log *logp.Logger) ([]Device, http.Header, error) { var endpoint string var path string if device == "" { @@ -322,7 +322,7 @@ func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device s Path: path, RawQuery: query.Encode(), } - return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, window, log) + return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, log) } // GetDeviceUsers returns Okta user details for users associated with the provided device identifier @@ -332,7 +332,7 @@ func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device s // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDeviceUsers for details. -func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) { +func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim *RateLimiter, log *logp.Logger) ([]User, http.Header, error) { if device == "" { // No user associated with a null device. Not an error. 
return nil, nil, nil @@ -347,7 +347,7 @@ func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device str Path: path, RawQuery: query.Encode(), } - du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, window, log) + du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, log) if err != nil { return nil, h, err } @@ -372,7 +372,7 @@ type devUser struct { // for the specific user are returned, otherwise a list of all users is returned. // // See GetUserDetails for details of the query and rate limit parameters. -func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) { +func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim *RateLimiter, log *logp.Logger) ([]E, http.Header, error) { url := u.String() req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { @@ -395,7 +395,7 @@ func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, end return nil, nil, err } defer resp.Body.Close() - err = lim.Update(endpoint, resp.Header, window, log) + err = lim.Update(endpoint, resp.Header, log) if err != nil { io.Copy(io.Discard, resp.Body) return nil, nil, err diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go index 45b1b2a4ca4..d362d72f391 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go @@ -44,14 +44,14 @@ func Test(t *testing.T) { t.Skip("okta tests require ${OKTA_TOKEN} to be set") } - // Make a global limiter - limiter := NewRateLimiter() - // There are a variety of windows, the most 
conservative is one minute. // The rate limit will be adjusted on the second call to the API if // window is actually used to rate limit calculations. const window = time.Minute + // Make a global limiter + limiter := NewRateLimiter(window, nil) + for _, omit := range []Response{ OmitNone, OmitCredentials, @@ -65,7 +65,7 @@ func Test(t *testing.T) { t.Run("me", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "me", query, omit, limiter, window, logger) + users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "me", query, omit, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -94,7 +94,7 @@ func Test(t *testing.T) { t.Run("my_groups", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - groups, _, err := GetUserGroupDetails(context.Background(), http.DefaultClient, host, key, me.ID, limiter, window, logger) + groups, _, err := GetUserGroupDetails(context.Background(), http.DefaultClient, host, key, me.ID, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -119,7 +119,7 @@ func Test(t *testing.T) { t.Run("my_roles", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - roles, _, err := GetUserRoles(context.Background(), http.DefaultClient, host, key, me.ID, limiter, window, logger) + roles, _, err := GetUserRoles(context.Background(), http.DefaultClient, host, key, me.ID, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -144,7 +144,7 @@ func Test(t *testing.T) { t.Run("my_factors", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - factors, _, err := GetUserFactors(context.Background(), http.DefaultClient, host, key, me.ID, limiter, window, logger) + factors, _, err := GetUserFactors(context.Background(), http.DefaultClient, host, key, me.ID, limiter, logger) if err != nil { 
t.Fatalf("unexpected error: %v", err) } @@ -175,7 +175,7 @@ func Test(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, login, query, omit, limiter, window, logger) + users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, login, query, omit, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -190,7 +190,7 @@ func Test(t *testing.T) { t.Run("all", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, window, logger) + users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -218,7 +218,7 @@ func Test(t *testing.T) { query := make(url.Values) query.Set("limit", "200") query.Add("search", `not (status pr)`) // This cannot ever be true. 
- _, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, window, logger) + _, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, logger) oktaErr := &Error{} if !errors.As(err, &oktaErr) { // Don't test the value of the error since it was @@ -234,7 +234,7 @@ func Test(t *testing.T) { t.Run("device", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - devices, _, err := GetDeviceDetails(context.Background(), http.DefaultClient, host, key, "", query, limiter, window, logger) + devices, _, err := GetDeviceDetails(context.Background(), http.DefaultClient, host, key, "", query, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -246,7 +246,7 @@ func Test(t *testing.T) { t.Logf("devices: %s", b) } for _, d := range devices { - users, _, err := GetDeviceUsers(context.Background(), http.DefaultClient, host, key, d.ID, query, OmitCredentials, limiter, window, logger) + users, _, err := GetDeviceUsers(context.Background(), http.DefaultClient, host, key, d.ID, query, OmitCredentials, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -263,15 +263,15 @@ var localTests = []struct { name string msg string id string - fn func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) + fn func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *RateLimiter, log *logp.Logger) (any, http.Header, error) mkWant func(string) (any, error) }{ { // Test case constructed from API-returned value with details anonymised. 
name: "users", msg: `[{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","recovery_question":{"question":"Who's a major player in the cowboy scene?","answer":"Annie Oakley"},"type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}]`, - fn: func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) { - return GetUserDetails(context.Background(), cli, host, key, user, query, OmitNone, lim, window, log) + fn: func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *RateLimiter, log *logp.Logger) (any, http.Header, error) { + return GetUserDetails(context.Background(), cli, host, key, user, query, OmitNone, lim, log) }, mkWant: mkWant[User], }, @@ -279,8 +279,8 @@ var localTests = []struct { // Test case from https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices name: "devices", msg: `[{"id":"devid","status":"CREATED","created":"2019-10-02T18:03:07.000Z","lastUpdated":"2019-10-02T18:03:07.000Z","profile":{"displayName":"Example Device name 1","platform":"WINDOWS","serialNumber":"XXDDRFCFRGF3M8MD6D","sid":"S-1-11-111","registered":true,"secureHardwarePresent":false,"diskEncryptionType":"ALL_INTERNAL_VOLUMES"},"resourceType":"UDDevice","resourceDisplayName":{"value":"Example Device name 
1","sensitive":false},"resourceAlternateId":null,"resourceId":"guo4a5u7YAHhjXrMK0g4","_links":{"activate":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4/lifecycle/activate","hints":{"allow":["POST"]}},"self":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4","hints":{"allow":["GET","PATCH","PUT"]}},"users":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4/users","hints":{"allow":["GET"]}}}},{"id":"guo4a5u7YAHhjXrMK0g5","status":"ACTIVE","created":"2023-06-21T23:24:02.000Z","lastUpdated":"2023-06-21T23:24:02.000Z","profile":{"displayName":"Example Device name 2","platform":"ANDROID","manufacturer":"Google","model":"Pixel 6","osVersion":"13:2023-05-05","registered":true,"secureHardwarePresent":true,"diskEncryptionType":"USER"},"resourceType":"UDDevice","resourceDisplayName":{"value":"Example Device name 2","sensitive":false},"resourceAlternateId":null,"resourceId":"guo4a5u7YAHhjXrMK0g5","_links":{"activate":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5/lifecycle/activate","hints":{"allow":["POST"]}},"self":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5","hints":{"allow":["GET","PATCH","PUT"]}},"users":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5/users","hints":{"allow":["GET"]}}}}]`, - fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) { - return GetDeviceDetails(context.Background(), cli, host, key, device, query, lim, window, log) + fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *RateLimiter, log *logp.Logger) (any, http.Header, error) { + return GetDeviceDetails(context.Background(), cli, host, key, device, query, lim, log) }, mkWant: mkWant[Device], }, @@ -289,8 +289,8 @@ var localTests = []struct { name: "devices_users", msg: 
`[{"created":"2023-08-07T21:48:27.000Z","managementStatus":"NOT_MANAGED","user":{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"recovery_question":{"question":"Who's a major player in the cowboy scene?","answer":"Annie Oakley"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}}]`, id: "devid", - fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) { - return GetDeviceUsers(context.Background(), cli, host, key, device, query, OmitNone, lim, window, log) + fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *RateLimiter, log *logp.Logger) (any, http.Header, error) { + return GetDeviceUsers(context.Background(), cli, host, key, device, query, OmitNone, lim, log) }, mkWant: mkWant[devUser], }, @@ -315,12 +315,12 @@ func TestLocal(t *testing.T) { for _, test := range localTests { t.Run(test.name, func(t *testing.T) { - limiter := NewRateLimiter() - // There are a variety of windows, the most conservative is one minute. // The rate limit will be adjusted on the second call to the API if // window is actually used to rate limit calculations. 
const window = time.Minute + var fixedLimit *int = nil + limiter := NewRateLimiter(window, fixedLimit) const key = "token" want, err := test.mkWant(test.msg) @@ -366,7 +366,7 @@ func TestLocal(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - got, h, err := test.fn(context.Background(), ts.Client(), host, key, test.id, query, limiter, window, logger) + got, h, err := test.fn(context.Background(), ts.Client(), host, key, test.id, query, limiter, logger) if err != nil { t.Fatalf("unexpected error from Get_Details: %v", err) } @@ -375,15 +375,15 @@ func TestLocal(t *testing.T) { t.Errorf("unexpected result:\n- want\n+ got\n%s", cmp.Diff(want, got)) } - if len(limiter) != 1 { - t.Errorf("unexpected number endpoints track by rate limiter: %d", len(limiter)) + if len(limiter.byEndpoint) != 1 { + t.Errorf("unexpected number endpoints track by rate limiter: %d", len(limiter.byEndpoint)) } // retrieve the rate.Limiter parameters for the one endpoint var limit rate.Limit var burst int - for _, l := range limiter { - limit = l.Limit() - burst = l.Burst() + for _, e := range limiter.byEndpoint { + limit = e.limiter.Limit() + burst = e.limiter.Burst() break } diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go index 1b58e01328c..2dfd8ae923b 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go @@ -16,33 +16,90 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -type RateLimiter map[string]*rate.Limiter +// RateLimiter holds rate limiting information for an API. +// +// Each API endpoint has its own rate limit, which can be dynamically updated +// using response headers. If a fixed limit is set, it takes precedence over any +// information from response headers. 
+type RateLimiter struct { + window time.Duration + fixedLimit *int + byEndpoint map[string]endpointRateLimiter +} -func NewRateLimiter() RateLimiter { - r := make(RateLimiter) - return r +// endpointRateLimiter represents rate limiting information for a single API endpoint. +type endpointRateLimiter struct { + limiter *rate.Limiter + ready chan struct{} } -func (r RateLimiter) limiter(path string) *rate.Limiter { - if existing, ok := r[path]; ok { +// maxWait defines the maximum wait duration allowed for rate limiting. +// Longer waits are considered errors. +const maxWait = 30 * time.Minute + +// NewRateLimiter constructs a new RateLimiter. +// +// Parameters: +// - `window`: The time between API limit resets. Used for setting an initial +// target rate. +// - `fixedLimit`: A fixed number of requests to allow in each `window`, +// overriding the guidance in API responses. +// +// Returns: +// - A pointer to a new RateLimiter instance. +func NewRateLimiter(window time.Duration, fixedLimit *int) *RateLimiter { + endpoints := make(map[string]endpointRateLimiter) + r := RateLimiter{ + window: window, + fixedLimit: fixedLimit, + byEndpoint: endpoints, + } + r.fixedLimit = fixedLimit + return &r +} + +var immediatelyReady = make(chan struct{}) + +func init() { close(immediatelyReady) } + +func (r RateLimiter) endpoint(path string) endpointRateLimiter { + if existing, ok := r.byEndpoint[path]; ok { return existing } - initial := rate.NewLimiter(1, 1) // Allow a single fetch operation to obtain limits from the API - r[path] = initial - return initial + limit := rate.Limit(1) + if r.fixedLimit != nil { + limit = rate.Limit(float64(*r.fixedLimit) / r.window.Seconds()) + } + limiter := rate.NewLimiter(limit, 1) // Allow a single fetch operation to obtain limits from the API + newEndpointRateLimiter := endpointRateLimiter{ + limiter: limiter, + ready: immediatelyReady, + } + r.byEndpoint[path] = newEndpointRateLimiter + return newEndpointRateLimiter } func (r RateLimiter) 
Wait(ctx context.Context, endpoint string, url *url.URL, log *logp.Logger) (err error) { - limiter := r.limiter(endpoint) - log.Debugw("rate limit", "limit", limiter.Limit(), "burst", limiter.Burst(), "url", url.String()) - return limiter.Wait(ctx) + e := r.endpoint(endpoint) + log.Debugw("rate limit", "limit", e.limiter.Limit(), "burst", e.limiter.Burst(), "url", url.String()) + ctxWithDeadline, cancel := context.WithDeadline(ctx, time.Now().Add(maxWait)) + defer cancel() + select { + case <-e.ready: + case <-ctxWithDeadline.Done(): + return ctxWithDeadline.Err() + } + return e.limiter.Wait(ctxWithDeadline) } // Update implements the Okta rate limit policy translation. // // See https://developer.okta.com/docs/reference/rl-best-practices/ for details. -func (r RateLimiter) Update(endpoint string, h http.Header, window time.Duration, log *logp.Logger) error { - limiter := r.limiter(endpoint) +func (r RateLimiter) Update(endpoint string, h http.Header, log *logp.Logger) error { + if r.fixedLimit != nil { + return nil + } + e := r.endpoint(endpoint) limit := h.Get("X-Rate-Limit-Limit") remaining := h.Get("X-Rate-Limit-Remaining") reset := h.Get("X-Rate-Limit-Reset") @@ -78,20 +135,33 @@ func (r RateLimiter) Update(endpoint string, h http.Header, window time.Duration if rateLimit <= 0 { // Reset limiter to block requests until reset limiter := rate.NewLimiter(0, 0) - r[endpoint] = limiter + ready := make(chan struct{}) + newEndpointRateLimiter := endpointRateLimiter{ + limiter: limiter, + ready: ready, + } + r.byEndpoint[endpoint] = newEndpointRateLimiter + + resetTimeUTC := resetTime.UTC() + log.Debugw("rate limit block until reset", "reset_time", resetTimeUTC) // next gives us a sane next window estimate, but the // estimate will be overwritten when we make the next // permissible API request. 
- next := rate.Limit(lim / window.Seconds()) - waitUntil := resetTime.UTC() - limiter.SetLimitAt(waitUntil, next) - limiter.SetBurstAt(waitUntil, burst) - log.Debugw("rate limit reset", "reset_time", waitUntil, "next_rate", next, "next_burst", burst) + next := rate.Limit(lim / r.window.Seconds()) + waitFor := time.Until(resetTimeUTC) + + time.AfterFunc(waitFor, func() { + limiter.SetLimit(next) + limiter.SetBurst(burst) + close(ready) + log.Debugw("rate limit reset", "reset_time", resetTimeUTC, "reset_rate", next, "reset_burst", burst) + }) + return nil } - limiter.SetLimit(rateLimit) - limiter.SetBurst(burst) + e.limiter.SetLimit(rateLimit) + e.limiter.SetBurst(burst) log.Debugw("rate limit adjust", "set_rate", rateLimit, "set_burst", burst) return nil } diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go index 1492e55c8a6..cc8a832bee2 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go @@ -5,7 +5,9 @@ package okta import ( + "context" "net/http" + "net/url" "strconv" "testing" "time" @@ -17,68 +19,151 @@ func TestRateLimiter(t *testing.T) { logp.TestingSetup() t.Run("separation by endpoint", func(t *testing.T) { - r := NewRateLimiter() - limiter1 := r.limiter("/foo") - limiter2 := r.limiter("/bar") + const window = time.Minute + var fixedLimit *int = nil + r := NewRateLimiter(window, fixedLimit) + e1 := r.endpoint("/foo") + e2 := r.endpoint("/bar") - limiter1.SetBurst(1000) + e1.limiter.SetBurst(1000) - if limiter2.Burst() == 1000 { + if e2.limiter.Burst() == 1000 { t.Errorf("changes to one endpoint's limits affected another") } }) t.Run("Update stops requests when none are remaining", func(t *testing.T) { - r := NewRateLimiter() - + const window = time.Minute + var fixedLimit *int = nil + r := 
NewRateLimiter(window, fixedLimit) const endpoint = "/foo" - limiter := r.limiter(endpoint) + url, err := url.Parse(endpoint) + if err != nil { + t.Errorf("unexpected error from url.Parse(): %v", err) + } + ctx := context.Background() + log := logp.L() + e := r.endpoint(endpoint) - if !limiter.Allow() { + if !e.limiter.Allow() { t.Errorf("doesn't allow an initial request") } + // update to none remaining, reset soon now := time.Now().Unix() - reset := now + 30 - + resetSoon := now + 30 headers := http.Header{ "X-Rate-Limit-Limit": []string{"60"}, "X-Rate-Limit-Remaining": []string{"0"}, - "X-Rate-Limit-Reset": []string{strconv.FormatInt(reset, 10)}, + "X-Rate-Limit-Reset": []string{strconv.FormatInt(resetSoon, 10)}, } - window := time.Minute - - err := r.Update(endpoint, headers, window, logp.L()) + err = r.Update(endpoint, headers, logp.L()) if err != nil { t.Errorf("unexpected error from Update(): %v", err) } - limiter = r.limiter(endpoint) + e = r.endpoint(endpoint) - if limiter.Allow() { + if e.limiter.Allow() { t.Errorf("allowed a request when none are remaining") } - - if limiter.AllowN(time.Unix(reset-1, 999999999), 1) { + if e.limiter.AllowN(time.Unix(resetSoon-1, 999999999), 1) { t.Errorf("allowed a request before reset, when none are remaining") } - if !limiter.AllowN(time.Unix(reset+1, 0), 1) { - t.Errorf("doesn't allow requests to resume after reset") + // update to none remaining, reset now + headers = http.Header{ + "X-Rate-Limit-Limit": []string{"60"}, + "X-Rate-Limit-Remaining": []string{"0"}, + "X-Rate-Limit-Reset": []string{strconv.FormatInt(now, 10)}, + } + err = r.Update(endpoint, headers, logp.L()) + if err != nil { + t.Errorf("unexpected error from Update(): %v", err) + } + e = r.endpoint(endpoint) + + start := time.Now() + r.Wait(ctx, endpoint, url, log) + wait := time.Since(start) + + if wait > 1010*time.Millisecond { + t.Errorf("doesn't allow requests to resume after reset. 
had to wait %s", wait) + } + if e.limiter.Limit() != 1.0 { + t.Errorf("unexpected rate following reset (not 60 requests / 60 seconds): %f", e.limiter.Limit()) + } + if e.limiter.Burst() != 1 { + t.Errorf("unexpected burst following reset (not 1): %d", e.limiter.Burst()) + } + + e.limiter.SetBurst(100) // increase bucket size to check token accumulation + tokens := e.limiter.TokensAt(time.Unix(0, time.Now().Add(30*time.Second).UnixNano())) + target := 30.0 + buffer := 0.01 + + if tokens < target-buffer || target+buffer < tokens { + t.Errorf("tokens don't accumulate at the expected rate over 30s: %f", tokens) + } + }) + + t.Run("Very long waits are considered errors", func(t *testing.T) { + const window = time.Minute + var fixedLimit *int = nil + r := NewRateLimiter(window, fixedLimit) + + const endpoint = "/foo" + + url, err := url.Parse(endpoint) + if err != nil { + t.Errorf("unexpected error from url.Parse(): %v", err) } + reset := time.Now().Add(31 * time.Minute).Unix() + headers := http.Header{ + "X-Rate-Limit-Limit": []string{"60"}, + "X-Rate-Limit-Remaining": []string{"1"}, + "X-Rate-Limit-Reset": []string{strconv.FormatInt(reset, 10)}, + } + log := logp.L() + ctx := context.Background() + + r.Wait(ctx, endpoint, url, log) // consume the initial request + r.Update(endpoint, headers, log) // update to a slow rate - if limiter.Limit() != 1.0 { - t.Errorf("unexpected rate following reset (not 60 requests / 60 seconds): %f", limiter.Limit()) + err = r.Wait(ctx, endpoint, url, log) + + const expectedErr = "rate: Wait(n=1) would exceed context deadline" + if err == nil { + t.Errorf("expected error message %q, but got no error", expectedErr) + } else if err.Error() != expectedErr { + t.Errorf("expected error message %q, but got %q", expectedErr, err.Error()) } + }) - if limiter.Burst() != 1 { - t.Errorf("unexpected burst following reset (not 1): %d", limiter.Burst()) + t.Run("A fixed limit overrides response information", func(t *testing.T) { + const window = 
time.Minute + var fixedLimit int = 120 + r := NewRateLimiter(window, &fixedLimit) + const endpoint = "/foo" + e := r.endpoint(endpoint) + + if e.limiter.Limit() != 120/60 { + t.Errorf("unexpected rate (for fixed 120 reqs / 60 secs): %f", e.limiter.Limit()) } - limiter.SetBurstAt(time.Unix(reset, 0), 100) // increase bucket size to check token accumulation - tokens := limiter.TokensAt(time.Unix(reset+30, 0)) - if tokens < 29.5 || tokens > 30.0 { - t.Errorf("tokens don't accumulate at the expected rate. tokens 30s after reset: %f", tokens) + // update to 15 requests remaining, reset in 30s + headers := http.Header{ + "X-Rate-Limit-Limit": []string{"60"}, + "X-Rate-Limit-Remaining": []string{"15"}, + "X-Rate-Limit-Reset": []string{strconv.FormatInt(time.Now().Unix()+30, 10)}, } + err := r.Update(endpoint, headers, logp.L()) + if err != nil { + t.Errorf("unexpected error from Update(): %v", err) + } + e = r.endpoint(endpoint) + if e.limiter.Limit() != 120/60 { + t.Errorf("unexpected rate following Update() (for fixed 120 reqs / 60 secs): %f", e.limiter.Limit()) + } }) } diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go b/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go index 30103d3ccdb..0e33e93cc9b 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go @@ -59,7 +59,7 @@ type oktaInput struct { cfg conf client *http.Client - lim okta.RateLimiter + lim *okta.RateLimiter metrics *inputMetrics logger *logp.Logger @@ -110,7 +110,7 @@ func (p *oktaInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C updateTimer := time.NewTimer(updateWaitTime) // Allow a single fetch operation to obtain limits from the API. 
- p.lim = okta.NewRateLimiter() + p.lim = okta.NewRateLimiter(p.cfg.LimitWindow, p.cfg.LimitFixed) if p.cfg.Tracer != nil { id := sanitizeFileName(inputCtx.IDWithoutName) @@ -451,7 +451,7 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn lastUpdated time.Time ) for { - batch, h, err := okta.GetUserDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", query, omit, p.lim, p.cfg.LimitWindow, p.logger) + batch, h, err := okta.GetUserDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", query, omit, p.lim, p.logger) if err != nil { p.logger.Debugf("received %d users from API", len(users)) return nil, err @@ -512,7 +512,7 @@ func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *sta return su } if slices.Contains(p.cfg.EnrichWith, "groups") { - groups, _, err := okta.GetUserGroupDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.cfg.LimitWindow, p.logger) + groups, _, err := okta.GetUserGroupDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.logger) if err != nil { p.logger.Warnf("failed to get user group membership for %s: %v", u.ID, err) } else { @@ -520,7 +520,7 @@ func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *sta } } if slices.Contains(p.cfg.EnrichWith, "factors") { - factors, _, err := okta.GetUserFactors(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.cfg.LimitWindow, p.logger) + factors, _, err := okta.GetUserFactors(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.logger) if err != nil { p.logger.Warnf("failed to get user factors for %s: %v", u.ID, err) } else { @@ -528,7 +528,7 @@ func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *sta } } if slices.Contains(p.cfg.EnrichWith, "roles") { - roles, _, err := okta.GetUserRoles(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.cfg.LimitWindow, p.logger) + roles, _, err := okta.GetUserRoles(ctx, 
p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.logger) if err != nil { p.logger.Warnf("failed to get user roles for %s: %v", u.ID, err) } else { @@ -580,7 +580,7 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS lastUpdated time.Time ) for { - batch, h, err := okta.GetDeviceDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", deviceQuery, p.lim, p.cfg.LimitWindow, p.logger) + batch, h, err := okta.GetDeviceDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", deviceQuery, p.lim, p.logger) if err != nil { p.logger.Debugf("received %d devices from API", len(devices)) return nil, err @@ -599,7 +599,7 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS const omit = okta.OmitCredentials | okta.OmitCredentialsLinks | okta.OmitTransitioningToStatus - users, h, err := okta.GetDeviceUsers(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, d.ID, userQuery, omit, p.lim, p.cfg.LimitWindow, p.logger) + users, h, err := okta.GetDeviceUsers(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, d.ID, userQuery, omit, p.lim, p.logger) if err != nil { p.logger.Debugf("received %d device users from API", len(users)) return nil, err diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go index e7e2bffbba2..ea2a710d8c5 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go @@ -176,7 +176,7 @@ func TestOktaDoFetch(t *testing.T) { if err != nil { t.Errorf("failed to parse server URL: %v", err) } - rateLimiter := okta.NewRateLimiter() + rateLimiter := okta.NewRateLimiter(window, nil) a := oktaInput{ cfg: conf{ OktaDomain: u.Host,