diff --git a/CHANGELOG.md b/CHANGELOG.md index 41c130ce1ae..5fff0140bdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,7 +46,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New - **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269)) -- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) +- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277)) - **General:** Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234)) ### Improvements diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go index 6d481cb7860..98200f24ec4 100644 --- a/pkg/scalers/redis_streams_scaler.go +++ b/pkg/scalers/redis_streams_scaler.go @@ -14,13 +14,21 @@ import ( kedautil "github.com/kedacore/keda/v2/pkg/util" ) +type scaleFactor int8 + +const ( + xPendingFactor scaleFactor = iota + 1 + xLengthFactor +) + const ( // defaults - defaultTargetPendingEntriesCount = 5 - defaultDBIndex = 0 + defaultDBIndex = 0 + defaultTargetEntries = 5 // metadata names pendingEntriesCountMetadata = "pendingEntriesCount" + streamLengthMetadata = "streamLength" streamNameMetadata = "stream" consumerGroupNameMetadata = "consumerGroup" usernameMetadata = "username" @@ -30,15 +38,17 @@ const ( ) type redisStreamsScaler struct { - metricType v2.MetricTargetType - metadata *redisStreamsMetadata - closeFn func() error - getPendingEntriesCountFn func(ctx context.Context) (int64, error) - logger logr.Logger + metricType v2.MetricTargetType + metadata *redisStreamsMetadata + closeFn func() error + getEntriesCountFn func(ctx context.Context) (int64, error) + logger logr.Logger } type redisStreamsMetadata struct { + scaleFactor scaleFactor targetPendingEntriesCount int64 + targetStreamLength int64 streamName string consumerGroupName string databaseIndex int @@ -89,21 +99,15 @@ func createClusteredRedisStreamsScaler(ctx context.Context, meta *redisStreamsMe return nil } - pendingEntriesCountFn := func(ctx context.Context) (int64, error) { - pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() - if err != nil { - return -1, err - } - return pendingEntries.Count, nil - } + entriesCountFn, err := createEntriesCountFn(client, meta) return &redisStreamsScaler{ - metricType: metricType, - metadata: meta, - closeFn: closeFn, - getPendingEntriesCountFn: pendingEntriesCountFn, - logger: logger, - }, nil + metricType: metricType, + metadata: meta, + closeFn: closeFn, + getEntriesCountFn: entriesCountFn, + logger: logger, + }, err } func createSentinelRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) { @@ -133,27 +137,42 @@ func createScaler(client *redis.Client, meta *redisStreamsMetadata, metricType v return nil } - pendingEntriesCountFn := func(ctx context.Context) (int64, error) { - pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() - if err != nil { - return -1, err - } - return pendingEntries.Count, nil - } + entriesCountFn, err := createEntriesCountFn(client, meta) return &redisStreamsScaler{ - metricType: metricType, - metadata: meta, - closeFn: closeFn, - getPendingEntriesCountFn: pendingEntriesCountFn, - logger: logger, - }, nil + metricType: metricType, + metadata: meta, + closeFn: closeFn, + getEntriesCountFn: entriesCountFn, + logger: 
logger, + }, err } -var ( - // ErrRedisMissingPendingEntriesCount is returned when "pendingEntriesCount" is missing. - ErrRedisMissingPendingEntriesCount = errors.New("missing pending entries count") +func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (entriesCountFn func(ctx context.Context) (int64, error), err error) { + switch meta.scaleFactor { + case xPendingFactor: + entriesCountFn = func(ctx context.Context) (int64, error) { + pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() + if err != nil { + return -1, err + } + return pendingEntries.Count, nil + } + case xLengthFactor: + entriesCountFn = func(ctx context.Context) (int64, error) { + entriesLength, err := client.XLen(ctx, meta.streamName).Result() + if err != nil { + return -1, err + } + return entriesLength, nil + } + default: + err = fmt.Errorf("unrecognized scale factor %v", meta.scaleFactor) + } + return +} +var ( // ErrRedisMissingStreamName is returned when "stream" is missing. ErrRedisMissingStreamName = errors.New("missing redis stream name") ) @@ -185,18 +204,6 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser) meta.connectionInfo.unsafeSsl = parsedVal } - meta.targetPendingEntriesCount = defaultTargetPendingEntriesCount - - if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok { - pendingEntriesCount, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing pending entries count: %w", err) - } - meta.targetPendingEntriesCount = pendingEntriesCount - } else { - return nil, ErrRedisMissingPendingEntriesCount - } - if val, ok := config.TriggerMetadata[streamNameMetadata]; ok { meta.streamName = val } else { @@ -205,8 +212,25 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser) if val, ok := config.TriggerMetadata[consumerGroupNameMetadata]; ok { meta.consumerGroupName = val + meta.scaleFactor = xPendingFactor + meta.targetPendingEntriesCount = defaultTargetEntries + if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok { + pendingEntriesCount, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing pending entries count: %w", err) + } + meta.targetPendingEntriesCount = pendingEntriesCount + } } else { - return nil, fmt.Errorf("missing redis stream consumer group name") + meta.scaleFactor = xLengthFactor + meta.targetStreamLength = defaultTargetEntries + if val, ok := config.TriggerMetadata[streamLengthMetadata]; ok { + streamLength, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing stream length: %w", err) + } + meta.targetStreamLength = streamLength + } } meta.databaseIndex = defaultDBIndex @@ -228,11 +252,20 @@ func (s *redisStreamsScaler) Close(context.Context) error { // GetMetricSpecForScaling returns the metric spec for the HPA func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + var metricValue int64 + + switch s.metadata.scaleFactor { + case xPendingFactor: + metricValue = s.metadata.targetPendingEntriesCount + case xLengthFactor: + metricValue = s.metadata.targetStreamLength + } + externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("redis-streams-%s", s.metadata.streamName))), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetPendingEntriesCount), + Target: 
GetMetricTarget(s.metricType, metricValue), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} @@ -240,7 +273,7 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.Metri // GetMetricsAndActivity fetches the number of pending entries for a consumer group in a stream func (s *redisStreamsScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - pendingEntriesCount, err := s.getPendingEntriesCountFn(ctx) + pendingEntriesCount, err := s.getEntriesCountFn(ctx) if err != nil { s.logger.Error(err, "error fetching pending entries count") diff --git a/pkg/scalers/redis_streams_scaler_test.go b/pkg/scalers/redis_streams_scaler_test.go index 2eadc19ad3e..bb9cadfc774 100644 --- a/pkg/scalers/redis_streams_scaler_test.go +++ b/pkg/scalers/redis_streams_scaler_test.go @@ -93,13 +93,11 @@ func TestParseRedisStreamsMetadataForInvalidCases(t *testing.T) { {"missing stream", map[string]string{"pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "address": "REDIS_HOST"}, resolvedEnvMap}, - {"missing consumerGroup", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "address": "REDIS_HOST"}, resolvedEnvMap}, - - {"missing pendingEntriesCount", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "address": "REDIS_HOST"}, resolvedEnvMap}, - // invalid value for respective fields {"invalid pendingEntriesCount", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "junk", "host": "REDIS_HOST", "port": "REDIS_PORT", "databaseIndex": "0", "enableTLS": "false"}, resolvedEnvMap}, + {"invalid streamLength", map[string]string{"stream": "my-stream", "streamLength": "junk", "host": "REDIS_HOST", "port": "REDIS_PORT", "databaseIndex": "0", "enableTLS": "false"}, resolvedEnvMap}, + {"invalid databaseIndex", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "address": "REDIS_SERVER", "databaseIndex": "junk", "enableTLS": "false"}, resolvedEnvMap}, {"invalid enableTLS", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "address": "REDIS_SERVER", "databaseIndex": "1", "enableTLS": "no"}, resolvedEnvMap}, @@ -188,21 +186,13 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { wantMeta: nil, wantErr: ErrRedisMissingStreamName, }, - { - name: "missing pending entries count", - metadata: map[string]string{ - "hosts": "a, b, c", - "ports": "1, 2, 3", - "stream": "my-stream", - }, - wantMeta: nil, - wantErr: ErrRedisMissingPendingEntriesCount, - }, { name: "invalid pending entries count", metadata: map[string]string{ + "stream": "my-stream", "hosts": "a, b, c", "ports": "1, 2, 3", + "consumerGroup": "consumer1", "pendingEntriesCount": "invalid", }, wantMeta: nil, @@ -225,6 +215,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { connectionInfo: redisConnectionInfo{ addresses: []string{":7001", ":7002"}, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -248,6 +239,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { hosts: []string{"a", "b", "c"}, ports: []string{"1", "2", "3"}, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -273,6 +265,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "username", }, + scaleFactor: 
xPendingFactor, }, wantErr: nil, }, @@ -297,6 +290,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "username", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -322,6 +316,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -347,6 +342,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, password: "password", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -372,6 +368,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, password: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -400,6 +397,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { enableTLS: true, unsafeSsl: false, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -429,6 +427,45 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { enableTLS: true, unsafeSsl: true, }, + scaleFactor: xPendingFactor, + }, + wantErr: nil, + }, + { + name: "stream is provided", + metadata: map[string]string{ + "stream": "my-stream", + }, + authParams: map[string]string{ + "addresses": ":7001, :7002", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetStreamLength: 5, + connectionInfo: redisConnectionInfo{ + addresses: []string{":7001", ":7002"}, + }, + scaleFactor: xLengthFactor, + }, + wantErr: nil, + }, + { + name: "stream, consumerGroup is provided", + metadata: map[string]string{ + "stream": "my-stream", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "addresses": ":7001, :7002", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 5, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{":7001", ":7002"}, + }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -486,21 +523,13 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { wantMeta: nil, wantErr: ErrRedisMissingStreamName, }, - { - name: "missing pending entries count", - metadata: map[string]string{ - "hosts": "a, b, c", - "ports": "1, 2, 3", - "stream": "my-stream", - }, - wantMeta: nil, - wantErr: ErrRedisMissingPendingEntriesCount, - }, { name: "invalid pending entries count", metadata: map[string]string{ + "stream": "my-stream", "hosts": "a, b, c", "ports": "1, 2, 3", + "consumerGroup": "consumer1", "pendingEntriesCount": "invalid", }, wantMeta: nil, @@ -523,6 +552,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { connectionInfo: redisConnectionInfo{ addresses: []string{":7001", ":7002"}, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -546,6 +576,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { hosts: []string{"a", "b", "c"}, ports: []string{"1", "2", "3"}, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -571,6 +602,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "username", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -595,6 +627,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "username", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -620,6 +653,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, username: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -645,6 +679,7 @@ func 
TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, password: "password", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -670,6 +705,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, password: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -695,6 +731,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelUsername: "sentinelUsername", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -719,6 +756,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelUsername: "sentinelUsername", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -744,6 +782,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelUsername: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -769,6 +808,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelPassword: "sentinelPassword", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -794,6 +834,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelPassword: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -819,6 +860,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelMaster: "sentinelMaster", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -843,6 +885,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelMaster: "sentinelMaster", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -868,6 +911,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ports: []string{"1", "2", "3"}, sentinelMaster: "none", }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -896,6 +940,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { enableTLS: true, unsafeSsl: false, }, + scaleFactor: xPendingFactor, }, wantErr: nil, }, @@ -925,6 +970,83 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { enableTLS: true, unsafeSsl: true, }, + scaleFactor: xPendingFactor, + }, + wantErr: nil, + }, + { + name: "streamLength passed", + metadata: map[string]string{ + "hosts": "a", + "ports": "1", + "stream": "my-stream", + "streamLength": "15", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetStreamLength: 15, + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1"}, + hosts: []string{"a"}, + ports: []string{"1"}, + password: "", + enableTLS: false, + unsafeSsl: false, + }, + scaleFactor: xLengthFactor, + }, + wantErr: nil, + }, + { + name: "streamLength, pendingEntriesCount and consumerGroup passed", + metadata: map[string]string{ + "hosts": "a", + "ports": "1", + "stream": "my-stream", + "streamLength": "15", + "pendingEntriesCount": "30", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 30, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1"}, + hosts: []string{"a"}, + ports: []string{"1"}, + password: "", + enableTLS: false, + unsafeSsl: false, + }, + scaleFactor: xPendingFactor, + }, + wantErr: nil, + }, + { + name: "streamLength and pendingEntriesCount passed", + metadata: map[string]string{ + "hosts": "a", + "ports": "1", + 
"stream": "my-stream", + "streamLength": "15", + "pendingEntriesCount": "30", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetStreamLength: 15, + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1"}, + hosts: []string{"a"}, + ports: []string{"1"}, + password: "", + enableTLS: false, + unsafeSsl: false, + }, + scaleFactor: xLengthFactor, }, wantErr: nil, }, diff --git a/tests/scalers/redis/redis_cluster_streams/redis_cluster_streams_test.go b/tests/scalers/redis/redis_cluster_streams/redis_cluster_streams_test.go index 410f4a35d66..9064e746d7a 100644 --- a/tests/scalers/redis/redis_cluster_streams/redis_cluster_streams_test.go +++ b/tests/scalers/redis/redis_cluster_streams/redis_cluster_streams_test.go @@ -71,11 +71,13 @@ spec: spec: containers: - name: redis-worker - image: ghcr.io/kedacore/tests-redis-cluster-streams:latest + image: ghcr.io/kedacore/tests-redis-streams:latest imagePullPolicy: IfNotPresent command: ["./main"] args: ["consumer"] env: + - name: REDIS_MODE + value: CLUSTER - name: REDIS_HOSTS value: {{.RedisHost}}.{{.RedisNamespace}} - name: REDIS_PORTS @@ -150,11 +152,13 @@ spec: spec: containers: - name: redis - image: ghcr.io/kedacore/tests-redis-cluster-streams:latest + image: ghcr.io/kedacore/tests-redis-streams:latest imagePullPolicy: IfNotPresent command: ["./main"] args: ["producer"] env: + - name: REDIS_MODE + value: CLUSTER - name: REDIS_HOSTS value: {{.RedisHost}}.{{.RedisNamespace}} - name: REDIS_PORTS diff --git a/tests/scalers/redis/redis_cluster_streams_length/redis_cluster_streams_length_test.go b/tests/scalers/redis/redis_cluster_streams_length/redis_cluster_streams_length_test.go new file mode 100644 index 00000000000..7f55cfc1b84 --- /dev/null +++ b/tests/scalers/redis/redis_cluster_streams_length/redis_cluster_streams_length_test.go @@ -0,0 +1,235 @@ +//go:build e2e +// +build e2e + +package redis_cluster_streams_length_test + +import ( + "encoding/base64" + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . 
"github.com/kedacore/keda/v2/tests/helper" + redis "github.com/kedacore/keda/v2/tests/scalers/redis/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "redis-cluster-streams-length-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + redisNamespace = fmt.Sprintf("%s-redis-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + jobName = fmt.Sprintf("%s-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + triggerAuthenticationName = fmt.Sprintf("%s-ta", testName) + secretName = fmt.Sprintf("%s-secret", testName) + redisPassword = "admin" + redisHost = fmt.Sprintf("%s-headless", testName) + minReplicaCount = 1 + maxReplicaCount = 5 +) + +type templateData struct { + TestNamespace string + RedisNamespace string + DeploymentName string + JobName string + ScaledObjectName string + TriggerAuthenticationName string + SecretName string + MinReplicaCount int + MaxReplicaCount int + RedisPassword string + RedisPasswordBase64 string + RedisHost string + ItemsToWrite int +} + +const ( + deploymentTemplate = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} +spec: + replicas: 1 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: redis-worker + image: ghcr.io/kedacore/tests-redis-streams:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["consumer"] + env: + - name: REDIS_MODE + value: CLUSTER + - name: REDIS_HOSTS + value: {{.RedisHost}}.{{.RedisNamespace}} + - name: REDIS_PORTS + value: "6379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: consumer-group-1 + - name: REDIS_PASSWORD + value: {{.RedisPassword}} + - name: DELETE_MESSAGES + value: "1" +` + + secretTemplate = `apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +type: Opaque +data: + password: {{.RedisPasswordBase64}} +` + + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: password + name: {{.SecretName}} + key: password +` + + scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 15 + triggers: + - type: redis-cluster-streams + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + stream: my-stream + streamLength: "15" + authenticationRef: + name: {{.TriggerAuthenticationName}} +` + + insertJobTemplate = `apiVersion: batch/v1 +kind: Job +metadata: + name: {{.JobName}} + namespace: {{.TestNamespace}} +spec: + ttlSecondsAfterFinished: 0 + template: + spec: + containers: + - name: redis + image: ghcr.io/kedacore/tests-redis-streams:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["producer"] + env: + - name: REDIS_MODE + value: CLUSTER + - name: REDIS_HOSTS + value: {{.RedisHost}}.{{.RedisNamespace}} + - name: REDIS_PORTS + value: "6379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: 
REDIS_PASSWORD
+          value: {{.RedisPassword}}
+        - name: NUM_MESSAGES
+          value: "{{.ItemsToWrite}}"
+        restartPolicy: Never
+      backoffLimit: 4
+`
+)
+
+func TestScaler(t *testing.T) {
+	// Get a Kubernetes client
+	kc := GetKubernetesClient(t)
+
+	// Create Redis Cluster
+	redis.InstallCluster(t, kc, testName, redisNamespace, redisPassword)
+
+	// Create kubernetes resources for testing
+	data, templates := getTemplateData()
+	CreateKubernetesResources(t, kc, testNamespace, data, templates)
+
+	testScaleOut(t, kc, data)
+	testScaleIn(t, kc)
+
+	// cleanup
+	redis.RemoveCluster(t, kc, testName, redisNamespace)
+	DeleteKubernetesResources(t, kc, testNamespace, data, templates)
+}
+
+func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) {
+	t.Log("--- testing scale out ---")
+	KubectlApplyWithTemplate(t, data, "insertJobTemplate", insertJobTemplate)
+
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),
+		"replica count should be %d after 3 minutes", maxReplicaCount)
+}
+
+func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {
+	t.Log("--- testing scale in ---")
+
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3),
+		"replica count should be %d after 3 minutes", minReplicaCount)
+}
+
+var data = templateData{
+	TestNamespace: testNamespace,
+	RedisNamespace: redisNamespace,
+	DeploymentName: deploymentName,
+	ScaledObjectName: scaledObjectName,
+	MinReplicaCount: minReplicaCount,
+	MaxReplicaCount: maxReplicaCount,
+	TriggerAuthenticationName: triggerAuthenticationName,
+	SecretName: secretName,
+	JobName: jobName,
+	RedisPassword: redisPassword,
+	RedisPasswordBase64: base64.StdEncoding.EncodeToString([]byte(redisPassword)),
+	RedisHost: redisHost,
+	ItemsToWrite: 100,
+}
+
+func getTemplateData() (templateData, []Template) {
+	return data, []Template{
+		{Name: "secretTemplate", Config: secretTemplate},
+		{Name: "deploymentTemplate", Config: deploymentTemplate},
+		{Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate},
+		{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
+	}
+}
diff --git a/tests/scalers/redis/redis_sentinel_streams/redis_sentinel_streams_test.go b/tests/scalers/redis/redis_sentinel_streams/redis_sentinel_streams_test.go
index 9674d221534..1aadb110d7a 100644
--- a/tests/scalers/redis/redis_sentinel_streams/redis_sentinel_streams_test.go
+++ b/tests/scalers/redis/redis_sentinel_streams/redis_sentinel_streams_test.go
@@ -71,11 +71,13 @@ spec:
     spec:
       containers:
       - name: redis-worker
-        image: ghcr.io/kedacore/tests-redis-sentinel-streams:latest
+        image: ghcr.io/kedacore/tests-redis-streams:latest
         imagePullPolicy: IfNotPresent
         command: ["./main"]
         args: ["consumer"]
         env:
+        - name: REDIS_MODE
+          value: SENTINEL
         - name: REDIS_HOSTS
           value: {{.RedisHost}}.{{.RedisNamespace}}
         - name: REDIS_PORTS
@@ -158,11 +160,13 @@ spec:
     spec:
       containers:
       - name: redis
-        image: ghcr.io/kedacore/tests-redis-sentinel-streams:latest
+        image: ghcr.io/kedacore/tests-redis-streams:latest
         imagePullPolicy: IfNotPresent
         command: ["./main"]
         args: ["producer"]
         env:
+        - name: REDIS_MODE
+          value: SENTINEL
         - name: REDIS_HOSTS
           value: {{.RedisHost}}.{{.RedisNamespace}}
         - name: REDIS_PORTS
diff --git a/tests/scalers/redis/redis_sentinel_streams_length/redis_sentinel_streams_length_test.go b/tests/scalers/redis/redis_sentinel_streams_length/redis_sentinel_streams_length_test.go
new file mode 100644
index
00000000000..f241b7142af --- /dev/null +++ b/tests/scalers/redis/redis_sentinel_streams_length/redis_sentinel_streams_length_test.go @@ -0,0 +1,249 @@ +//go:build e2e +// +build e2e + +package redis_sentinel_streams_test + +import ( + "encoding/base64" + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" + redis "github.com/kedacore/keda/v2/tests/scalers/redis/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "redis-sentinel-streams-test-length" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + redisNamespace = fmt.Sprintf("%s-redis-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + jobName = fmt.Sprintf("%s-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + triggerAuthenticationName = fmt.Sprintf("%s-ta", testName) + secretName = fmt.Sprintf("%s-secret", testName) + redisPassword = "admin" + redisHost = fmt.Sprintf("%s-headless", testName) + minReplicaCount = 1 + maxReplicaCount = 4 +) + +type templateData struct { + TestNamespace string + RedisNamespace string + DeploymentName string + JobName string + ScaledObjectName string + TriggerAuthenticationName string + SecretName string + MinReplicaCount int + MaxReplicaCount int + RedisPassword string + RedisPasswordBase64 string + RedisHost string + ItemsToWrite int +} + +const ( + deploymentTemplate = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} +spec: + replicas: 1 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: redis-worker + image: ghcr.io/kedacore/tests-redis-streams:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["consumer"] + env: + - name: REDIS_MODE + value: SENTINEL + - name: REDIS_HOSTS + value: {{.RedisHost}}.{{.RedisNamespace}} + - name: REDIS_PORTS + value: "26379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: consumer-group-1 + - name: REDIS_PASSWORD + value: {{.RedisPassword}} + - name: REDIS_SENTINEL_PASSWORD + value: {{.RedisPassword}} + - name: REDIS_SENTINEL_MASTER + value: mymaster + - name: DELETE_MESSAGES + value: "1" +` + + secretTemplate = `apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +type: Opaque +data: + password: {{.RedisPasswordBase64}} +` + + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: password + name: {{.SecretName}} + key: password + - parameter: sentinelPassword + name: {{.SecretName}} + key: password +` + + scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 15 + triggers: + - type: redis-sentinel-streams + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + stream: my-stream + streamLength: "10" + sentinelMaster: mymaster + 
authenticationRef:
+      name: {{.TriggerAuthenticationName}}
+`
+
+	insertJobTemplate = `apiVersion: batch/v1
+kind: Job
+metadata:
+  name: {{.JobName}}
+  namespace: {{.TestNamespace}}
+spec:
+  ttlSecondsAfterFinished: 0
+  template:
+    spec:
+      containers:
+      - name: redis
+        image: ghcr.io/kedacore/tests-redis-streams:latest
+        imagePullPolicy: IfNotPresent
+        command: ["./main"]
+        args: ["producer"]
+        env:
+        - name: REDIS_MODE
+          value: SENTINEL
+        - name: REDIS_HOSTS
+          value: {{.RedisHost}}.{{.RedisNamespace}}
+        - name: REDIS_PORTS
+          value: "26379"
+        - name: REDIS_STREAM_NAME
+          value: my-stream
+        - name: REDIS_STREAM_CONSUMER_GROUP_NAME
+          value: consumer-group-1
+        - name: REDIS_PASSWORD
+          value: {{.RedisPassword}}
+        - name: REDIS_SENTINEL_PASSWORD
+          value: {{.RedisPassword}}
+        - name: REDIS_SENTINEL_MASTER
+          value: mymaster
+        - name: NUM_MESSAGES
+          value: "{{.ItemsToWrite}}"
+        restartPolicy: Never
+      backoffLimit: 4
+`
+)
+
+func TestScaler(t *testing.T) {
+	// Get a Kubernetes client
+	kc := GetKubernetesClient(t)
+
+	// Create Redis Sentinel
+	redis.InstallSentinel(t, kc, testName, redisNamespace, redisPassword)
+
+	// Create kubernetes resources for testing
+	data, templates := getTemplateData()
+	CreateKubernetesResources(t, kc, testNamespace, data, templates)
+
+	testScaleOut(t, kc, data)
+	testScaleIn(t, kc)
+
+	// cleanup
+	redis.RemoveSentinel(t, kc, testName, redisNamespace)
+	DeleteKubernetesResources(t, kc, testNamespace, data, templates)
+}
+
+func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) {
+	t.Log("--- testing scale out ---")
+	KubectlApplyWithTemplate(t, data, "insertJobTemplate", insertJobTemplate)
+
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),
+		"replica count should be %d after 3 minutes", maxReplicaCount)
+}
+
+func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {
+	t.Log("--- testing scale in ---")
+
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3),
+		"replica count should be %d after 3 minutes", minReplicaCount)
+}
+
+var data = templateData{
+	TestNamespace: testNamespace,
+	RedisNamespace: redisNamespace,
+	DeploymentName: deploymentName,
+	ScaledObjectName: scaledObjectName,
+	MinReplicaCount: minReplicaCount,
+	MaxReplicaCount: maxReplicaCount,
+	TriggerAuthenticationName: triggerAuthenticationName,
+	SecretName: secretName,
+	JobName: jobName,
+	RedisPassword: redisPassword,
+	RedisPasswordBase64: base64.StdEncoding.EncodeToString([]byte(redisPassword)),
+	RedisHost: redisHost,
+	ItemsToWrite: 100,
+}
+
+func getTemplateData() (templateData, []Template) {
+	return data, []Template{
+		{Name: "secretTemplate", Config: secretTemplate},
+		{Name: "deploymentTemplate", Config: deploymentTemplate},
+		{Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate},
+		{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
+	}
+}
diff --git a/tests/scalers/redis/redis_standalone_streams/redis_standalone_streams_test.go b/tests/scalers/redis/redis_standalone_streams/redis_standalone_streams_test.go
index 6e3e5688ba8..7404ace6437 100644
--- a/tests/scalers/redis/redis_standalone_streams/redis_standalone_streams_test.go
+++ b/tests/scalers/redis/redis_standalone_streams/redis_standalone_streams_test.go
@@ -33,13 +33,14 @@ var (
 	secretName = fmt.Sprintf("%s-secret", testName)
 	redisPassword = "admin"
 	redisStreamName = "stream"
-	redisHost =
fmt.Sprintf("redis.%s.svc.cluster.local:6379", redisNamespace) + redisAddress = fmt.Sprintf("redis.%s.svc.cluster.local:6379", redisNamespace) minReplicaCount = 1 maxReplicaCount = 2 ) type templateData struct { TestNamespace string + RedisNamespace string DeploymentName string JobName string ScaledObjectName string @@ -50,7 +51,7 @@ type templateData struct { RedisPassword string RedisPasswordBase64 string RedisStreamName string - RedisHost string + RedisAddress string ItemsToWrite int } @@ -72,11 +73,15 @@ spec: spec: containers: - name: redis-worker - image: ghcr.io/kedacore/tests-redis-streams-consumer:latest + image: ghcr.io/kedacore/tests-redis-streams:latest imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["consumer"] env: - - name: REDIS_HOST - value: {{.RedisHost}} + - name: REDIS_MODE + value: STANDALONE + - name: REDIS_ADDRESS + value: {{.RedisAddress}} - name: REDIS_STREAM_NAME value: {{.RedisStreamName}} - name: REDIS_PASSWORD @@ -127,10 +132,10 @@ spec: triggers: - type: redis-streams metadata: - addressFromEnv: REDIS_HOST + addressFromEnv: REDIS_ADDRESS stream: {{.RedisStreamName}} consumerGroup: consumer-group-1 - pendingEntriesCount: "10" + pendingEntriesCount: "15" authenticationRef: name: {{.TriggerAuthenticationName}} ` @@ -146,11 +151,15 @@ spec: spec: containers: - name: redis - image: ghcr.io/kedacore/tests-redis-streams-producer:latest + image: ghcr.io/kedacore/tests-redis-streams:latest imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["producer"] env: - - name: REDIS_HOST - value: {{.RedisHost}} + - name: REDIS_MODE + value: STANDALONE + - name: REDIS_ADDRESS + value: {{.RedisAddress}} - name: REDIS_PASSWORD value: {{.RedisPassword}} - name: REDIS_STREAM_NAME @@ -172,8 +181,6 @@ func TestScaler(t *testing.T) { // Create kubernetes resources for testing data, templates := getTemplateData() CreateKubernetesResources(t, kc, testNamespace, data, templates) - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 3), - "replica count should be %d after 3 minutes", 1) testScaleOut(t, kc, data) testScaleIn(t, kc) @@ -185,7 +192,6 @@ func TestScaler(t *testing.T) { func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing scale out ---") - data.ItemsToWrite = 20 KubectlApplyWithTemplate(t, data, "insertJobTemplate", insertJobTemplate) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), @@ -196,14 +202,15 @@ func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing scale in ---") assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), - "replica count should be %d after 5 minutes", minReplicaCount) + "replica count should be %d after 3 minutes", minReplicaCount) } var data = templateData{ TestNamespace: testNamespace, + RedisNamespace: redisNamespace, DeploymentName: deploymentName, ScaledObjectName: scaledObjectName, - MinReplicaCount: 1, + MinReplicaCount: minReplicaCount, MaxReplicaCount: maxReplicaCount, TriggerAuthenticationName: triggerAuthenticationName, SecretName: secretName, @@ -211,8 +218,8 @@ var data = templateData{ RedisPassword: redisPassword, RedisPasswordBase64: base64.StdEncoding.EncodeToString([]byte(redisPassword)), RedisStreamName: redisStreamName, - RedisHost: redisHost, - ItemsToWrite: 0, + RedisAddress: redisAddress, + ItemsToWrite: 20, } func getTemplateData() (templateData, []Template) { diff --git 
a/tests/scalers/redis/redis_standalone_streams_length/redis_standalone_streams_length_test.go b/tests/scalers/redis/redis_standalone_streams_length/redis_standalone_streams_length_test.go new file mode 100644 index 00000000000..15da8912c25 --- /dev/null +++ b/tests/scalers/redis/redis_standalone_streams_length/redis_standalone_streams_length_test.go @@ -0,0 +1,233 @@ +//go:build e2e +// +build e2e + +package redis_standalone_streams_length_test + +import ( + "encoding/base64" + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" + redis "github.com/kedacore/keda/v2/tests/scalers/redis/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "redis-standalone-streams-length-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + redisNamespace = fmt.Sprintf("%s-redis-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + jobName = fmt.Sprintf("%s-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + triggerAuthenticationName = fmt.Sprintf("%s-ta", testName) + secretName = fmt.Sprintf("%s-secret", testName) + redisPassword = "admin" + redisStreamName = "stream" + redisAddress = fmt.Sprintf("redis.%s.svc.cluster.local:6379", redisNamespace) + minReplicaCount = 1 + maxReplicaCount = 4 +) + +type templateData struct { + TestNamespace string + RedisNamespace string + DeploymentName string + JobName string + ScaledObjectName string + TriggerAuthenticationName string + SecretName string + MinReplicaCount int + MaxReplicaCount int + RedisPassword string + RedisPasswordBase64 string + RedisStreamName string + RedisAddress string + ItemsToWrite int +} + +const ( + deploymentTemplate = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} +spec: + replicas: 1 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: redis-worker + image: ghcr.io/kedacore/tests-redis-streams:latest + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["consumer"] + env: + - name: REDIS_MODE + value: STANDALONE + - name: REDIS_ADDRESS + value: {{.RedisAddress}} + - name: REDIS_STREAM_NAME + value: {{.RedisStreamName}} + - name: REDIS_PASSWORD + value: {{.RedisPassword}} + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: "consumer-group-1" + - name: DELETE_MESSAGES + value: "1" +` + + secretTemplate = `apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +type: Opaque +data: + password: {{.RedisPasswordBase64}} +` + + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthenticationName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: password + name: {{.SecretName}} + key: password +` + + scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 15 + triggers: + - type: redis-streams + metadata: + addressFromEnv: REDIS_ADDRESS + stream: 
{{.RedisStreamName}}
+      streamLength: "10"
+    authenticationRef:
+      name: {{.TriggerAuthenticationName}}
+`
+
+	insertJobTemplate = `apiVersion: batch/v1
+kind: Job
+metadata:
+  name: {{.JobName}}
+  namespace: {{.TestNamespace}}
+spec:
+  ttlSecondsAfterFinished: 0
+  template:
+    spec:
+      containers:
+      - name: redis
+        image: ghcr.io/kedacore/tests-redis-streams:latest
+        imagePullPolicy: IfNotPresent
+        command: ["./main"]
+        args: ["producer"]
+        env:
+        - name: REDIS_MODE
+          value: STANDALONE
+        - name: REDIS_ADDRESS
+          value: {{.RedisAddress}}
+        - name: REDIS_PASSWORD
+          value: {{.RedisPassword}}
+        - name: REDIS_STREAM_NAME
+          value: {{.RedisStreamName}}
+        - name: NUM_MESSAGES
+          value: "{{.ItemsToWrite}}"
+        restartPolicy: Never
+      backoffLimit: 4
+`
+)
+
+func TestScaler(t *testing.T) {
+	// Get a Kubernetes client
+	kc := GetKubernetesClient(t)
+
+	// Create Redis Standalone
+	redis.InstallStandalone(t, kc, testName, redisNamespace, redisPassword)
+
+	// Create kubernetes resources for testing
+	data, templates := getTemplateData()
+	CreateKubernetesResources(t, kc, testNamespace, data, templates)
+
+	testScaleOut(t, kc, data)
+	testScaleIn(t, kc)
+
+	// cleanup
+	redis.RemoveStandalone(t, kc, testName, redisNamespace)
+	DeleteKubernetesResources(t, kc, testNamespace, data, templates)
+}
+
+func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) {
+	t.Log("--- testing scale out ---")
+	KubectlApplyWithTemplate(t, data, "insertJobTemplate", insertJobTemplate)
+
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),
+		"replica count should be %d after 3 minutes", maxReplicaCount)
+}
+
+func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {
+	t.Log("--- testing scale in ---")
+
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3),
+		"replica count should be %d after 3 minutes", minReplicaCount)
+}
+
+var data = templateData{
+	TestNamespace: testNamespace,
+	RedisNamespace: redisNamespace,
+	DeploymentName: deploymentName,
+	ScaledObjectName: scaledObjectName,
+	MinReplicaCount: minReplicaCount,
+	MaxReplicaCount: maxReplicaCount,
+	TriggerAuthenticationName: triggerAuthenticationName,
+	SecretName: secretName,
+	JobName: jobName,
+	RedisPassword: redisPassword,
+	RedisPasswordBase64: base64.StdEncoding.EncodeToString([]byte(redisPassword)),
+	RedisStreamName: redisStreamName,
+	RedisAddress: redisAddress,
+	ItemsToWrite: 100,
+}
+
+func getTemplateData() (templateData, []Template) {
+	return data, []Template{
+		{Name: "secretTemplate", Config: secretTemplate},
+		{Name: "deploymentTemplate", Config: deploymentTemplate},
+		{Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate},
+		{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
+	}
+}
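
For context on the two metrics the updated scaler can report, the sketch below (not part of the diff) contrasts the Redis commands behind each mode: with a consumerGroup configured the scaler compares XPENDING (entries delivered to the group but not yet acknowledged) against pendingEntriesCount, and without one it compares XLEN (all entries currently in the stream) against streamLength. It is a minimal standalone sketch, assuming the github.com/redis/go-redis/v9 client, a local Redis at localhost:6379, and illustrative stream/group names; adjust the import to whichever go-redis version the module vendors.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/redis/go-redis/v9" // assumed client version; match the one used by KEDA
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // illustrative address

	// XLEN: total number of entries in the stream, acknowledged or not.
	// This is the value the new streamLength mode compares against its target.
	length, err := rdb.XLen(ctx, "my-stream").Result()
	if err != nil {
		log.Fatal(err)
	}

	// XPENDING: entries delivered to the consumer group but not yet acknowledged.
	// This is the value the existing pendingEntriesCount mode compares against its target.
	pending, err := rdb.XPending(ctx, "my-stream", "consumer-group-1").Result()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("stream length=%d pending=%d\n", length, pending.Count)
}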