From 079fda9e46f70d8544568ed2ca941416212356d0 Mon Sep 17 00:00:00 2001 From: Josh Liburdi Date: Thu, 14 Mar 2024 08:09:28 -0700 Subject: [PATCH] feat(autoscale): Dynamic Kinesis Scaling with Thresholds (#144) * feat(autoscale): Implement Dynamic Scaling * chore(autoscale): Adjust Dynamic Scaling * fix(terraform): Add Tags IAM Permission * refactor(cloudwatch): Pin Datapoints to Eval Period * docs(cloudwatch): Removed Reference to Dynamic Eval Periods --- .../terraform/aws/kinesis_data_stream/main.tf | 18 +++-- cmd/aws/lambda/README.md | 16 ++-- cmd/aws/lambda/autoscale/main.go | 81 ++++++++++++++----- internal/aws/cloudwatch/cloudwatch.go | 78 +++++++++++------- internal/aws/kinesis/kinesis.go | 16 ++++ 5 files changed, 145 insertions(+), 64 deletions(-) diff --git a/build/terraform/aws/kinesis_data_stream/main.tf b/build/terraform/aws/kinesis_data_stream/main.tf index d079ca6d..5c083b5c 100644 --- a/build/terraform/aws/kinesis_data_stream/main.tf +++ b/build/terraform/aws/kinesis_data_stream/main.tf @@ -6,11 +6,12 @@ resource "aws_kinesis_stream" "stream" { retention_period = var.config.retention encryption_type = var.kms != null ? "KMS" : "NONE" kms_key_id = var.kms != null ? var.kms.id : null - lifecycle { - ignore_changes = [shard_count] - } tags = var.tags + + lifecycle { + ignore_changes = [shard_count, tags] + } } # Applies the policy to each role in the access list. @@ -44,14 +45,15 @@ data "aws_iam_policy_document" "access" { statement { effect = "Allow" actions = [ + "kinesis:AddTagsToStream", "kinesis:DescribeStream*", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", + "kinesis:ListTagsForStream", "kinesis:PutRecord*", "kinesis:SubscribeToShard", - "kinesis:SubscribeToShard", "kinesis:RegisterStreamConsumer", "kinesis:UpdateShardCount", ] @@ -84,10 +86,11 @@ resource "aws_cloudwatch_metric_alarm" "metric_alarm_downscale" { actions_enabled = true alarm_actions = [var.config.autoscaling_topic] evaluation_periods = 60 - datapoints_to_alarm = 57 - threshold = 0.25 + datapoints_to_alarm = 60 + threshold = 0.35 comparison_operator = "LessThanOrEqualToThreshold" treat_missing_data = "ignore" + lifecycle { ignore_changes = [metric_query, datapoints_to_alarm] } @@ -169,9 +172,10 @@ resource "aws_cloudwatch_metric_alarm" "metric_alarm_upscale" { alarm_actions = [var.config.autoscaling_topic] evaluation_periods = 5 datapoints_to_alarm = 5 - threshold = 0.75 + threshold = 0.70 comparison_operator = "GreaterThanOrEqualToThreshold" treat_missing_data = "ignore" + lifecycle { ignore_changes = [metric_query, datapoints_to_alarm] } diff --git a/cmd/aws/lambda/README.md b/cmd/aws/lambda/README.md index 5760b476..bb6b913c 100644 --- a/cmd/aws/lambda/README.md +++ b/cmd/aws/lambda/README.md @@ -18,18 +18,18 @@ This app handles ingest, transform, and load for data from these AWS services: ## autoscale -This app handles Kinesis Data Stream autoscaling through SNS notifications and CloudWatch alarms. The scaling behavior is to scale up / out if stream utilization is greater than 75% of the Kinesis service limits within a 5 minute period and scale down / in if stream utilization is less than 25% of the Kinesis service limits within a 60 minute period. In both cases, streams scale by 50%. +This app handles Kinesis Data Stream autoscaling through SNS notifications and CloudWatch alarms. Scaling is based on stream capacity as determined by the number and size of incoming records written to the stream. By default, the scaling behavior follows this pattern: -Stream utilization is based on volume (i.e., 60, 000 events per minute) and size (i.e., 10GB data per minute); these values are converted to a percentage (0.0 to 1.0) and the maximum of either is considered the stream's current utilization. +* If stream utilization is greater than 70% of the Kinesis service limits consistently within a 5 minute period, then scale up +* If stream utilization is less than 35% of the Kinesis service limits consistently within a 60 minute period, then scale down -By default, streams must be above the upper threshold for all 5 minutes to scale up and below the lower threshold for at least 57 minutes to scale down. These values can be overriden by the environment variables AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS (cannot exceed 5 minutes) and AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS (cannot exceed 60 minutes). +The scaling behavior is customizable using environment variables: -For example: +* `AUTOSCALE_KINESIS_THRESHOLD` - The target threshold to cause a scaling event. The default value is 0.7 (70%), but it can be set to any value between 0.4 (40%) and 0.9 (90%). If the threshold is low, then the stream is more sensitive to scaling up and less sensitive to scaling down. If the threshold is high, then the stream is less sensitive to scaling up and more sensitive to scaling down. +* `AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS` - The number of data points required to scale up. The default value is 5, but it can be set to any value between 1 and 30. The number of data points affects the evaluation period; every 5 data points is equivalent to 5 minutes and the maximum evaluation period is 30 minutes. Use a higher value to reduce the frequency of scaling up. +* `AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS` - The number of data points required to scale down. The default value is 60, but it can be set to any value between 1 and 360. The number of data points affects the evaluation period; every 60 data points is equivalent to 1 hour and the maximum evaluation period is 6 hours. Use a higher value to reduce the frequency of scaling down. -* If a stream is configured with 10 shards and it triggers the upscale alarm, then the stream is scaled up to 15 shards -* If a stream is configured with 10 shards and it triggers the downscale alarm, then the stream is scaled down to 5 shards - -Shards will not scale evenly, but the autoscaling functionality follows [AWS best practices for resharding streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html). UpdateShardCount has many limitations that the application is designed around, but there may be times when these limits cannot be avoided; if any limits are met, then users should file a service limit increase with AWS. Although rare, the most common service limits that users may experience are: +Shards do not scale evenly, but the autoscaling follows [AWS best practices for resharding streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html). UpdateShardCount has many limitations that the application is designed around, but there may be times when these limits cannot be avoided; if any limits are met, then users should file a service limit increase with AWS. Although rare, the most common service limits that users may experience are: * Scaling a stream more than 10 times per 24 hour rolling period * Scaling a stream beyond 10000 shards diff --git a/cmd/aws/lambda/autoscale/main.go b/cmd/aws/lambda/autoscale/main.go index 883cb6d8..9a83b998 100644 --- a/cmd/aws/lambda/autoscale/main.go +++ b/cmd/aws/lambda/autoscale/main.go @@ -6,6 +6,7 @@ import ( "math" "strconv" "strings" + "time" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" @@ -16,10 +17,6 @@ import ( "github.com/tidwall/gjson" ) -const ( - autoscalePercentage = 50.0 -) - var ( cloudwatchAPI cloudwatch.API kinesisAPI kinesis.API @@ -43,7 +40,7 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error { alarmName := gjson.Get(message, "AlarmName").String() triggerMetrics := gjson.Get(message, "Trigger.Metrics") - log.WithField("alarm", alarmName).Info("received autoscale notification") + log.WithField("alarm", alarmName).Debug("Received autoscale notification.") var stream string for _, v := range triggerMetrics.Array() { @@ -53,23 +50,25 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error { break } } - log.WithField("alarm", alarmName).WithField("stream", stream).Info("parsed Kinesis stream") + log.WithField("alarm", alarmName).WithField("stream", stream).Debug("Parsed Kinesis stream.") shards, err := kinesisAPI.ActiveShards(ctx, stream) if err != nil { return fmt.Errorf("handler: %v", err) } log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", shards). - Info("retrieved active shard count") + Info("Retrieved active shard count.") var newShards int64 if strings.Contains(alarmName, "upscale") { - newShards = upscale(float64(shards), autoscalePercentage) + newShards = upscale(float64(shards)) } if strings.Contains(alarmName, "downscale") { - newShards = downscale(float64(shards), autoscalePercentage) + newShards = downscale(float64(shards)) } + log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", newShards).Info("Calculated new shard count.") + tags, err := kinesisAPI.GetTags(ctx, stream) if err != nil { return fmt.Errorf("handler: %v", err) @@ -83,7 +82,7 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error { return fmt.Errorf("handler: %v", err) } - log.WithField("stream", stream).WithField("count", minShard).Info("retrieved minimum shard count") + log.WithField("stream", stream).WithField("count", minShard).Debug("Retrieved minimum shard count.") } if *tag.Key == "MaximumShards" { @@ -92,7 +91,28 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error { return fmt.Errorf("handler: %v", err) } - log.WithField("stream", stream).WithField("count", maxShard).Info("retrieved maximum shard count") + log.WithField("stream", stream).WithField("count", maxShard).Debug("Retrieved maximum shard count.") + } + + // Tracking the last scaling event prevents scaling from occurring too frequently. + // If the current scaling event is an upscale, then the last scaling event must be at least 3 minutes ago. + // If the current scaling event is a downscale, then the last scaling event must be at least 30 minutes ago. + if *tag.Key == "LastScalingEvent" { + lastScalingEvent, err := time.Parse(time.RFC3339, *tag.Value) + if err != nil { + return fmt.Errorf("handler: %v", err) + } + + if (time.Since(lastScalingEvent) < 3*time.Minute && strings.Contains(alarmName, "upscale")) || + (time.Since(lastScalingEvent) < 30*time.Minute && strings.Contains(alarmName, "downscale")) { + log.WithField("stream", stream).WithField("time", lastScalingEvent).Info("Last scaling event is too recent.") + + if err := cloudwatchAPI.UpdateKinesisAlarmState(ctx, alarmName, "Last scaling event is too recent"); err != nil { + return fmt.Errorf("handler: %v", err) + } + + return nil + } } } @@ -109,32 +129,55 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error { } if newShards == shards { - log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", shards).Info("active shard count is at minimum threshold, no updates necessary") + log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", shards).Info("Active shard count is at minimum threshold, no change is required.") return nil } if err := kinesisAPI.UpdateShards(ctx, stream, newShards); err != nil { return fmt.Errorf("handler: %v", err) } - log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", newShards).Info("updated shards") + + if err := kinesisAPI.UpdateTag(ctx, stream, "LastScalingEvent", time.Now().Format(time.RFC3339)); err != nil { + return fmt.Errorf("handler: %v", err) + } + + log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", newShards).Info("Updated shard count.") if err := cloudwatchAPI.UpdateKinesisDownscaleAlarm(ctx, stream+"_downscale", stream, topicArn, newShards); err != nil { return fmt.Errorf("handler: %v", err) } - log.WithField("alarm", stream+"_downscale").WithField("stream", stream).WithField("count", newShards).Info("reset alarm") + log.WithField("alarm", stream+"_downscale").WithField("stream", stream).WithField("count", newShards).Debug("Reset CloudWatch alarm.") if err := cloudwatchAPI.UpdateKinesisUpscaleAlarm(ctx, stream+"_upscale", stream, topicArn, newShards); err != nil { return fmt.Errorf("handler: %v", err) } - log.WithField("alarm", stream+"_upscale").WithField("stream", stream).WithField("count", newShards).Info("reset alarm") + log.WithField("alarm", stream+"_upscale").WithField("stream", stream).WithField("count", newShards).Debug("Reset CloudWatch alarm.") return nil } -func downscale(shards, pct float64) int64 { - return int64(math.Ceil(shards - (shards * (pct / 100)))) +func downscale(shards float64) int64 { + switch { + case shards < 5: + return int64(math.Ceil(shards / 2)) + case shards < 13: + return int64(math.Ceil(shards / 1.75)) + case shards < 33: + return int64(math.Ceil(shards / 1.5)) + default: + return int64(math.Ceil(shards / 1.25)) + } } -func upscale(shards, pct float64) int64 { - return int64(math.Ceil(shards + (shards * (pct / 100)))) +func upscale(shards float64) int64 { + switch { + case shards < 5: + return int64(math.Floor(shards * 2)) + case shards < 13: + return int64(math.Floor(shards * 1.75)) + case shards < 33: + return int64(math.Floor(shards * 1.5)) + default: + return int64(math.Floor(shards * 1.25)) + } } diff --git a/internal/aws/cloudwatch/cloudwatch.go b/internal/aws/cloudwatch/cloudwatch.go index bf64252e..c1e0a6d9 100644 --- a/internal/aws/cloudwatch/cloudwatch.go +++ b/internal/aws/cloudwatch/cloudwatch.go @@ -13,40 +13,54 @@ import ( ) const ( + // This is the period in seconds that the AWS Kinesis CloudWatch alarms + // will evaluate the metrics over. kinesisMetricsPeriod = 60 - // AWS Kinesis streams will scale down / in if they are less than 25% of the Kinesis service limits within a 60 minute / 1 hour period. - kinesisDownscaleEvaluationPeriod, kinesisDownscaleThreshold = 60, 0.25 - // AWS Kinesis streams will scale up / out if they are greater than 75% of the Kinesis service limits within a 5 minute period. - kinesisUpscaleEvaluationPeriod, kinesisUpscaleThreshold = 5, 0.75 ) var ( - // By default, AWS Kinesis streams must be below the lower threshold for 95% of the evaluation period (57 minutes) to scale down. This value can be overridden by the environment variable AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS, but it cannot exceed 60 minutes. - kinesisDownscaleDatapoints = 57 - // By default, AWS Kinesis streams must be above the upper threshold for 100% of the evaluation period (5 minutes) to scale up. This value can be overridden by the environment variable AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS, but it cannot exceed 5 minutes. + // By default, AWS Kinesis streams must be below the lower threshold for + // 100% of the evaluation period (60 minutes) to scale down. This value can + // be overridden by the environment variable AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS. + kinesisDownscaleDatapoints = 60 + // By default, AWS Kinesis streams must be above the upper threshold for + // 100% of the evaluation period (5 minutes) to scale up. This value can + // be overridden by the environment variable AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS. kinesisUpscaleDatapoints = 5 + // By default, AWS Kinesis streams will scale up if the incoming records and bytes + // are above 70% of the threshold. This value can be overridden by the environment + // variable AUTOSCALE_KINESIS_THRESHOLD, but it cannot be less than 40% or greater + // than 90%. + kinesisThreshold = 0.7 ) func init() { if v, found := os.LookupEnv("AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS"); found { - downscale, err := strconv.Atoi(v) + dps, err := strconv.Atoi(v) if err != nil { panic(err) } - if downscale <= kinesisDownscaleEvaluationPeriod { - kinesisDownscaleDatapoints = downscale - } + kinesisDownscaleDatapoints = dps } if v, found := os.LookupEnv("AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS"); found { - upscale, err := strconv.Atoi(v) + dps, err := strconv.Atoi(v) if err != nil { panic(err) } - if upscale <= kinesisUpscaleEvaluationPeriod { - kinesisUpscaleDatapoints = upscale + kinesisUpscaleDatapoints = dps + } + + if v, found := os.LookupEnv("AUTOSCALE_KINESIS_THRESHOLD"); found { + threshold, err := strconv.ParseFloat(v, 64) + if err != nil { + panic(err) + } + + if threshold >= 0.4 && threshold <= 0.9 { + kinesisThreshold = threshold } } } @@ -80,6 +94,8 @@ func (a *API) IsEnabled() bool { // UpdateKinesisDownscaleAlarm updates CloudWatch alarms that manage the scale down tracking for Kinesis streams. func (a *API) UpdateKinesisDownscaleAlarm(ctx aws.Context, name, stream, topic string, shards int64) error { + downscaleThreshold := kinesisThreshold - 0.35 + if _, err := a.Client.PutMetricAlarmWithContext( ctx, &cloudwatch.PutMetricAlarmInput{ @@ -87,9 +103,9 @@ func (a *API) UpdateKinesisDownscaleAlarm(ctx aws.Context, name, stream, topic s AlarmDescription: aws.String(stream), ActionsEnabled: aws.Bool(true), AlarmActions: []*string{aws.String(topic)}, - EvaluationPeriods: aws.Int64(kinesisDownscaleEvaluationPeriod), + EvaluationPeriods: aws.Int64(int64(kinesisDownscaleDatapoints)), DatapointsToAlarm: aws.Int64(int64(kinesisDownscaleDatapoints)), - Threshold: aws.Float64(kinesisDownscaleThreshold), + Threshold: aws.Float64(downscaleThreshold), ComparisonOperator: aws.String("LessThanOrEqualToThreshold"), TreatMissingData: aws.String("ignore"), Metrics: []*cloudwatch.MetricDataQuery{ @@ -170,12 +186,7 @@ func (a *API) UpdateKinesisDownscaleAlarm(ctx aws.Context, name, stream, topic s return fmt.Errorf("updatealarm alarm %s stream %s: %v", name, stream, err) } - if _, err := a.Client.SetAlarmStateWithContext(ctx, - &cloudwatch.SetAlarmStateInput{ - AlarmName: aws.String(name), - StateValue: aws.String("INSUFFICIENT_DATA"), - StateReason: aws.String("Threshold value updated"), - }); err != nil { + if err := a.UpdateKinesisAlarmState(ctx, name, "Threshold value updated"); err != nil { return fmt.Errorf("updatealarm alarm %s stream %s: %v", name, stream, err) } @@ -184,6 +195,8 @@ func (a *API) UpdateKinesisDownscaleAlarm(ctx aws.Context, name, stream, topic s // UpdateKinesisUpscaleAlarm updates CloudWatch alarms that manage the scale up tracking for Kinesis streams. func (a *API) UpdateKinesisUpscaleAlarm(ctx aws.Context, name, stream, topic string, shards int64) error { + upscaleThreshold := kinesisThreshold + if _, err := a.Client.PutMetricAlarmWithContext( ctx, &cloudwatch.PutMetricAlarmInput{ @@ -191,9 +204,9 @@ func (a *API) UpdateKinesisUpscaleAlarm(ctx aws.Context, name, stream, topic str AlarmDescription: aws.String(stream), ActionsEnabled: aws.Bool(true), AlarmActions: []*string{aws.String(topic)}, - EvaluationPeriods: aws.Int64(kinesisUpscaleEvaluationPeriod), + EvaluationPeriods: aws.Int64(int64(kinesisUpscaleDatapoints)), DatapointsToAlarm: aws.Int64(int64(kinesisUpscaleDatapoints)), - Threshold: aws.Float64(kinesisUpscaleThreshold), + Threshold: aws.Float64(upscaleThreshold), ComparisonOperator: aws.String("GreaterThanOrEqualToThreshold"), TreatMissingData: aws.String("ignore"), Metrics: []*cloudwatch.MetricDataQuery{ @@ -274,14 +287,19 @@ func (a *API) UpdateKinesisUpscaleAlarm(ctx aws.Context, name, stream, topic str return fmt.Errorf("updatealarm alarm %s stream %s: %v", name, stream, err) } - if _, err := a.Client.SetAlarmStateWithContext(ctx, - &cloudwatch.SetAlarmStateInput{ - AlarmName: aws.String(name), - StateValue: aws.String("INSUFFICIENT_DATA"), - StateReason: aws.String("Threshold value updated"), - }); err != nil { + if err := a.UpdateKinesisAlarmState(ctx, name, "Threshold value updated"); err != nil { return fmt.Errorf("updatealarm alarm %s stream %s: %v", name, stream, err) } return nil } + +func (a *API) UpdateKinesisAlarmState(ctx aws.Context, name, reason string) error { + _, err := a.Client.SetAlarmStateWithContext(ctx, + &cloudwatch.SetAlarmStateInput{ + AlarmName: aws.String(name), + StateValue: aws.String("INSUFFICIENT_DATA"), + StateReason: aws.String(reason), + }) + return err +} diff --git a/internal/aws/kinesis/kinesis.go b/internal/aws/kinesis/kinesis.go index 79f86b49..b98858d1 100644 --- a/internal/aws/kinesis/kinesis.go +++ b/internal/aws/kinesis/kinesis.go @@ -340,3 +340,19 @@ func (a *API) GetTags(ctx aws.Context, stream string) ([]*kinesis.Tag, error) { return tags, nil } + +// UpdateTag updates a tag on a Kinesis stream. +func (a *API) UpdateTag(ctx aws.Context, stream, key, value string) error { + input := &kinesis.AddTagsToStreamInput{ + StreamName: aws.String(stream), + Tags: map[string]*string{ + key: aws.String(value), + }, + } + + if _, err := a.Client.AddTagsToStreamWithContext(ctx, input); err != nil { + return fmt.Errorf("updatetag stream %s key %s value %s: %v", stream, key, value, err) + } + + return nil +}