Skip to content

Commit

Permalink
made some ammendments to implement deleteCompletedJobsBeyondThreshold
Browse files Browse the repository at this point in the history
Signed-off-by: Sanskar Bhushan <[email protected]>
  • Loading branch information
sbdtu5498 committed Apr 24, 2023
1 parent d1a0532 commit 7e083bf
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 26 deletions.
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/cron_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type CronStatus struct {

type CronHistory struct {
// Object is the reference of the historical scheduled cron job.
Object corev1.TypedLocalObjectReference `json:"object"`
Object corev1.ObjectReference `json:"object"`
// Status is the final status when job finished.
Status v1.JobConditionType `json:"status"`
// Created is the creation timestamp of job.
Expand Down
13 changes: 9 additions & 4 deletions config/crd/bases/apps.kubedl.io_crons.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,20 @@ spec:
type: string
object:
properties:
apiGroup:
apiVersion:
type: string
fieldPath:
type: string
kind:
type: string
name:
type: string
required:
- kind
- name
namespace:
type: string
resourceVersion:
type: string
uid:
type: string
type: object
status:
type: string
Expand Down
70 changes: 54 additions & 16 deletions controllers/apps/cron_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ func (cc *CronController) Reconcile(_ context.Context, req ctrl.Request) (ctrl.R
if nextDuration != nil {
return ctrl.Result{RequeueAfter: *nextDuration}, nil
}

if err = cc.deleteCompletedJobsBeyondThreshold(&cron); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -461,14 +466,14 @@ func (cc *CronController) newEmptyWorkload(apiVersion, kind string) (client.Obje
return nil, fmt.Errorf("workload %+v has not implemented client.Object interface", groupVersion)
}

// Deletes successfully complete jobs based on given history limits.
// Deletes successfully completed jobs beyond given history limits.
func (cc *CronController) deleteCompletedJobsBeyondThreshold(cron *v1alpha1.Cron) error {
historyLimit := cron.Spec.HistoryLimit
if historyLimit == nil {
return nil
}

completedWorkloads, err := cc.listCompletedWorkloads(cron)
completedWorkloads, err := cc.listSuccessfullyCompletedWorkloads(cron)
if err != nil {
return err
}
Expand All @@ -479,32 +484,65 @@ func (cc *CronController) deleteCompletedJobsBeyondThreshold(cron *v1alpha1.Cron

// Sort completed workloads by creation timestamp.
sort.Slice(completedWorkloads, func(i, j int) bool {
return completedWorkloads[i].GetCreationTimestamp().Before(&completedWorkloads[j].GetCreationTimestamp())
return completedWorkloads[i].GetCreationTimestamp().Time.Before(completedWorkloads[j].GetCreationTimestamp().Time)
})

// Create a new Scheme object.
s := runtime.NewScheme()

// Register the required Kubernetes API types with the Scheme object.
corev1.AddToScheme(s)
v1alpha1.AddToScheme(s)

// Delete the oldest completed workloads.
for _, wl := range completedWorkloads[:len(completedWorkloads)-int(*historyLimit)] {
if err := cc.deleteWorkload(cron, wl); err != nil {
for _, wlObj := range completedWorkloads[:len(completedWorkloads)-int(*historyLimit)] {
// Convert to ObjectReference
var wlRef corev1.ObjectReference
if err := s.Convert(wlObj, &wlRef, nil); err != nil {
return err
}

if err := cc.deleteWorkload(cron, wlRef); err != nil {
return err
}
}

return nil
}

// Helper function to list completed workloads (jobs) for a given Cron object.
func (cc *CronController) listCompletedWorkloads(cron *v1alpha1.Cron) ([]*batchv1.Job, error) {
jobList, err := cc.jobLister.Jobs(cron.Namespace).List(labels.SelectorFromSet(cron.Spec.JobLabelSelector))
if err != nil {
return nil, err
}
// listSuccessfullyCompletedWorkloads returns a list of successfully completed workloads.
func (cc *CronController) listSuccessfullyCompletedWorkloads(cron *v1alpha1.Cron) ([]metav1.Object, error) {
workloads := make([]metav1.Object, 0)

var completedWorkloads []*batchv1.Job
for _, job := range jobList {
if job.Status.Succeeded >= *job.Spec.Completions {
completedWorkloads = append(completedWorkloads, job)
for _, history := range cron.Status.History {
if history.Status == v1.JobSucceeded {
wl, err := cc.newEmptyWorkload(cron.APIVersion, history.Object.Kind)
if err != nil {
klog.Errorf("unsupported cron workload and failed to init by scheme, kind: %s, err: %v",
history.Object.Kind, err)
continue
}
if err := cc.client.Get(context.Background(), types.NamespacedName{
Name: history.Object.Name,
Namespace: history.Object.Namespace,
}, wl); err != nil {
if errors.IsNotFound(err) {
klog.Infof("completed workload[%s/%s] in cron[%s/%s] has been deleted.",
history.Object.Namespace, history.Object.Name, cron.Namespace, cron.Name)
continue
}
return nil, err
}
metaWl, ok := wl.(metav1.Object)
if !ok {
klog.Warningf("workload [%s/%s] cannot convert to metav1.Object", cron.Namespace, history.Object.Name)
continue
}
workloads = append(workloads, metaWl)
}
}
return completedWorkloads, nil

return workloads, nil
}

func (cc *CronController) deleteWorkload(cron *v1alpha1.Cron, ref corev1.ObjectReference) error {
Expand Down
13 changes: 8 additions & 5 deletions controllers/apps/cron_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,19 @@ func inActiveList(active []corev1.ObjectReference, workload metav1.Object) bool
return false
}

func workloadToHistory(wl metav1.Object, apiGroup, kind string) v1alpha1.CronHistory {
func workloadToHistory(wl metav1.Object, apiVersion, kind string) v1alpha1.CronHistory {
status, finished := IsWorkloadFinished(wl)
created := wl.GetCreationTimestamp()
ch := v1alpha1.CronHistory{
Created: &created,
Status: status,
Object: corev1.TypedLocalObjectReference{
APIGroup: &apiGroup,
Kind: kind,
Name: wl.GetName(),
Object: corev1.ObjectReference{
APIVersion: apiVersion,
Kind: kind,
Name: wl.GetName(),
Namespace: wl.GetNamespace(),
ResourceVersion: wl.GetResourceVersion(),
UID: wl.GetUID(),
},
}
if finished {
Expand Down

0 comments on commit 7e083bf

Please sign in to comment.