From cbbe461b4dd4e0a791f5f8f4551fb37ea5e70c64 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Mon, 9 Dec 2024 11:05:46 +0100 Subject: [PATCH 01/12] Set a deadline for Wait. --- .../okta/internal/okta/ratelimiter.go | 6 +++- .../okta/internal/okta/ratelimiter_test.go | 31 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go index 1b58e01328c..8a9c64a2d1b 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go @@ -18,6 +18,8 @@ import ( type RateLimiter map[string]*rate.Limiter +const waitDeadline = 30 * time.Minute + func NewRateLimiter() RateLimiter { r := make(RateLimiter) return r @@ -35,7 +37,9 @@ func (r RateLimiter) limiter(path string) *rate.Limiter { func (r RateLimiter) Wait(ctx context.Context, endpoint string, url *url.URL, log *logp.Logger) (err error) { limiter := r.limiter(endpoint) log.Debugw("rate limit", "limit", limiter.Limit(), "burst", limiter.Burst(), "url", url.String()) - return limiter.Wait(ctx) + ctxWithDeadline, cancel := context.WithDeadline(ctx, time.Now().Add(waitDeadline)) + defer cancel() + return limiter.Wait(ctxWithDeadline) } // Update implements the Okta rate limit policy translation. 
diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go index 1492e55c8a6..05cd9b39128 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go @@ -5,7 +5,9 @@ package okta import ( + "context" "net/http" + "net/url" "strconv" "testing" "time" @@ -81,4 +83,33 @@ func TestRateLimiter(t *testing.T) { } }) + + t.Run("Very long waits are considered errors", func(t *testing.T) { + r := NewRateLimiter() + + const endpoint = "/foo" + + url, _ := url.Parse(endpoint) + reset := time.Now().Add(31 * time.Minute).Unix() + headers := http.Header{ + "X-Rate-Limit-Limit": []string{"60"}, + "X-Rate-Limit-Remaining": []string{"1"}, + "X-Rate-Limit-Reset": []string{strconv.FormatInt(reset, 10)}, + } + window := time.Minute + log := logp.L() + ctx := context.Background() + + r.Wait(ctx, endpoint, url, log) // consume the initial request + r.Update(endpoint, headers, window, log) // update to a slow rate + + err := r.Wait(ctx, endpoint, url, log) + + const expectedErr = "rate: Wait(n=1) would exceed context deadline" + if err == nil { + t.Errorf("expected error message %q, but got no error", expectedErr) + } else if err.Error() != expectedErr { + t.Errorf("expected error message %q, but got %q", expectedErr, err.Error()) + } + }) } From b80729ab28fe08fb714861127c24a1cebf77cffc Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Mon, 9 Dec 2024 16:38:22 +0100 Subject: [PATCH 02/12] RateLimiter handles blocking when there are no remaining requests, and reactivating later. 
--- .../provider/okta/internal/okta/okta_test.go | 6 +- .../okta/internal/okta/ratelimiter.go | 57 ++++++++++----- .../okta/internal/okta/ratelimiter_test.go | 72 +++++++++++-------- 3 files changed, 88 insertions(+), 47 deletions(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go index 45b1b2a4ca4..82ee42e069c 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go @@ -381,9 +381,9 @@ func TestLocal(t *testing.T) { // retrieve the rate.Limiter parameters for the one endpoint var limit rate.Limit var burst int - for _, l := range limiter { - limit = l.Limit() - burst = l.Burst() + for _, e := range limiter { + limit = e.limiter.Limit() + burst = e.limiter.Burst() break } diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go index 8a9c64a2d1b..25a25e0982a 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go @@ -16,7 +16,12 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -type RateLimiter map[string]*rate.Limiter +type RateLimiter map[string]endpointRateLimiter + +type endpointRateLimiter struct { + limiter *rate.Limiter + ready chan struct{} +} const waitDeadline = 30 * time.Minute @@ -25,28 +30,35 @@ func NewRateLimiter() RateLimiter { return r } -func (r RateLimiter) limiter(path string) *rate.Limiter { +func (r RateLimiter) endpoint(path string) endpointRateLimiter { if existing, ok := r[path]; ok { return existing } - initial := rate.NewLimiter(1, 1) // Allow a single fetch operation to obtain limits from the API - r[path] = initial - return initial + limiter := 
rate.NewLimiter(1, 1) // Allow a single fetch operation to obtain limits from the API + ready := make(chan struct{}) + close(ready) + newEndpointRateLimiter := endpointRateLimiter{ + limiter: limiter, + ready: ready, + } + r[path] = newEndpointRateLimiter + return r[path] } func (r RateLimiter) Wait(ctx context.Context, endpoint string, url *url.URL, log *logp.Logger) (err error) { - limiter := r.limiter(endpoint) - log.Debugw("rate limit", "limit", limiter.Limit(), "burst", limiter.Burst(), "url", url.String()) + e := r.endpoint(endpoint) + <-e.ready + log.Debugw("rate limit", "limit", e.limiter.Limit(), "burst", e.limiter.Burst(), "url", url.String()) ctxWithDeadline, cancel := context.WithDeadline(ctx, time.Now().Add(waitDeadline)) defer cancel() - return limiter.Wait(ctxWithDeadline) + return e.limiter.Wait(ctxWithDeadline) } // Update implements the Okta rate limit policy translation. // // See https://developer.okta.com/docs/reference/rl-best-practices/ for details. func (r RateLimiter) Update(endpoint string, h http.Header, window time.Duration, log *logp.Logger) error { - limiter := r.limiter(endpoint) + e := r.endpoint(endpoint) limit := h.Get("X-Rate-Limit-Limit") remaining := h.Get("X-Rate-Limit-Remaining") reset := h.Get("X-Rate-Limit-Reset") @@ -82,20 +94,33 @@ func (r RateLimiter) Update(endpoint string, h http.Header, window time.Duration if rateLimit <= 0 { // Reset limiter to block requests until reset limiter := rate.NewLimiter(0, 0) - r[endpoint] = limiter + ready := make(chan struct{}) + newEndpointRateLimiter := endpointRateLimiter{ + limiter: limiter, + ready: ready, + } + r[endpoint] = newEndpointRateLimiter + + resetTimeUTC := resetTime.UTC() + log.Debugw("rate limit block until reset", "reset_time", resetTimeUTC) // next gives us a sane next window estimate, but the // estimate will be overwritten when we make the next // permissible API request. 
next := rate.Limit(lim / window.Seconds()) - waitUntil := resetTime.UTC() - limiter.SetLimitAt(waitUntil, next) - limiter.SetBurstAt(waitUntil, burst) - log.Debugw("rate limit reset", "reset_time", waitUntil, "next_rate", next, "next_burst", burst) + waitFor := time.Until(resetTimeUTC) + + time.AfterFunc(waitFor, func() { + limiter.SetLimit(next) + limiter.SetBurst(burst) + close(ready) + log.Debugw("rate limit reset", "reset_time", resetTimeUTC, "reset_rate", next, "reset_burst", burst) + }) + return nil } - limiter.SetLimit(rateLimit) - limiter.SetBurst(burst) + e.limiter.SetLimit(rateLimit) + e.limiter.SetBurst(burst) log.Debugw("rate limit adjust", "set_rate", rateLimit, "set_burst", burst) return nil } diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go index 05cd9b39128..78b76451f59 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go @@ -20,68 +20,84 @@ func TestRateLimiter(t *testing.T) { t.Run("separation by endpoint", func(t *testing.T) { r := NewRateLimiter() - limiter1 := r.limiter("/foo") - limiter2 := r.limiter("/bar") + e1 := r.endpoint("/foo") + e2 := r.endpoint("/bar") - limiter1.SetBurst(1000) + e1.limiter.SetBurst(1000) - if limiter2.Burst() == 1000 { + if e2.limiter.Burst() == 1000 { t.Errorf("changes to one endpoint's limits affected another") } }) t.Run("Update stops requests when none are remaining", func(t *testing.T) { r := NewRateLimiter() - const endpoint = "/foo" - limiter := r.limiter(endpoint) + const window = time.Minute + url, _ := url.Parse(endpoint) + ctx := context.Background() + log := logp.L() + e := r.endpoint(endpoint) - if !limiter.Allow() { + if !e.limiter.Allow() { t.Errorf("doesn't allow an initial request") } + // update to none remaining, reset soon now 
:= time.Now().Unix() - reset := now + 30 - + resetSoon := now + 30 headers := http.Header{ "X-Rate-Limit-Limit": []string{"60"}, "X-Rate-Limit-Remaining": []string{"0"}, - "X-Rate-Limit-Reset": []string{strconv.FormatInt(reset, 10)}, + "X-Rate-Limit-Reset": []string{strconv.FormatInt(resetSoon, 10)}, } - window := time.Minute - err := r.Update(endpoint, headers, window, logp.L()) if err != nil { t.Errorf("unexpected error from Update(): %v", err) } - limiter = r.limiter(endpoint) + e = r.endpoint(endpoint) - if limiter.Allow() { + if e.limiter.Allow() { t.Errorf("allowed a request when none are remaining") } - - if limiter.AllowN(time.Unix(reset-1, 999999999), 1) { + if e.limiter.AllowN(time.Unix(resetSoon-1, 999999999), 1) { t.Errorf("allowed a request before reset, when none are remaining") } - if !limiter.AllowN(time.Unix(reset+1, 0), 1) { - t.Errorf("doesn't allow requests to resume after reset") + // update to none remaining, reset now + headers = http.Header{ + "X-Rate-Limit-Limit": []string{"60"}, + "X-Rate-Limit-Remaining": []string{"0"}, + "X-Rate-Limit-Reset": []string{strconv.FormatInt(now, 10)}, } - - if limiter.Limit() != 1.0 { - t.Errorf("unexpected rate following reset (not 60 requests / 60 seconds): %f", limiter.Limit()) + err = r.Update(endpoint, headers, window, logp.L()) + if err != nil { + t.Errorf("unexpected error from Update(): %v", err) } + e = r.endpoint(endpoint) - if limiter.Burst() != 1 { - t.Errorf("unexpected burst following reset (not 1): %d", limiter.Burst()) - } + start := time.Now() + r.Wait(ctx, endpoint, url, log) + wait := time.Since(start) - limiter.SetBurstAt(time.Unix(reset, 0), 100) // increase bucket size to check token accumulation - tokens := limiter.TokensAt(time.Unix(reset+30, 0)) - if tokens < 29.5 || tokens > 30.0 { - t.Errorf("tokens don't accumulate at the expected rate. tokens 30s after reset: %f", tokens) + if wait > 1010*time.Millisecond { + t.Errorf("doesn't allow requests to resume after reset. 
had to wait %d milliseconds", wait.Milliseconds()) + } + if e.limiter.Limit() != 1.0 { + t.Errorf("unexpected rate following reset (not 60 requests / 60 seconds): %f", e.limiter.Limit()) + } + if e.limiter.Burst() != 1 { + t.Errorf("unexpected burst following reset (not 1): %d", e.limiter.Burst()) } + e.limiter.SetBurst(100) // increase bucket size to check token accumulation + tokens := e.limiter.TokensAt(time.Unix(0, time.Now().Add(30*time.Second).UnixNano())) + target := 30.0 + buffer := 0.01 + + if tokens < target-buffer || tokens > target+buffer { + t.Errorf("tokens don't accumulate at the expected rate over 30s: %f", tokens) + } }) t.Run("Very long waits are considered errors", func(t *testing.T) { From 3ba9e5bf1cc91c8b026783fe9bc6ed71f1da648f Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Tue, 10 Dec 2024 20:24:11 +0100 Subject: [PATCH 03/12] Add a configuration option to override with a constant rate. --- .../entityanalytics/provider/okta/conf.go | 5 ++ .../provider/okta/internal/okta/okta.go | 36 ++++++------- .../provider/okta/internal/okta/okta_test.go | 50 +++++++++---------- .../okta/internal/okta/ratelimiter.go | 39 +++++++++++---- .../okta/internal/okta/ratelimiter_test.go | 50 +++++++++++++++---- .../entityanalytics/provider/okta/okta.go | 16 +++--- .../provider/okta/okta_test.go | 2 +- 7 files changed, 126 insertions(+), 72 deletions(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go b/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go index 41a3895a70d..783b40a6b2d 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go @@ -27,6 +27,7 @@ func defaultConfig() conf { SyncInterval: 24 * time.Hour, UpdateInterval: 15 * time.Minute, LimitWindow: time.Minute, + LimitFixed: nil, Request: &requestConfig{ Retry: retryConfig{ MaxAttempts: &maxAttempts, @@ -68,6 +69,10 @@ type conf struct { // API limit resets. 
LimitWindow time.Duration `config:"limit_window"` + // LimitFixed is a number of requests to allow in each LimitWindow, + // overriding the guidance in API responses. + LimitFixed *int `config:"limit_fixed"` + // Request is the configuration for establishing // HTTP requests to the API. Request *requestConfig `config:"request"` diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go index 42ffc060178..b2bacba89a8 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go @@ -185,13 +185,13 @@ func (o Response) String() string { // Parts of the response may be omitted using the omit parameter. // // The provided rate limiter must allow at least request and will be updated with the -// response's X-Rate-Limit headers considering the rate limit window time. Details -// for rate limits are available at https://help.okta.com/en-us/Content/Topics/Security/API-rate-limits.htm +// response's X-Rate-Limit headers. Details for rate limits are available at +// https://help.okta.com/en-us/Content/Topics/Security/API-rate-limits.htm // and account rate limits and windows can be seen on the Okta admin dashboard at // https://${yourOktaDomain}/reports/rate-limit. // // See https://developer.okta.com/docs/reference/api/users/#list-users for details. 
-func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) { +func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim *RateLimiter, log *logp.Logger) ([]User, http.Header, error) { var endpoint, path string if user == "" { endpoint = "/api/v1/users" @@ -207,7 +207,7 @@ func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user strin Path: path, RawQuery: query.Encode(), } - return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, window, log) + return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, log) } // GetUserFactors returns Okta group roles using the groups API endpoint. host is the @@ -216,7 +216,7 @@ func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user strin // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/UserFactor/#tag/UserFactor/operation/listFactors. -func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Factor, http.Header, error) { +func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim *RateLimiter, log *logp.Logger) ([]Factor, http.Header, error) { if user == "" { return nil, nil, errors.New("no user specified") } @@ -229,7 +229,7 @@ func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user strin Host: host, Path: path, } - return getDetails[Factor](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) + return getDetails[Factor](ctx, cli, u, endpoint, key, true, OmitNone, lim, log) } // GetUserRoles returns Okta group roles using the groups API endpoint. 
host is the @@ -238,7 +238,7 @@ func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user strin // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles. -func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) { +func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim *RateLimiter, log *logp.Logger) ([]Role, http.Header, error) { if user == "" { return nil, nil, errors.New("no user specified") } @@ -251,7 +251,7 @@ func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, Host: host, Path: path, } - return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) + return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, log) } // GetUserGroupDetails returns Okta group details using the users API endpoint. host is the @@ -260,7 +260,7 @@ func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/reference/api/users/#request-parameters-8 (no anchor exists on the page for this endpoint) for details. 
-func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) { +func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim *RateLimiter, log *logp.Logger) ([]Group, http.Header, error) { if user == "" { return nil, nil, errors.New("no user specified") } @@ -273,7 +273,7 @@ func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user Host: host, Path: path, } - return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) + return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, log) } // GetGroupRoles returns Okta group roles using the groups API endpoint. host is the @@ -282,7 +282,7 @@ func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles. -func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) { +func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim *RateLimiter, log *logp.Logger) ([]Role, http.Header, error) { if group == "" { return nil, nil, errors.New("no group specified") } @@ -295,7 +295,7 @@ func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group strin Host: host, Path: path, } - return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) + return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, log) } // GetDeviceDetails returns Okta device details using the list devices API endpoint. 
host is the @@ -305,7 +305,7 @@ func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group strin // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices for details. -func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) { +func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *RateLimiter, log *logp.Logger) ([]Device, http.Header, error) { var endpoint string var path string if device == "" { @@ -322,7 +322,7 @@ func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device s Path: path, RawQuery: query.Encode(), } - return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, window, log) + return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, log) } // GetDeviceUsers returns Okta user details for users associated with the provided device identifier @@ -332,7 +332,7 @@ func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device s // See GetUserDetails for details of the query and rate limit parameters. // // See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDeviceUsers for details. -func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) { +func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim *RateLimiter, log *logp.Logger) ([]User, http.Header, error) { if device == "" { // No user associated with a null device. Not an error. 
return nil, nil, nil @@ -347,7 +347,7 @@ func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device str Path: path, RawQuery: query.Encode(), } - du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, window, log) + du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, log) if err != nil { return nil, h, err } @@ -372,7 +372,7 @@ type devUser struct { // for the specific user are returned, otherwise a list of all users is returned. // // See GetUserDetails for details of the query and rate limit parameters. -func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) { +func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim *RateLimiter, log *logp.Logger) ([]E, http.Header, error) { url := u.String() req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { @@ -395,7 +395,7 @@ func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, end return nil, nil, err } defer resp.Body.Close() - err = lim.Update(endpoint, resp.Header, window, log) + err = lim.Update(endpoint, resp.Header, log) if err != nil { io.Copy(io.Discard, resp.Body) return nil, nil, err diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go index 82ee42e069c..d362d72f391 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go @@ -44,14 +44,14 @@ func Test(t *testing.T) { t.Skip("okta tests require ${OKTA_TOKEN} to be set") } - // Make a global limiter - limiter := NewRateLimiter() - // There are a variety of windows, the most 
conservative is one minute. // The rate limit will be adjusted on the second call to the API if // window is actually used to rate limit calculations. const window = time.Minute + // Make a global limiter + limiter := NewRateLimiter(window, nil) + for _, omit := range []Response{ OmitNone, OmitCredentials, @@ -65,7 +65,7 @@ func Test(t *testing.T) { t.Run("me", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "me", query, omit, limiter, window, logger) + users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "me", query, omit, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -94,7 +94,7 @@ func Test(t *testing.T) { t.Run("my_groups", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - groups, _, err := GetUserGroupDetails(context.Background(), http.DefaultClient, host, key, me.ID, limiter, window, logger) + groups, _, err := GetUserGroupDetails(context.Background(), http.DefaultClient, host, key, me.ID, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -119,7 +119,7 @@ func Test(t *testing.T) { t.Run("my_roles", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - roles, _, err := GetUserRoles(context.Background(), http.DefaultClient, host, key, me.ID, limiter, window, logger) + roles, _, err := GetUserRoles(context.Background(), http.DefaultClient, host, key, me.ID, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -144,7 +144,7 @@ func Test(t *testing.T) { t.Run("my_factors", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - factors, _, err := GetUserFactors(context.Background(), http.DefaultClient, host, key, me.ID, limiter, window, logger) + factors, _, err := GetUserFactors(context.Background(), http.DefaultClient, host, key, me.ID, limiter, logger) if err != nil { 
t.Fatalf("unexpected error: %v", err) } @@ -175,7 +175,7 @@ func Test(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, login, query, omit, limiter, window, logger) + users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, login, query, omit, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -190,7 +190,7 @@ func Test(t *testing.T) { t.Run("all", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, window, logger) + users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -218,7 +218,7 @@ func Test(t *testing.T) { query := make(url.Values) query.Set("limit", "200") query.Add("search", `not (status pr)`) // This cannot ever be true. 
- _, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, window, logger) + _, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, logger) oktaErr := &Error{} if !errors.As(err, &oktaErr) { // Don't test the value of the error since it was @@ -234,7 +234,7 @@ func Test(t *testing.T) { t.Run("device", func(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - devices, _, err := GetDeviceDetails(context.Background(), http.DefaultClient, host, key, "", query, limiter, window, logger) + devices, _, err := GetDeviceDetails(context.Background(), http.DefaultClient, host, key, "", query, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -246,7 +246,7 @@ func Test(t *testing.T) { t.Logf("devices: %s", b) } for _, d := range devices { - users, _, err := GetDeviceUsers(context.Background(), http.DefaultClient, host, key, d.ID, query, OmitCredentials, limiter, window, logger) + users, _, err := GetDeviceUsers(context.Background(), http.DefaultClient, host, key, d.ID, query, OmitCredentials, limiter, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -263,15 +263,15 @@ var localTests = []struct { name string msg string id string - fn func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) + fn func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *RateLimiter, log *logp.Logger) (any, http.Header, error) mkWant func(string) (any, error) }{ { // Test case constructed from API-returned value with details anonymised. 
name: "users", msg: `[{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","recovery_question":{"question":"Who's a major player in the cowboy scene?","answer":"Annie Oakley"},"type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}]`, - fn: func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) { - return GetUserDetails(context.Background(), cli, host, key, user, query, OmitNone, lim, window, log) + fn: func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *RateLimiter, log *logp.Logger) (any, http.Header, error) { + return GetUserDetails(context.Background(), cli, host, key, user, query, OmitNone, lim, log) }, mkWant: mkWant[User], }, @@ -279,8 +279,8 @@ var localTests = []struct { // Test case from https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices name: "devices", msg: `[{"id":"devid","status":"CREATED","created":"2019-10-02T18:03:07.000Z","lastUpdated":"2019-10-02T18:03:07.000Z","profile":{"displayName":"Example Device name 1","platform":"WINDOWS","serialNumber":"XXDDRFCFRGF3M8MD6D","sid":"S-1-11-111","registered":true,"secureHardwarePresent":false,"diskEncryptionType":"ALL_INTERNAL_VOLUMES"},"resourceType":"UDDevice","resourceDisplayName":{"value":"Example Device name 
1","sensitive":false},"resourceAlternateId":null,"resourceId":"guo4a5u7YAHhjXrMK0g4","_links":{"activate":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4/lifecycle/activate","hints":{"allow":["POST"]}},"self":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4","hints":{"allow":["GET","PATCH","PUT"]}},"users":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4/users","hints":{"allow":["GET"]}}}},{"id":"guo4a5u7YAHhjXrMK0g5","status":"ACTIVE","created":"2023-06-21T23:24:02.000Z","lastUpdated":"2023-06-21T23:24:02.000Z","profile":{"displayName":"Example Device name 2","platform":"ANDROID","manufacturer":"Google","model":"Pixel 6","osVersion":"13:2023-05-05","registered":true,"secureHardwarePresent":true,"diskEncryptionType":"USER"},"resourceType":"UDDevice","resourceDisplayName":{"value":"Example Device name 2","sensitive":false},"resourceAlternateId":null,"resourceId":"guo4a5u7YAHhjXrMK0g5","_links":{"activate":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5/lifecycle/activate","hints":{"allow":["POST"]}},"self":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5","hints":{"allow":["GET","PATCH","PUT"]}},"users":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5/users","hints":{"allow":["GET"]}}}}]`, - fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) { - return GetDeviceDetails(context.Background(), cli, host, key, device, query, lim, window, log) + fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *RateLimiter, log *logp.Logger) (any, http.Header, error) { + return GetDeviceDetails(context.Background(), cli, host, key, device, query, lim, log) }, mkWant: mkWant[Device], }, @@ -289,8 +289,8 @@ var localTests = []struct { name: "devices_users", msg: 
`[{"created":"2023-08-07T21:48:27.000Z","managementStatus":"NOT_MANAGED","user":{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"recovery_question":{"question":"Who's a major player in the cowboy scene?","answer":"Annie Oakley"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}}]`, id: "devid", - fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) { - return GetDeviceUsers(context.Background(), cli, host, key, device, query, OmitNone, lim, window, log) + fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *RateLimiter, log *logp.Logger) (any, http.Header, error) { + return GetDeviceUsers(context.Background(), cli, host, key, device, query, OmitNone, lim, log) }, mkWant: mkWant[devUser], }, @@ -315,12 +315,12 @@ func TestLocal(t *testing.T) { for _, test := range localTests { t.Run(test.name, func(t *testing.T) { - limiter := NewRateLimiter() - // There are a variety of windows, the most conservative is one minute. // The rate limit will be adjusted on the second call to the API if // window is actually used to rate limit calculations. 
const window = time.Minute + var fixedLimit *int = nil + limiter := NewRateLimiter(window, fixedLimit) const key = "token" want, err := test.mkWant(test.msg) @@ -366,7 +366,7 @@ func TestLocal(t *testing.T) { query := make(url.Values) query.Set("limit", "200") - got, h, err := test.fn(context.Background(), ts.Client(), host, key, test.id, query, limiter, window, logger) + got, h, err := test.fn(context.Background(), ts.Client(), host, key, test.id, query, limiter, logger) if err != nil { t.Fatalf("unexpected error from Get_Details: %v", err) } @@ -375,13 +375,13 @@ func TestLocal(t *testing.T) { t.Errorf("unexpected result:\n- want\n+ got\n%s", cmp.Diff(want, got)) } - if len(limiter) != 1 { - t.Errorf("unexpected number endpoints track by rate limiter: %d", len(limiter)) + if len(limiter.byEndpoint) != 1 { + t.Errorf("unexpected number endpoints track by rate limiter: %d", len(limiter.byEndpoint)) } // retrieve the rate.Limiter parameters for the one endpoint var limit rate.Limit var burst int - for _, e := range limiter { + for _, e := range limiter.byEndpoint { limit = e.limiter.Limit() burst = e.limiter.Burst() break diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go index 25a25e0982a..5ade8681f3d 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go @@ -16,7 +16,11 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -type RateLimiter map[string]endpointRateLimiter +type RateLimiter struct { + window time.Duration + fixedLimit *int + byEndpoint map[string]endpointRateLimiter +} type endpointRateLimiter struct { limiter *rate.Limiter @@ -25,24 +29,34 @@ type endpointRateLimiter struct { const waitDeadline = 30 * time.Minute -func NewRateLimiter() RateLimiter { - r := make(RateLimiter) - return r +func 
NewRateLimiter(window time.Duration, fixedLimit *int) *RateLimiter { + endpoints := make(map[string]endpointRateLimiter) + r := RateLimiter{ + window: window, + fixedLimit: fixedLimit, + byEndpoint: endpoints, + } + r.fixedLimit = fixedLimit + return &r } func (r RateLimiter) endpoint(path string) endpointRateLimiter { - if existing, ok := r[path]; ok { + if existing, ok := r.byEndpoint[path]; ok { return existing } - limiter := rate.NewLimiter(1, 1) // Allow a single fetch operation to obtain limits from the API + limit := rate.Limit(1) + if r.fixedLimit != nil { + limit = rate.Limit(float64(*r.fixedLimit) / r.window.Seconds()) + } + limiter := rate.NewLimiter(limit, 1) // Allow a single fetch operation to obtain limits from the API ready := make(chan struct{}) close(ready) newEndpointRateLimiter := endpointRateLimiter{ limiter: limiter, ready: ready, } - r[path] = newEndpointRateLimiter - return r[path] + r.byEndpoint[path] = newEndpointRateLimiter + return newEndpointRateLimiter } func (r RateLimiter) Wait(ctx context.Context, endpoint string, url *url.URL, log *logp.Logger) (err error) { @@ -57,7 +71,10 @@ func (r RateLimiter) Wait(ctx context.Context, endpoint string, url *url.URL, lo // Update implements the Okta rate limit policy translation. // // See https://developer.okta.com/docs/reference/rl-best-practices/ for details. 
-func (r RateLimiter) Update(endpoint string, h http.Header, window time.Duration, log *logp.Logger) error { +func (r RateLimiter) Update(endpoint string, h http.Header, log *logp.Logger) error { + if r.fixedLimit != nil { + return nil + } e := r.endpoint(endpoint) limit := h.Get("X-Rate-Limit-Limit") remaining := h.Get("X-Rate-Limit-Remaining") @@ -99,7 +116,7 @@ func (r RateLimiter) Update(endpoint string, h http.Header, window time.Duration limiter: limiter, ready: ready, } - r[endpoint] = newEndpointRateLimiter + r.byEndpoint[endpoint] = newEndpointRateLimiter resetTimeUTC := resetTime.UTC() log.Debugw("rate limit block until reset", "reset_time", resetTimeUTC) @@ -107,7 +124,7 @@ func (r RateLimiter) Update(endpoint string, h http.Header, window time.Duration // next gives us a sane next window estimate, but the // estimate will be overwritten when we make the next // permissible API request. - next := rate.Limit(lim / window.Seconds()) + next := rate.Limit(lim / r.window.Seconds()) waitFor := time.Until(resetTimeUTC) time.AfterFunc(waitFor, func() { diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go index 78b76451f59..b19e9d0f3c0 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go @@ -19,7 +19,9 @@ func TestRateLimiter(t *testing.T) { logp.TestingSetup() t.Run("separation by endpoint", func(t *testing.T) { - r := NewRateLimiter() + const window = time.Minute + var fixedLimit *int = nil + r := NewRateLimiter(window, fixedLimit) e1 := r.endpoint("/foo") e2 := r.endpoint("/bar") @@ -31,9 +33,10 @@ func TestRateLimiter(t *testing.T) { }) t.Run("Update stops requests when none are remaining", func(t *testing.T) { - r := NewRateLimiter() - const endpoint = "/foo" const window = time.Minute + var 
fixedLimit *int = nil + r := NewRateLimiter(window, fixedLimit) + const endpoint = "/foo" url, _ := url.Parse(endpoint) ctx := context.Background() log := logp.L() @@ -51,7 +54,7 @@ func TestRateLimiter(t *testing.T) { "X-Rate-Limit-Remaining": []string{"0"}, "X-Rate-Limit-Reset": []string{strconv.FormatInt(resetSoon, 10)}, } - err := r.Update(endpoint, headers, window, logp.L()) + err := r.Update(endpoint, headers, logp.L()) if err != nil { t.Errorf("unexpected error from Update(): %v", err) } @@ -70,7 +73,7 @@ func TestRateLimiter(t *testing.T) { "X-Rate-Limit-Remaining": []string{"0"}, "X-Rate-Limit-Reset": []string{strconv.FormatInt(now, 10)}, } - err = r.Update(endpoint, headers, window, logp.L()) + err = r.Update(endpoint, headers, logp.L()) if err != nil { t.Errorf("unexpected error from Update(): %v", err) } @@ -101,7 +104,9 @@ func TestRateLimiter(t *testing.T) { }) t.Run("Very long waits are considered errors", func(t *testing.T) { - r := NewRateLimiter() + const window = time.Minute + var fixedLimit *int = nil + r := NewRateLimiter(window, fixedLimit) const endpoint = "/foo" @@ -112,12 +117,11 @@ func TestRateLimiter(t *testing.T) { "X-Rate-Limit-Remaining": []string{"1"}, "X-Rate-Limit-Reset": []string{strconv.FormatInt(reset, 10)}, } - window := time.Minute log := logp.L() ctx := context.Background() - r.Wait(ctx, endpoint, url, log) // consume the initial request - r.Update(endpoint, headers, window, log) // update to a slow rate + r.Wait(ctx, endpoint, url, log) // consume the initial request + r.Update(endpoint, headers, log) // update to a slow rate err := r.Wait(ctx, endpoint, url, log) @@ -128,4 +132,32 @@ func TestRateLimiter(t *testing.T) { t.Errorf("expected error message %q, but got %q", expectedErr, err.Error()) } }) + + t.Run("A fixed limit overrides response information", func(t *testing.T) { + const window = time.Minute + var fixedLimit int = 120 + r := NewRateLimiter(window, &fixedLimit) + const endpoint = "/foo" + e := 
r.endpoint(endpoint) + + if e.limiter.Limit() != 120/60 { + t.Errorf("unexpected rate (for fixed 120 reqs / 60 secs): %f", e.limiter.Limit()) + } + + // update to 15 requests remaining, reset in 30s + headers := http.Header{ + "X-Rate-Limit-Limit": []string{"60"}, + "X-Rate-Limit-Remaining": []string{"15"}, + "X-Rate-Limit-Reset": []string{strconv.FormatInt(time.Now().Unix()+30, 10)}, + } + err := r.Update(endpoint, headers, logp.L()) + if err != nil { + t.Errorf("unexpected error from Update(): %v", err) + } + e = r.endpoint(endpoint) + + if e.limiter.Limit() != 120/60 { + t.Errorf("unexpected rate following Update() (for fixed 120 reqs / 60 secs): %f", e.limiter.Limit()) + } + }) } diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go b/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go index 30103d3ccdb..0e33e93cc9b 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go @@ -59,7 +59,7 @@ type oktaInput struct { cfg conf client *http.Client - lim okta.RateLimiter + lim *okta.RateLimiter metrics *inputMetrics logger *logp.Logger @@ -110,7 +110,7 @@ func (p *oktaInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C updateTimer := time.NewTimer(updateWaitTime) // Allow a single fetch operation to obtain limits from the API. 
- p.lim = okta.NewRateLimiter() + p.lim = okta.NewRateLimiter(p.cfg.LimitWindow, p.cfg.LimitFixed) if p.cfg.Tracer != nil { id := sanitizeFileName(inputCtx.IDWithoutName) @@ -451,7 +451,7 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn lastUpdated time.Time ) for { - batch, h, err := okta.GetUserDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", query, omit, p.lim, p.cfg.LimitWindow, p.logger) + batch, h, err := okta.GetUserDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", query, omit, p.lim, p.logger) if err != nil { p.logger.Debugf("received %d users from API", len(users)) return nil, err @@ -512,7 +512,7 @@ func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *sta return su } if slices.Contains(p.cfg.EnrichWith, "groups") { - groups, _, err := okta.GetUserGroupDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.cfg.LimitWindow, p.logger) + groups, _, err := okta.GetUserGroupDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.logger) if err != nil { p.logger.Warnf("failed to get user group membership for %s: %v", u.ID, err) } else { @@ -520,7 +520,7 @@ func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *sta } } if slices.Contains(p.cfg.EnrichWith, "factors") { - factors, _, err := okta.GetUserFactors(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.cfg.LimitWindow, p.logger) + factors, _, err := okta.GetUserFactors(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.logger) if err != nil { p.logger.Warnf("failed to get user factors for %s: %v", u.ID, err) } else { @@ -528,7 +528,7 @@ func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *sta } } if slices.Contains(p.cfg.EnrichWith, "roles") { - roles, _, err := okta.GetUserRoles(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.cfg.LimitWindow, p.logger) + roles, _, err := okta.GetUserRoles(ctx, 
p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.logger) if err != nil { p.logger.Warnf("failed to get user roles for %s: %v", u.ID, err) } else { @@ -580,7 +580,7 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS lastUpdated time.Time ) for { - batch, h, err := okta.GetDeviceDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", deviceQuery, p.lim, p.cfg.LimitWindow, p.logger) + batch, h, err := okta.GetDeviceDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", deviceQuery, p.lim, p.logger) if err != nil { p.logger.Debugf("received %d devices from API", len(devices)) return nil, err @@ -599,7 +599,7 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS const omit = okta.OmitCredentials | okta.OmitCredentialsLinks | okta.OmitTransitioningToStatus - users, h, err := okta.GetDeviceUsers(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, d.ID, userQuery, omit, p.lim, p.cfg.LimitWindow, p.logger) + users, h, err := okta.GetDeviceUsers(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, d.ID, userQuery, omit, p.lim, p.logger) if err != nil { p.logger.Debugf("received %d device users from API", len(users)) return nil, err diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go index e7e2bffbba2..ea2a710d8c5 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go @@ -176,7 +176,7 @@ func TestOktaDoFetch(t *testing.T) { if err != nil { t.Errorf("failed to parse server URL: %v", err) } - rateLimiter := okta.NewRateLimiter() + rateLimiter := okta.NewRateLimiter(window, nil) a := oktaInput{ cfg: conf{ OktaDomain: u.Host, From 799c175608d9ec302508227aadec980db9ec0f23 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Wed, 11 Dec 2024 09:25:57 +0100 Subject: [PATCH 04/12] Doc update. 
--- .../inputs/input-entity-analytics.asciidoc | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc index b4b701d3919..faa50cf732d 100644 --- a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc @@ -1015,7 +1015,21 @@ shorter than the full synchronization interval (`sync_interval`). Expressed as a duration string (e.g., 1m, 3h, 24h). Defaults to `15m` (15 minutes). [float] -==== `tracer.enabled` +===== `limit_window` + +The time between Okta API rate limit resets. +Expressed as a duration string (e.g., 1m, 3h, 24h). Defaults to `1m` (1 minute). + +[float] +===== `limit_fixed` + +The number of requests to allow in each limit window, if set. +This parameter should only be set in exceptional cases. When it is set, rate +limit information in API responses will be ignored in favor of the fixed limit. +The limit is applied separately to each endpoint. Defaults to unset. + +[float] +===== `tracer.enabled` It is possible to log HTTP requests and responses to the Okta API to a local file-system for debugging configurations. This option is enabled by setting `tracer.enabled` to true and setting the `tracer.filename` value. @@ -1025,7 +1039,7 @@ to false without unsetting the filename option. Enabling this option compromises security and should only be used for debugging. [float] -==== `tracer.filename` +===== `tracer.filename` To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id. For Example, `http-request-trace-*.ndjson`. From 3376731e02be89a65e4fbf1e372231a3e5bca6eb Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Wed, 11 Dec 2024 09:29:02 +0100 Subject: [PATCH 05/12] Remove redundant nil from defaultConfig().
--- x-pack/filebeat/input/entityanalytics/provider/okta/conf.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go b/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go index 783b40a6b2d..61a61aed25e 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go @@ -27,7 +27,6 @@ func defaultConfig() conf { SyncInterval: 24 * time.Hour, UpdateInterval: 15 * time.Minute, LimitWindow: time.Minute, - LimitFixed: nil, Request: &requestConfig{ Retry: retryConfig{ MaxAttempts: &maxAttempts, From 80cb3549b3e3f8a549db8fdfa594bbc04f97be89 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Wed, 11 Dec 2024 18:15:29 +0100 Subject: [PATCH 06/12] Godoc and naming. --- .../okta/internal/okta/ratelimiter.go | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go index 5ade8681f3d..921c8daf881 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go @@ -16,19 +16,37 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) +// RateLimiter holds rate limiting information for an API. +// +// Each API endpoint has its own rate limit, which can be dynamically updated +// using response headers. If a fixed limit is set, it takes precedence over any +// information from response headers. type RateLimiter struct { window time.Duration fixedLimit *int byEndpoint map[string]endpointRateLimiter } +// endpointRateLimiter represents rate limiting information for a single API endpoint. 
type endpointRateLimiter struct { limiter *rate.Limiter ready chan struct{} } -const waitDeadline = 30 * time.Minute +// maxWait defines the maximum wait duration allowed for rate limiting. +// Longer waits are considered errors. +const maxWait = 30 * time.Minute +// NewRateLimiter constructs a new RateLimiter. +// +// Parameters: +// - `window`: The time between API limit resets. Used for setting an initial +// target rate. +// - `fixedLimit`: A fixed number of requests to allow in each `window`, +// overriding the guidance in API responses. +// +// Returns: +// - A pointer to a new RateLimiter instance. func NewRateLimiter(window time.Duration, fixedLimit *int) *RateLimiter { endpoints := make(map[string]endpointRateLimiter) r := RateLimiter{ @@ -63,7 +81,7 @@ func (r RateLimiter) Wait(ctx context.Context, endpoint string, url *url.URL, lo e := r.endpoint(endpoint) <-e.ready log.Debugw("rate limit", "limit", e.limiter.Limit(), "burst", e.limiter.Burst(), "url", url.String()) - ctxWithDeadline, cancel := context.WithDeadline(ctx, time.Now().Add(waitDeadline)) + ctxWithDeadline, cancel := context.WithDeadline(ctx, time.Now().Add(maxWait)) defer cancel() return e.limiter.Wait(ctxWithDeadline) } From be813e8c0ab379501b12e542f349ae475b37b076 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Wed, 11 Dec 2024 18:16:15 +0100 Subject: [PATCH 07/12] Make fewer channels. 
--- .../provider/okta/internal/okta/ratelimiter.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go index 921c8daf881..aa9130a4f42 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go @@ -58,6 +58,10 @@ func NewRateLimiter(window time.Duration, fixedLimit *int) *RateLimiter { return &r } +var immediatelyReady = make(chan struct{}) + +func init() { close(immediatelyReady) } + func (r RateLimiter) endpoint(path string) endpointRateLimiter { if existing, ok := r.byEndpoint[path]; ok { return existing @@ -67,11 +71,9 @@ func (r RateLimiter) endpoint(path string) endpointRateLimiter { limit = rate.Limit(float64(*r.fixedLimit) / r.window.Seconds()) } limiter := rate.NewLimiter(limit, 1) // Allow a single fetch operation to obtain limits from the API - ready := make(chan struct{}) - close(ready) newEndpointRateLimiter := endpointRateLimiter{ limiter: limiter, - ready: ready, + ready: immediatelyReady, } r.byEndpoint[path] = newEndpointRateLimiter return newEndpointRateLimiter From 4d26f534e84112cc90c9c8cdf9e8521861779249 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Wed, 11 Dec 2024 18:19:15 +0100 Subject: [PATCH 08/12] Apply deadline and handle its expiry also during blocking of all requests. 
--- .../provider/okta/internal/okta/ratelimiter.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go index aa9130a4f42..2dfd8ae923b 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go @@ -81,10 +81,14 @@ func (r RateLimiter) endpoint(path string) endpointRateLimiter { func (r RateLimiter) Wait(ctx context.Context, endpoint string, url *url.URL, log *logp.Logger) (err error) { e := r.endpoint(endpoint) - <-e.ready log.Debugw("rate limit", "limit", e.limiter.Limit(), "burst", e.limiter.Burst(), "url", url.String()) ctxWithDeadline, cancel := context.WithDeadline(ctx, time.Now().Add(maxWait)) defer cancel() + select { + case <-e.ready: + case <-ctxWithDeadline.Done(): + return ctxWithDeadline.Err() + } return e.limiter.Wait(ctxWithDeadline) } From c02c8e98cab42a96869a96ca41497d6e729f1c96 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Wed, 11 Dec 2024 18:19:41 +0100 Subject: [PATCH 09/12] Check errors from url.Parse(). 
--- .../okta/internal/okta/ratelimiter_test.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go index b19e9d0f3c0..696e9868043 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go @@ -37,7 +37,10 @@ func TestRateLimiter(t *testing.T) { var fixedLimit *int = nil r := NewRateLimiter(window, fixedLimit) const endpoint = "/foo" - url, _ := url.Parse(endpoint) + url, err := url.Parse(endpoint) + if err != nil { + t.Errorf("unexpected error from url.Parse(): %v", err) + } ctx := context.Background() log := logp.L() e := r.endpoint(endpoint) @@ -54,7 +57,7 @@ func TestRateLimiter(t *testing.T) { "X-Rate-Limit-Remaining": []string{"0"}, "X-Rate-Limit-Reset": []string{strconv.FormatInt(resetSoon, 10)}, } - err := r.Update(endpoint, headers, logp.L()) + err = r.Update(endpoint, headers, logp.L()) if err != nil { t.Errorf("unexpected error from Update(): %v", err) } @@ -110,7 +113,10 @@ func TestRateLimiter(t *testing.T) { const endpoint = "/foo" - url, _ := url.Parse(endpoint) + url, err := url.Parse(endpoint) + if err != nil { + t.Errorf("unexpected error from url.Parse(): %v", err) + } reset := time.Now().Add(31 * time.Minute).Unix() headers := http.Header{ "X-Rate-Limit-Limit": []string{"60"}, @@ -123,7 +129,7 @@ func TestRateLimiter(t *testing.T) { r.Wait(ctx, endpoint, url, log) // consume the initial request r.Update(endpoint, headers, log) // update to a slow rate - err := r.Wait(ctx, endpoint, url, log) + err = r.Wait(ctx, endpoint, url, log) const expectedErr = "rate: Wait(n=1) would exceed context deadline" if err == nil { From 996fc71665135baf182d5b98ccaa95f0b9bafb01 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Wed, 11 
Dec 2024 18:20:10 +0100 Subject: [PATCH 10/12] Nicer test error message formatting. --- .../provider/okta/internal/okta/ratelimiter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go index 696e9868043..8a0811f4ab2 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go @@ -87,7 +87,7 @@ func TestRateLimiter(t *testing.T) { wait := time.Since(start) if wait > 1010*time.Millisecond { - t.Errorf("doesn't allow requests to resume after reset. had to wait %d milliseconds", wait.Milliseconds()) + t.Errorf("doesn't allow requests to resume after reset. had to wait %s", wait) } if e.limiter.Limit() != 1.0 { t.Errorf("unexpected rate following reset (not 60 requests / 60 seconds): %f", e.limiter.Limit()) From 18c8e962cb217af25c8204fc77b4093551914caa Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Wed, 11 Dec 2024 18:20:43 +0100 Subject: [PATCH 11/12] Nicer threshold check in test. 
--- .../provider/okta/internal/okta/ratelimiter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go index 8a0811f4ab2..cc8a832bee2 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go @@ -101,7 +101,7 @@ func TestRateLimiter(t *testing.T) { target := 30.0 buffer := 0.01 - if tokens < target-buffer || tokens > target+buffer { + if tokens < target-buffer || target+buffer < tokens { t.Errorf("tokens don't accumulate at the expected rate over 30s: %f", tokens) } }) From 73cd8f671db4d4a71347d21d52a9b343c25780b2 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Wed, 11 Dec 2024 18:23:55 +0100 Subject: [PATCH 12/12] CHANGELOG.next.asciidoc entries. --- CHANGELOG.next.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fc83979c173..c99bd4219e6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -188,6 +188,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755] - Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765] - Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583] +- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] *Heartbeat* @@ -356,6 +357,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add evaluation state dump debugging option to CEL input. 
{pull}41335[41335] - Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862] - Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869] +- Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] *Auditbeat*