Skip to content

Commit

Permalink
Add middleware to log server errors
Browse files Browse the repository at this point in the history
Signed-off-by: Connor Catlett <[email protected]>
  • Loading branch information
ConnorJC3 committed Oct 31, 2024
1 parent e8df5a8 commit 1500fba
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
svc := ec2.NewFromConfig(cfg, func(o *ec2.Options) {
o.APIOptions = append(o.APIOptions,
RecordRequestsMiddleware(),
LogServerErrorsMiddleware(), // This middlware should always be last so it sees an unmangled error
)

endpoint := os.Getenv("AWS_EC2_ENDPOINT")
Expand Down
32 changes: 28 additions & 4 deletions pkg/cloud/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (
"time"

awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/smithy-go"
"github.com/aws/smithy-go/middleware"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics"
"k8s.io/klog/v2"
)

const requestLimitExceededErrorCode = "RequestLimitExceeded"

// RecordRequestsHandler is added to the Complete chain; called after any request
func RecordRequestsMiddleware() func(*middleware.Stack) error {
return func(stack *middleware.Stack) error {
Expand All @@ -40,13 +39,12 @@ func RecordRequestsMiddleware() func(*middleware.Stack) error {
if err != nil {
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
if apiErr.ErrorCode() == requestLimitExceededErrorCode {
if _, isThrottleError := retry.DefaultThrottleErrorCodes[apiErr.ErrorCode()]; isThrottleError {
operationName := awsmiddleware.GetOperationName(ctx)
labels = map[string]string{
"operation_name": operationName,
}
metrics.Recorder().IncreaseCount("cloudprovider_aws_api_throttled_requests_total", labels)
klog.InfoS("Got RequestLimitExceeded error on AWS request", "request", operationName)
} else {
metrics.Recorder().IncreaseCount("cloudprovider_aws_api_request_errors", labels)
}
Expand All @@ -60,6 +58,32 @@ func RecordRequestsMiddleware() func(*middleware.Stack) error {
}
}

// LogServerErrorsMiddleware is a middleware that logs server errors received when attempting to contact the AWS API
// A specialized middleware is used instead of the SDK's built-in retry logging to allow for customizing the verbosity
// of throttle errors vs server/unknown errors, to prevent flooding the logs with throttle error
func LogServerErrorsMiddleware() func(*middleware.Stack) error {
return func(stack *middleware.Stack) error {
return stack.Finalize.Add(middleware.FinalizeMiddlewareFunc("LogServerErrorsMiddleware", func(ctx context.Context, input middleware.FinalizeInput, next middleware.FinalizeHandler) (output middleware.FinalizeOutput, metadata middleware.Metadata, err error) {
output, metadata, err = next.HandleFinalize(ctx, input)
if err != nil {
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
if _, isThrottleError := retry.DefaultThrottleErrorCodes[apiErr.ErrorCode()]; isThrottleError {
// Only log throttle errors under a high verbosity as we expect to see many of them
// under normal bursty/high-TPS workloads
klog.V(4).ErrorS(apiErr, "Throttle error from AWS API")
} else {
klog.ErrorS(apiErr, "Error from AWS API")
}
} else {
klog.ErrorS(err, "Unknown error attempting to contact AWS API")
}
}
return output, metadata, err
}), middleware.After)
}
}

func createLabels(ctx context.Context) map[string]string {
operationName := awsmiddleware.GetOperationName(ctx)
if operationName == "" {
Expand Down

0 comments on commit 1500fba

Please sign in to comment.