From 03f072ba92551f3918a0b811f3ba543afe8f7ce4 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Sun, 18 Aug 2024 21:15:12 +0100 Subject: [PATCH 1/2] Refactor aws dynamodb streams scaler Signed-off-by: dttung2905 --- pkg/scalers/aws_dynamodb_streams_scaler.go | 52 +++++++------------ .../aws_dynamodb_streams_scaler_test.go | 48 ++++++++--------- 2 files changed, 43 insertions(+), 57 deletions(-) diff --git a/pkg/scalers/aws_dynamodb_streams_scaler.go b/pkg/scalers/aws_dynamodb_streams_scaler.go index a8448a46408..4756041de71 100644 --- a/pkg/scalers/aws_dynamodb_streams_scaler.go +++ b/pkg/scalers/aws_dynamodb_streams_scaler.go @@ -3,14 +3,13 @@ package scalers import ( "context" "fmt" - "strconv" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams" "github.com/go-logr/logr" v2 "k8s.io/api/autoscaling/v2" "k8s.io/metrics/pkg/apis/external_metrics" + "strconv" awsutils "github.com/kedacore/keda/v2/pkg/scalers/aws" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" @@ -31,13 +30,13 @@ type awsDynamoDBStreamsScaler struct { } type awsDynamoDBStreamsMetadata struct { - targetShardCount int64 - activationTargetShardCount int64 - tableName string - awsRegion string - awsEndpoint string awsAuthorization awsutils.AuthorizationMetadata triggerIndex int + targetShardCount int64 + activationTargetShardCount int64 + TableName string `keda:"name=tableName, order=triggerMetadata"` + AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"` + AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"` } // NewAwsDynamoDBStreamsScaler creates a new awsDynamoDBStreamsScaler @@ -58,7 +57,7 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal if err != nil { return nil, fmt.Errorf("error when creating dynamodbstream client: %w", err) } - streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.tableName) + streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.TableName) if err != nil { return nil, fmt.Errorf("error dynamodb stream arn: %w", err) } @@ -75,24 +74,11 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal } func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*awsDynamoDBStreamsMetadata, error) { - meta := awsDynamoDBStreamsMetadata{} - meta.targetShardCount = defaultTargetDBStreamsShardCount - - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } else { - return nil, fmt.Errorf("no awsRegion given") - } - - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val - } - - if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" { - meta.tableName = val - } else { - return nil, fmt.Errorf("no tableName given") + meta := &awsDynamoDBStreamsMetadata{} + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing prometheus metadata: %w", err) } + meta.targetShardCount = defaultTargetDBStreamsShardCount if val, ok := config.TriggerMetadata["shardCount"]; ok && val != "" { shardCount, err := strconv.ParseInt(val, 10, 64) @@ -121,22 +107,22 @@ func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger meta.awsAuthorization = auth meta.triggerIndex = config.TriggerIndex - return &meta, nil + return meta, nil } func createClientsForDynamoDBStreamsScaler(ctx context.Context, metadata *awsDynamoDBStreamsMetadata) (*dynamodb.Client, *dynamodbstreams.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) if err != nil { return nil, nil, err } dbClient := dynamodb.NewFromConfig(*cfg, func(options *dynamodb.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }) dbStreamClient := dynamodbstreams.NewFromConfig(*cfg, func(options *dynamodbstreams.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }) @@ -176,7 +162,7 @@ func (s *awsDynamoDBStreamsScaler) Close(_ context.Context) error { func (s *awsDynamoDBStreamsScaler) GetMetricSpecForScaling(_ context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.tableName))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.TableName))), }, Target: GetMetricTarget(s.metricType, s.metadata.targetShardCount), } @@ -208,7 +194,7 @@ func (s *awsDynamoDBStreamsScaler) getDynamoDBStreamShardCount(ctx context.Conte } for { if lastShardID != nil { - // The upper limit of shard num to retrun is 100. + // The upper limit of shard num to return is 100. // ExclusiveStartShardId is the shard ID of the first item that the operation will evaluate. input = dynamodbstreams.DescribeStreamInput{ StreamArn: s.streamArn, diff --git a/pkg/scalers/aws_dynamodb_streams_scaler_test.go b/pkg/scalers/aws_dynamodb_streams_scaler_test.go index 5c87de8d87a..00d264623b8 100644 --- a/pkg/scalers/aws_dynamodb_streams_scaler_test.go +++ b/pkg/scalers/aws_dynamodb_streams_scaler_test.go @@ -137,8 +137,8 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ expected: &awsDynamoDBStreamsMetadata{ targetShardCount: 2, activationTargetShardCount: 1, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -161,9 +161,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ expected: &awsDynamoDBStreamsMetadata{ targetShardCount: 2, activationTargetShardCount: 1, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, - awsEndpoint: testAWSDynamoDBStreamsEndpoint, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, + AwsEndpoint: testAWSDynamoDBStreamsEndpoint, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -206,8 +206,8 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ expected: &awsDynamoDBStreamsMetadata{ targetShardCount: defaultTargetDBStreamsShardCount, activationTargetShardCount: defaultActivationTargetDBStreamsShardCount, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -227,8 +227,8 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ authParams: testAWSKinesisAuthentication, expected: &awsDynamoDBStreamsMetadata{ targetShardCount: defaultTargetDBStreamsShardCount, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -279,8 +279,8 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ }, expected: &awsDynamoDBStreamsMetadata{ targetShardCount: 2, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -331,8 +331,8 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ }, expected: &awsDynamoDBStreamsMetadata{ targetShardCount: 2, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsRoleArn: testAWSDynamoDBStreamsRoleArn, PodIdentityOwner: true, @@ -351,8 +351,8 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ authParams: map[string]string{}, expected: &awsDynamoDBStreamsMetadata{ targetShardCount: 2, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ PodIdentityOwner: false, }, @@ -370,10 +370,10 @@ var awsDynamoDBStreamMetricIdentifiers = []awsDynamoDBStreamsMetricIdentifier{ } var awsDynamoDBStreamsGetMetricTestData = []*awsDynamoDBStreamsMetadata{ - {tableName: testAWSDynamoDBBigTable}, - {tableName: testAWSDynamoDBSmallTable}, - {tableName: testAWSDynamoDBErrorTable}, - {tableName: testAWSDynamoDBInvalidTable}, + {TableName: testAWSDynamoDBBigTable}, + {TableName: testAWSDynamoDBSmallTable}, + {TableName: testAWSDynamoDBErrorTable}, + {TableName: testAWSDynamoDBInvalidTable}, } func TestParseAwsDynamoDBStreamsMetadata(t *testing.T) { @@ -399,7 +399,7 @@ func TestAwsDynamoDBStreamsGetMetricSpecForScaling(t *testing.T) { if err != nil { t.Fatal("Could not parse metadata:", err) } - streamArn, err := getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName) + streamArn, err := getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName) if err != nil { t.Fatal("Could not get dynamodb stream arn:", err) } @@ -418,12 +418,12 @@ func TestAwsDynamoDBStreamsScalerGetMetrics(t *testing.T) { var err error var streamArn *string ctx := context.Background() - streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName) + streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName) if err == nil { scaler := awsDynamoDBStreamsScaler{"", meta, streamArn, &mockAwsDynamoDBStreams{}, logr.Discard()} value, _, err = scaler.GetMetricsAndActivity(context.Background(), "MetricName") } - switch meta.tableName { + switch meta.TableName { case testAWSDynamoDBErrorTable: assert.Error(t, err, "expect error because of dynamodb stream api error") case testAWSDynamoDBInvalidTable: @@ -442,12 +442,12 @@ func TestAwsDynamoDBStreamsScalerIsActive(t *testing.T) { var err error var streamArn *string ctx := context.Background() - streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName) + streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName) if err == nil { scaler := awsDynamoDBStreamsScaler{"", meta, streamArn, &mockAwsDynamoDBStreams{}, logr.Discard()} _, value, err = scaler.GetMetricsAndActivity(context.Background(), "MetricName") } - switch meta.tableName { + switch meta.TableName { case testAWSDynamoDBErrorTable: assert.Error(t, err, "expect error because of dynamodb stream api error") case testAWSDynamoDBInvalidTable: From 299412507a3103da92cc505fcf4d6aa194fdb323 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Sun, 18 Aug 2024 21:16:09 +0100 Subject: [PATCH 2/2] Fix import style Signed-off-by: dttung2905 --- pkg/scalers/aws_dynamodb_streams_scaler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/scalers/aws_dynamodb_streams_scaler.go b/pkg/scalers/aws_dynamodb_streams_scaler.go index 4756041de71..477389b0ef8 100644 --- a/pkg/scalers/aws_dynamodb_streams_scaler.go +++ b/pkg/scalers/aws_dynamodb_streams_scaler.go @@ -3,13 +3,14 @@ package scalers import ( "context" "fmt" + "strconv" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams" "github.com/go-logr/logr" v2 "k8s.io/api/autoscaling/v2" "k8s.io/metrics/pkg/apis/external_metrics" - "strconv" awsutils "github.com/kedacore/keda/v2/pkg/scalers/aws" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"