Skip to content

Commit

Permalink
Avoid scaling triggered by events/messages when ScaledObject is paused (
Browse files Browse the repository at this point in the history
  • Loading branch information
aryan9600 authored May 10, 2022
1 parent c47e7ca commit 3cdac48
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md

- **General**: Fix CVE-2022-21221 in `github.com/valyala/fasthttp` ([#2775](https://github.com/kedacore/keda/issue/2775))
- **General**: Bump Golang to 1.17.9 ([#3016](https://github.com/kedacore/keda/issues/3016))
- **General**: Fix autoscaling behaviour while paused. ([#3009](https://github.com/kedacore/keda/issues/3009))

## v2.7.0

Expand Down
35 changes: 20 additions & 15 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,26 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
}

status := scaledObject.Status.DeepCopy()
if pausedCount != nil && *pausedCount != currentReplicas && status.PausedReplicaCount == nil {
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *pausedCount)
if err != nil {
logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount)
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
if pausedCount != nil {
// Scale the target to the paused replica count
if *pausedCount != currentReplicas {
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *pausedCount)
if err != nil {
logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount)
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
}
return
}
return
}
status.PausedReplicaCount = pausedCount
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "error updating status paused replica count")
return
status.PausedReplicaCount = pausedCount
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "error updating status paused replica count")
return
}
logger.Info("Successfully scaled target to paused replicas count", "paused replicas", *pausedCount)
}
logger.Info("Successfully scaled target to paused replicas count", "paused replicas", *pausedCount)
return
}

Expand Down Expand Up @@ -355,6 +358,8 @@ func getIdleOrMinimumReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (bool
return false, *scaledObject.Spec.MinReplicaCount
}

// GetPausedReplicaCount returns the paused replica count of the ScaledObject.
// If not paused, it returns nil.
func GetPausedReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (*int32, error) {
if scaledObject.Annotations != nil {
if val, ok := scaledObject.Annotations[kedacontrollerutil.PausedReplicasAnnotation]; ok {
Expand Down
38 changes: 37 additions & 1 deletion tests/scalers/azure-queue-pause.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as azure from 'azure-storage'
import * as sh from 'shelljs'
import * as tmp from 'tmp'
import test from 'ava'
import {createNamespace, waitForDeploymentReplicaCount} from "./helpers";
import {createNamespace, waitForDeploymentReplicaCount, sleep} from "./helpers";

const testNamespace = 'pause-test'
const deploymentFile = tmp.fileSync()
Expand Down Expand Up @@ -60,6 +60,24 @@ test.serial(
}
)

test.serial.cb(
'Deployment should remain at pausedReplicaCount (0) even with messages on storage',
t => {
const queueSvc = azure.createQueueService(connectionString)
queueSvc.messageEncoder = new azure.QueueMessageEncoder.TextBase64QueueMessageEncoder()
async.mapLimit(
Array(1000).keys(),
20,
(n, cb) => queueSvc.createMessage(queueName, `test ${n}`, cb),
async () => {
t.true(await checkIfReplicaCountGreater(0, 'test-deployment', testNamespace, 60, 1000), 'replica count remain 0 after 1 minute')
queueSvc.clearMessages(queueName, _ => {})
t.end()
}
)
}
)

test.serial(`Updsating ScaledObject (without annotation) should work`, async t => {
fs.writeFileSync(scaledObjectFile.name, scaledObjectYaml)
t.is(
Expand Down Expand Up @@ -118,6 +136,24 @@ test.after.always.cb('clean up workload test related deployments', t => {
t.end()
})


// checks if the current replica count is greater than the given target count for a given interval.
// returns false if it is greater, otherwise true.
async function checkIfReplicaCountGreater(target: number, name: string, namespace: string, iterations = 10, interval = 3000): Promise<boolean> {
for (let i = 0; i < iterations; i++) {
let replicaCountStr = sh.exec(`kubectl get deployment.apps/${name} --namespace ${namespace} -o jsonpath="{.spec.replicas}"`).stdout
try {
const replicaCount = parseInt(replicaCountStr, 10)
if (replicaCount > target) {
return false
}
} catch { }

await sleep(interval)
}
return true
}

const deployYaml = `apiVersion: v1
kind: Secret
metadata:
Expand Down

0 comments on commit 3cdac48

Please sign in to comment.