Scale Jobs Natively (Without Admission Controllers) (#71)
* added feature to scale jobs without admission controller

* added test with owner references

* refactored documentation

* refactored documentation
samuel-esp authored Jul 18, 2024
1 parent 98d4042 commit 9aa2078
Showing 4 changed files with 276 additions and 18 deletions.
69 changes: 58 additions & 11 deletions README.md
@@ -24,7 +24,9 @@ Scale down / "pause" Kubernetes workload (`Deployments`, `StatefulSets`,
- [Uptime / downtime spec](#uptime--downtime-spec)
- [Alternative Logic, Based on Periods](#alternative-logic-based-on-periods)
- [Command Line Options](#command-line-options)
- [Scaling Jobs](#scaling-jobs)
- [Scaling Jobs: Overview](#scaling-jobs-overview)
- [Scaling Jobs Natively](#scaling-jobs-natively)
- [Scaling Jobs With Admission Controller](#scaling-jobs-with-admission-controller)
- [Scaling Daemonsets](#scaling-daemonset)
- [Matching Labels Argument](#matching-labels-argument)
- [Namespace Defaults](#namespace-defaults)
@@ -355,19 +357,65 @@ Available command line options:
> Make sure to read the dedicated section below to understand how to use the
> `--admission-controller` feature correctly
### Scaling Jobs
### Scaling Jobs: Overview
Before scaling jobs make sure the Admission Controller of your choice is correctly installed inside the cluster.
Kube-Downscaler performs some health checks that are displayed inside logs when the `--debug` arg is present.
If you are using Gatekeeper, Kube-Downscaler will install a new Custom Resource Definition
called `kubedownscalerjobsconstraint`
Kube Downscaler offers two possibilities for downscaling Jobs:
**When using this feature you need to exclude Kyverno or Gatekeeper resources from downscaling, otherwise
the admission controller pods won't be able to downscale jobs.**
You can use `EXCLUDE_NAMESPACES` environment variable or `--exclude-namespaces` arg to exclude `"kyverno"` or `"gatekeeper-system"` namespaces.
For more fine-grained control you can use the `EXCLUDE_DEPLOYMENTS` environment variable
1) Downscaling Jobs Natively: Kube Downscaler will downscale Jobs by modifying the `spec.suspend` field of the Job resource.
The `spec.suspend` field will be set to `true` and the pods created by the Job
will be automatically deleted (a sketch of the resulting Job is shown below).
2) Downscaling Jobs With Admission Controllers: Kube Downscaler will block the creation of all new Jobs using
Admission Policies created with an Admission Controller (Kyverno or Gatekeeper, depending on the user's choice)
In both cases, Jobs created by CronJobs will not be modified unless the user specifies, via the
`--include-resources` argument, that they want to turn off both Jobs and CronJobs.
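For illustration, here is a minimal sketch of what the native mode leaves behind, assuming a hypothetical Job named `example-job`; only the `spec.suspend` field (and the original-replicas annotation) is changed by Kube Downscaler:

```yaml
# Hypothetical Job after a native downscale: Kube Downscaler has set
# spec.suspend to true and recorded the original state in an annotation.
apiVersion: batch/v1
kind: Job
metadata:
  name: example-job                      # illustrative name
  namespace: default
  annotations:
    downscaler/original-replicas: "1"    # assumed annotation key used by Kube Downscaler
spec:
  suspend: true                          # set by Kube Downscaler during downtime
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: worker                   # illustrative container
          image: busybox
          command: ["sleep", "3600"]
```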
**How To Choose The Correct Mode:**
1) The first mode is recommended when the Jobs created within the Cluster are few and sporadic
2) The second mode is recommended when many Jobs are created within the Cluster at very frequent intervals.

It's important to note the following:
The second mode is specifically designed to avoid frequent node provisioning. This is particularly relevant when KubeDownscaler
might turn off jobs shortly after they've triggered node provisioning. If jobs trigger node provisioning but are then
scaled down or stopped by KubeDownscaler within 30 to 60 seconds, the Cluster Autoscaler is basically doing an unnecessary
provisioning action because the new nodes will be scaled down shortly after as well. Frequently provisioning nodes only to
have them become unnecessary shortly thereafter is an operation that should be minimized, as it is inefficient.
### Scaling Jobs Natively
To scale down Jobs natively, you only need to add `jobs` to the `--include-resources` argument of the Kube Downscaler Deployment.
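A minimal sketch of the relevant part of the Kube Downscaler Deployment; the container name, image, and the other arguments are placeholders, the key point is adding `jobs` to `--include-resources`:

```yaml
# Sketch of the kube-downscaler container spec; names and image are illustrative.
containers:
  - name: kube-downscaler
    image: kube-downscaler:latest             # placeholder image reference
    args:
      - --interval=60
      - --include-resources=deployments,jobs  # include "jobs" to enable native Job scaling
```

During downtime Kube Downscaler then suspends the matched Jobs directly, without requiring Kyverno or Gatekeeper.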
### Scaling Jobs With Admission Controller
Before scaling Jobs with an Admission Controller, make sure the Admission Controller of your choice is correctly installed inside the
cluster.

At startup, Kube-Downscaler will always perform some health checks for the Admission Controller of your choice; their results are
displayed inside the logs when the `--debug` argument is present inside the main Deployment.
**<u>Important</u>: In order to use this feature you will need to exclude Kyverno or Gatekeeper resources from downscaling, otherwise
the admission controller pods won't be able to block Jobs.** You can use the `EXCLUDE_NAMESPACES` environment variable or the `--exclude-namespaces`
arg to exclude the `"kyverno"` or `"gatekeeper-system"` namespaces.
Alternatively, use the `EXCLUDE_DEPLOYMENTS` environment variable
or the `--exclude-deployments` arg to exclude only certain resources inside the `"kyverno"` or `"gatekeeper-system"` namespaces (see the sketch below).
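A minimal sketch of the two exclusion options described above, as they could appear in the Kube Downscaler Deployment; the namespace values come from this section, while the deployment names in the second option are purely illustrative:

```yaml
# Option 1: exclude the whole admission controller namespaces via environment variable
env:
  - name: EXCLUDE_NAMESPACES
    value: "kyverno,gatekeeper-system"

# Option 2: exclude only specific workloads inside those namespaces
# (the deployment names below are illustrative examples)
args:
  - --exclude-deployments=kyverno-admission-controller,gatekeeper-controller-manager
```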
The workflow for blocking Jobs differs depending on whether you use Gatekeeper or Kyverno; both workflows are described below.
**Blocking Jobs: Gatekeeper Workflow**
1) Kube-Downscaler will install a new Custom Resource Definition
called `kubedownscalerjobsconstraint`.
2) Kube-Downscaler will create a Constraint called "KubeDownscalerJobsConstraint" for each namespace that is not excluded
**Blocking Jobs: Kyverno Workflow**
1) Kube-Downscaler will create a Policy for each namespace that is not excluded
All the statements below are valid for both Kyverno and Gatekeeper, unless specified otherwise
**<u>Important</u>:** Jobs started from CronJobs are excluded by default unless you have included `cronjobs` inside `--include-resources` argument
**Annotations:** both the `downscaler/exclude` and `downscaler/exclude-until` annotations are fully supported
@@ -401,7 +449,6 @@ annotations are not supported if specified directly inside the Job definition du
on computing days of the week inside the policies. However you can still use
these annotations at Namespace level to downscale/upscale Jobs
**Deleting Policies:** if for some reason you want to delete all resources blocking jobs, you can use these commands:
Gatekeeper
1 change: 1 addition & 0 deletions chart/templates/rbac.yaml
@@ -53,6 +53,7 @@ rules:
- batch
resources:
- cronjobs
- jobs
verbs:
- get
- watch
19 changes: 13 additions & 6 deletions kube_downscaler/scaler.py
@@ -132,6 +132,8 @@ def pods_force_uptime(api, namespace: str):
return True
return False

def scale_jobs_without_admission_controller(plural, admission_controller):
return plural == "jobs" and admission_controller == ""

def is_stack_deployment(resource: NamespacedAPIObject) -> bool:
if resource.kind == Deployment.kind and resource.version == Deployment.version:
@@ -195,7 +197,7 @@ def ignore_resource(resource: NamespacedAPIObject, now: datetime.datetime) -> bo
def get_replicas(
resource: NamespacedAPIObject, original_replicas: Optional[int], uptime: str
) -> int:
if resource.kind == "CronJob":
if resource.kind in ["CronJob", "Job"]:
suspended = resource.obj["spec"]["suspend"]
replicas = 0 if suspended else 1
state = "suspended" if suspended else "not suspended"
@@ -397,12 +399,12 @@ def scale_up(
f"Unsuspending {resource.kind} {resource.namespace}/{resource.name} (uptime: {uptime}, downtime: {downtime})"
)
event_message = "Unsuspending DaemonSet"
elif resource.kind == "CronJob":
elif resource.kind in ["CronJob", "Job"]:
resource.obj["spec"]["suspend"] = False
logger.info(
f"Unsuspending {resource.kind} {resource.namespace}/{resource.name} (uptime: {uptime}, downtime: {downtime})"
)
event_message = "Unsuspending CronJob"
event_message = f"Unsuspending {resource.kind}"
elif resource.kind == "PodDisruptionBudget":
if "minAvailable" in resource.obj["spec"]:
resource.obj["spec"]["minAvailable"] = original_replicas
@@ -468,12 +470,12 @@ def scale_down(
f"Suspending {resource.kind} {resource.namespace}/{resource.name} (uptime: {uptime}, downtime: {downtime})"
)
event_message = "Suspending DaemonSet"
elif resource.kind == "CronJob":
elif resource.kind in ["CronJob", "Job"]:
resource.obj["spec"]["suspend"] = True
logger.info(
f"Suspending {resource.kind} {resource.namespace}/{resource.name} (uptime: {uptime}, downtime: {downtime})"
)
event_message = "Suspending CronJob"
event_message = f"Suspending {resource.kind}"
elif resource.kind == "PodDisruptionBudget":
if "minAvailable" in resource.obj["spec"]:
resource.obj["spec"]["minAvailable"] = target_replicas
@@ -836,6 +838,11 @@ def autoscale_resources(
f"{resource.kind} {resource.namespace}/{resource.name} was excluded (name matches exclusion list)"
)
continue
if resource.kind == 'Job' and 'ownerReferences' in resource.metadata:
logger.debug(
f"{resource.kind} {resource.namespace}/{resource.name} was excluded (Job with ownerReferences)"
)
continue
resources_by_namespace[resource.namespace].append(resource)
except requests.HTTPError as e:
if e.response.status_code == 404:
@@ -1165,7 +1172,7 @@ def scale(
for clazz in RESOURCE_CLASSES:
plural = clazz.endpoint
if plural in include_resources:
if plural != "jobs":
if scale_jobs_without_admission_controller(plural, admission_controller) or plural != "jobs":
autoscale_resources(
api,
clazz,
205 changes: 204 additions & 1 deletion tests/test_scaler.py
@@ -893,9 +893,9 @@ def get(url, version, **kwargs):
include_resources=include_resources,
exclude_namespaces=[],
exclude_deployments=[],
admission_controller="",
dry_run=False,
grace_period=300,
admission_controller="",
downtime_replicas=0,
enable_events=True,
matching_labels=frozenset([re.compile("")]),
@@ -993,6 +993,209 @@ def get(url, version, **kwargs):
}
assert json.loads(api.patch.call_args[1]["data"]) == patch_data

def test_scaler_job_suspend_without_admission_controller(monkeypatch):
api = MagicMock()
monkeypatch.setattr(
"kube_downscaler.scaler.helper.get_kube_api", MagicMock(return_value=api)
)
monkeypatch.setattr(
"kube_downscaler.scaler.helper.add_event", MagicMock(return_value=None)
)

def get(url, version, **kwargs):
if url == "pods":
data = {"items": []}
elif url == "jobs":
data = {
"items": [
{
"metadata": {
"name": "job-1",
"namespace": "default",
"creationTimestamp": "2019-03-01T16:38:00Z",
},
"spec": {"suspend": False},
},
]
}
elif url == "namespaces/default":
data = {"metadata": {"annotations": {"downscaler/uptime": "never"}}}
# data = {'metadata': {}}
else:
raise Exception(f"unexpected call: {url}, {version}, {kwargs}")

response = MagicMock()
response.json.return_value = data
return response

api.get = get

include_resources = frozenset(["jobs"])
scale(
namespace=None,
upscale_period="never",
downscale_period="never",
default_uptime="never",
default_downtime="always",
include_resources=include_resources,
exclude_namespaces=[],
exclude_deployments=[],
dry_run=False,
grace_period=300,
admission_controller="",
downtime_replicas=0,
enable_events=True,
matching_labels=frozenset([re.compile("")]),
)

assert api.patch.call_count == 1
assert api.patch.call_args[1]["url"] == "/jobs/job-1"

patch_data = {
"metadata": {
"name": "job-1",
"namespace": "default",
"creationTimestamp": "2019-03-01T16:38:00Z",
"annotations": {ORIGINAL_REPLICAS_ANNOTATION: "1"},
},
"spec": {"suspend": True},
}
assert json.loads(api.patch.call_args[1]["data"]) == patch_data

def test_scaler_job_suspend_without_admission_controller_with_owner_reference(monkeypatch):
api = MagicMock()
monkeypatch.setattr(
"kube_downscaler.scaler.helper.get_kube_api", MagicMock(return_value=api)
)
monkeypatch.setattr(
"kube_downscaler.scaler.helper.add_event", MagicMock(return_value=None)
)

def get(url, version, **kwargs):
if url == "pods":
data = {"items": []}
elif url == "jobs":
data = {
"items": [
{
"metadata": {
"name": "job-1",
"namespace": "default",
"creationTimestamp": "2019-03-01T16:38:00Z",
"ownerReferences": "cron-job-1"
},
"spec": {"suspend": False},
},
]
}
elif url == "namespaces/default":
data = {"metadata": {"annotations": {"downscaler/uptime": "never"}}}
# data = {'metadata': {}}
else:
raise Exception(f"unexpected call: {url}, {version}, {kwargs}")

response = MagicMock()
response.json.return_value = data
return response

api.get = get

include_resources = frozenset(["jobs"])
scale(
namespace=None,
upscale_period="never",
downscale_period="never",
default_uptime="never",
default_downtime="always",
include_resources=include_resources,
exclude_namespaces=[],
exclude_deployments=[],
dry_run=False,
grace_period=300,
admission_controller="",
downtime_replicas=0,
enable_events=True,
matching_labels=frozenset([re.compile("")]),
)

assert api.patch.call_count == 0

def test_scaler_job_unsuspend_without_admission_controller(monkeypatch):
api = MagicMock()
monkeypatch.setattr(
"kube_downscaler.scaler.helper.get_kube_api", MagicMock(return_value=api)
)
monkeypatch.setattr(
"kube_downscaler.scaler.helper.add_event", MagicMock(return_value=None)
)

def get(url, version, **kwargs):
if url == "pods":
data = {"items": []}
elif url == "jobs":
data = {
"items": [
{
"metadata": {
"name": "job-1",
"namespace": "default",
"creationTimestamp": "2019-03-01T16:38:00Z",
"annotations": {ORIGINAL_REPLICAS_ANNOTATION: "1"},
},
"spec": {"suspend": True},
},
]
}
elif url == "namespaces/default":
data = {
"metadata": {
"annotations": {
"downscaler/uptime": "always",
"downscaler/downtime": "never",
}
}
}
# data = {'metadata': {}}
else:
raise Exception(f"unexpected call: {url}, {version}, {kwargs}")

response = MagicMock()
response.json.return_value = data
return response

api.get = get

include_resources = frozenset(["jobs"])
scale(
namespace=None,
upscale_period="never",
downscale_period="never",
default_uptime="never",
default_downtime="always",
include_resources=include_resources,
exclude_namespaces=[],
exclude_deployments=[],
matching_labels=frozenset([re.compile("")]),
dry_run=False,
grace_period=300,
admission_controller="",
downtime_replicas=0,
enable_events=True,
)

assert api.patch.call_count == 1
assert api.patch.call_args[1]["url"] == "/jobs/job-1"

patch_data = {
"metadata": {
"name": "job-1",
"namespace": "default",
"creationTimestamp": "2019-03-01T16:38:00Z",
"annotations": {ORIGINAL_REPLICAS_ANNOTATION: None},
},
"spec": {"suspend": False},
}
assert json.loads(api.patch.call_args[1]["data"]) == patch_data

def test_scaler_downscale_period_no_error(monkeypatch, caplog):
api = MagicMock()