Skip to content

Commit

Permalink
add a cronjob crud example
Browse files Browse the repository at this point in the history
  • Loading branch information
xg0719 committed Jun 5, 2022
1 parent be9a47e commit 5645c57
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions examples/cronjob_crud.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#!/usr/bin/python3
# -*- coding:utf-8 -*-

import json
import time

from kubernetes import client, config

config.load_kube_config()


def create_namespaced_cron_job(namespace='default', body=None):
cronjob_json = body
if body is None:
print('body is required!')
exit(0)
name = body['metadata']['name']
if judge_crontab_exists(namespace, name):
print(f'{name} exists, please do not repeat!')
else:
v1 = client.BatchV1beta1Api()
ret = v1.create_namespaced_cron_job(namespace=namespace, body=cronjob_json, pretty=True,
_preload_content=False, async_req=False)
ret_dict = json.loads(ret.data)
print(f'create succeed\n{json.dumps(ret_dict)}')


def delete_namespaced_cron_job(namespace='default', name=None):
if name is None:
print('name is required!')
exit(0)
if not judge_crontab_exists(namespace, name):
print(f"{name} doesn't exists, please enter a new one!")
else:
v1 = client.BatchV1beta1Api()
ret = v1.delete_namespaced_cron_job(name=name, namespace=namespace, _preload_content=False, async_req=False)
ret_dict = json.loads(ret.data)
print(f'delete succeed\n{json.dumps(ret_dict)}')


def patch_namespaced_cron_job(namespace='default', body=None):
cronjob_json = body
if body is None:
print('body is required!')
exit(0)
name = body['metadata']['name']
if judge_crontab_exists(namespace, name):
v1 = client.BatchV1beta1Api()
ret = v1.patch_namespaced_cron_job(name=name, namespace=namespace, body=cronjob_json,
_preload_content=False, async_req=False)
ret_dict = json.loads(ret.data)
print(f'patch succeed\n{json.dumps(ret_dict)}')
else:
print(f"{name} doesn't exists, please enter a new one!")


def get_cronjob_list(namespace='default'):
v1 = client.BatchV1beta1Api()
ret = v1.list_namespaced_cron_job(namespace=namespace, pretty=True, _preload_content=False)
cron_job_list = json.loads(ret.data)
print(f'cronjob number={len(cron_job_list["items"])}')
# for cron in cron_job_list['items']:
# print(json.dumps(cron))
return cron_job_list["items"]


def judge_crontab_exists(namespace, name):
cron_job_list = get_cronjob_list(namespace)
for cron_job in cron_job_list:
if name == cron_job['metadata']['name']:
return True
return False


def get_cronjob_body(namespace, name, command):
body = {
"apiVersion": "batch/v1beta1",
"kind": "CronJob",
"metadata": {
"name": name,
"namespace": namespace
},
"spec": {
"schedule": "*/1 * * * *",
"concurrencyPolicy": "Allow",
"suspend": False,
"jobTemplate": {
"spec": {
"template": {
"spec": {
"containers": [
{
"name": name,
"image": "busybox:1.35",
"command": command
}
],
"restartPolicy": "Never"
}
}
}
},
"successfulJobsHistoryLimit": 3,
"failedJobsHistoryLimit": 1
}
}
return body


if __name__ == '__main__':
# get
cronjob_list = get_cronjob_list()

# delete
delete_namespaced_cron_job('default', 'hostname')
time.sleep(2)

# create
container_command = [
"/bin/sh",
"-c",
"date; echo Hello from the Kubernetes cluster; hostname"
]
hostname_json = get_cronjob_body('default', 'hostname', container_command)
create_namespaced_cron_job('default', hostname_json)

# update
container_command[2] = "date; echo this is patch; hostname"
hostname_json = get_cronjob_body('default', 'hostname', container_command)
patch_namespaced_cron_job('default', hostname_json)

0 comments on commit 5645c57

Please sign in to comment.