Skip to content

Commit

Permalink
.../input/entityanalytics/provider/okta: Rate limiting fix, improveme…
Browse files Browse the repository at this point in the history
…nts (#41977)

- Fix a bug in the stopping of requests when `x-rate-limit-remaining: 0`.
- Add a deadline so long waits return immediately as errors.
- Add an option to set a fixed request rate.
  • Loading branch information
chrisberkhout authored Dec 12, 2024
1 parent 16c753c commit 08b7d84
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 107 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765]
- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583]
- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]

*Heartbeat*

Expand Down Expand Up @@ -365,6 +366,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934]
- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]
- The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]
- Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]

*Auditbeat*

Expand Down
18 changes: 16 additions & 2 deletions x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,21 @@ shorter than the full synchronization interval (`sync_interval`). Expressed as a
duration string (e.g., 1m, 3h, 24h). Defaults to `15m` (15 minutes).

[float]
==== `tracer.enabled`
===== `limit_window`

The time between Okta API rate limit resets.
Expressed as a duration string (e.g., 1m, 3h, 24h). Defaults to `1m` (1 minute).

[float]
===== `limit_fixed`

The number of requests to allow in each limit window, if set.
This parameter should only be set in exceptional cases. When it is set, rate
limit information in API responses will be ignored in favor of the fixed limit.
The limit is applied separately to each endopint. Defaults to unset.

[float]
===== `tracer.enabled`

It is possible to log HTTP requests and responses to the Okta API to a local file-system for debugging configurations.
This option is enabled by setting `tracer.enabled` to true and setting the `tracer.filename` value.
Expand All @@ -1025,7 +1039,7 @@ to false without unsetting the filename option.
Enabling this option compromises security and should only be used for debugging.

[float]
==== `tracer.filename`
===== `tracer.filename`

To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the
filename and will be replaced with the input instance id. For Example, `http-request-trace-*.ndjson`.
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/entityanalytics/provider/okta/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type conf struct {
// API limit resets.
LimitWindow time.Duration `config:"limit_window"`

// LimitFixed is a number of requests to allow in each LimitWindow,
// overriding the guidance in API responses.
LimitFixed *int `config:"limit_fixed"`

// Request is the configuration for establishing
// HTTP requests to the API.
Request *requestConfig `config:"request"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ func (o Response) String() string {
// Parts of the response may be omitted using the omit parameter.
//
// The provided rate limiter must allow at least request and will be updated with the
// response's X-Rate-Limit headers considering the rate limit window time. Details
// for rate limits are available at https://help.okta.com/en-us/Content/Topics/Security/API-rate-limits.htm
// response's X-Rate-Limit headers. Details for rate limits are available at
// https://help.okta.com/en-us/Content/Topics/Security/API-rate-limits.htm
// and account rate limits and windows can be seen on the Okta admin dashboard at
// https://${yourOktaDomain}/reports/rate-limit.
//
// See https://developer.okta.com/docs/reference/api/users/#list-users for details.
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim *RateLimiter, log *logp.Logger) ([]User, http.Header, error) {
var endpoint, path string
if user == "" {
endpoint = "/api/v1/users"
Expand All @@ -207,7 +207,7 @@ func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user strin
Path: path,
RawQuery: query.Encode(),
}
return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, window, log)
return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, log)
}

// GetUserFactors returns Okta group roles using the groups API endpoint. host is the
Expand All @@ -216,7 +216,7 @@ func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user strin
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/UserFactor/#tag/UserFactor/operation/listFactors.
func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Factor, http.Header, error) {
func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim *RateLimiter, log *logp.Logger) ([]Factor, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}
Expand All @@ -229,7 +229,7 @@ func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user strin
Host: host,
Path: path,
}
return getDetails[Factor](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
return getDetails[Factor](ctx, cli, u, endpoint, key, true, OmitNone, lim, log)
}

// GetUserRoles returns Okta group roles using the groups API endpoint. host is the
Expand All @@ -238,7 +238,7 @@ func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user strin
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles.
func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) {
func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim *RateLimiter, log *logp.Logger) ([]Role, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}
Expand All @@ -251,7 +251,7 @@ func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string,
Host: host,
Path: path,
}
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, log)
}

// GetUserGroupDetails returns Okta group details using the users API endpoint. host is the
Expand All @@ -260,7 +260,7 @@ func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string,
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/reference/api/users/#request-parameters-8 (no anchor exists on the page for this endpoint) for details.
func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) {
func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim *RateLimiter, log *logp.Logger) ([]Group, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}
Expand All @@ -273,7 +273,7 @@ func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user
Host: host,
Path: path,
}
return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, log)
}

// GetGroupRoles returns Okta group roles using the groups API endpoint. host is the
Expand All @@ -282,7 +282,7 @@ func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles.
func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) {
func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim *RateLimiter, log *logp.Logger) ([]Role, http.Header, error) {
if group == "" {
return nil, nil, errors.New("no group specified")
}
Expand All @@ -295,7 +295,7 @@ func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group strin
Host: host,
Path: path,
}
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, log)
}

// GetDeviceDetails returns Okta device details using the list devices API endpoint. host is the
Expand All @@ -305,7 +305,7 @@ func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group strin
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices for details.
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) {
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *RateLimiter, log *logp.Logger) ([]Device, http.Header, error) {
var endpoint string
var path string
if device == "" {
Expand All @@ -322,7 +322,7 @@ func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device s
Path: path,
RawQuery: query.Encode(),
}
return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, window, log)
return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, log)
}

// GetDeviceUsers returns Okta user details for users associated with the provided device identifier
Expand All @@ -332,7 +332,7 @@ func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device s
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDeviceUsers for details.
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim *RateLimiter, log *logp.Logger) ([]User, http.Header, error) {
if device == "" {
// No user associated with a null device. Not an error.
return nil, nil, nil
Expand All @@ -347,7 +347,7 @@ func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device str
Path: path,
RawQuery: query.Encode(),
}
du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, window, log)
du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, log)
if err != nil {
return nil, h, err
}
Expand All @@ -372,7 +372,7 @@ type devUser struct {
// for the specific user are returned, otherwise a list of all users is returned.
//
// See GetUserDetails for details of the query and rate limit parameters.
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) {
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim *RateLimiter, log *logp.Logger) ([]E, http.Header, error) {
url := u.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
Expand All @@ -395,7 +395,7 @@ func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, end
return nil, nil, err
}
defer resp.Body.Close()
err = lim.Update(endpoint, resp.Header, window, log)
err = lim.Update(endpoint, resp.Header, log)
if err != nil {
io.Copy(io.Discard, resp.Body)
return nil, nil, err
Expand Down
Loading

0 comments on commit 08b7d84

Please sign in to comment.