Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding functionality to delete successfully completed jobs in a FIFO … #293

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/cron_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type CronStatus struct {

type CronHistory struct {
// Object is the reference of the historical scheduled cron job.
Object corev1.TypedLocalObjectReference `json:"object"`
Object corev1.ObjectReference `json:"object"`
// Status is the final status when job finished.
Status v1.JobConditionType `json:"status"`
// Created is the creation timestamp of job.
Expand Down
13 changes: 9 additions & 4 deletions config/crd/bases/apps.kubedl.io_crons.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,20 @@ spec:
type: string
object:
properties:
apiGroup:
apiVersion:
type: string
fieldPath:
type: string
kind:
type: string
name:
type: string
required:
- kind
- name
namespace:
type: string
resourceVersion:
type: string
uid:
type: string
type: object
status:
type: string
Expand Down
84 changes: 84 additions & 0 deletions controllers/apps/cron_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ func (cc *CronController) Reconcile(_ context.Context, req ctrl.Request) (ctrl.R
if nextDuration != nil {
return ctrl.Result{RequeueAfter: *nextDuration}, nil
}

if err = cc.deleteCompletedJobsBeyondThreshold(&cron); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -461,6 +466,85 @@ func (cc *CronController) newEmptyWorkload(apiVersion, kind string) (client.Obje
return nil, fmt.Errorf("workload %+v has not implemented client.Object interface", groupVersion)
}

// Deletes successfully completed jobs beyond given history limits.
func (cc *CronController) deleteCompletedJobsBeyondThreshold(cron *v1alpha1.Cron) error {
historyLimit := cron.Spec.HistoryLimit
if historyLimit == nil {
return nil
}

completedWorkloads, err := cc.listSuccessfullyCompletedWorkloads(cron)
if err != nil {
return err
}

if len(completedWorkloads) <= int(*historyLimit) {
return nil
}

// Sort completed workloads by creation timestamp.
sort.Slice(completedWorkloads, func(i, j int) bool {
return completedWorkloads[i].GetCreationTimestamp().Time.Before(completedWorkloads[j].GetCreationTimestamp().Time)
})

// Create a new Scheme object.
s := runtime.NewScheme()

// Register the required Kubernetes API types with the Scheme object.
corev1.AddToScheme(s)
v1alpha1.AddToScheme(s)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheme created here can be a static one and being reused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it


// Delete the oldest completed workloads.
for _, wlObj := range completedWorkloads[:len(completedWorkloads)-int(*historyLimit)] {
// Convert to ObjectReference
var wlRef corev1.ObjectReference
if err := s.Convert(wlObj, &wlRef, nil); err != nil {
return err
}

if err := cc.deleteWorkload(cron, wlRef); err != nil {
return err
}
}

return nil
}

// listSuccessfullyCompletedWorkloads returns a list of successfully completed workloads.
func (cc *CronController) listSuccessfullyCompletedWorkloads(cron *v1alpha1.Cron) ([]metav1.Object, error) {
workloads := make([]metav1.Object, 0)

for _, history := range cron.Status.History {
if history.Status == v1.JobSucceeded {
wl, err := cc.newEmptyWorkload(cron.APIVersion, history.Object.Kind)
if err != nil {
klog.Errorf("unsupported cron workload and failed to init by scheme, kind: %s, err: %v",
history.Object.Kind, err)
continue
}
if err := cc.client.Get(context.Background(), types.NamespacedName{
Name: history.Object.Name,
Namespace: history.Object.Namespace,
}, wl); err != nil {
if errors.IsNotFound(err) {
klog.Infof("completed workload[%s/%s] in cron[%s/%s] has been deleted.",
history.Object.Namespace, history.Object.Name, cron.Namespace, cron.Name)
continue
}
return nil, err
}
metaWl, ok := wl.(metav1.Object)
if !ok {
klog.Warningf("workload [%s/%s] cannot convert to metav1.Object", cron.Namespace, history.Object.Name)
continue
}
workloads = append(workloads, metaWl)
}
}

return workloads, nil
}

func (cc *CronController) deleteWorkload(cron *v1alpha1.Cron, ref corev1.ObjectReference) error {
wl, err := cc.newEmptyWorkload(ref.APIVersion, ref.Kind)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions controllers/apps/cron_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,19 @@ func inActiveList(active []corev1.ObjectReference, workload metav1.Object) bool
return false
}

func workloadToHistory(wl metav1.Object, apiGroup, kind string) v1alpha1.CronHistory {
func workloadToHistory(wl metav1.Object, apiVersion, kind string) v1alpha1.CronHistory {
status, finished := IsWorkloadFinished(wl)
created := wl.GetCreationTimestamp()
ch := v1alpha1.CronHistory{
Created: &created,
Status: status,
Object: corev1.TypedLocalObjectReference{
APIGroup: &apiGroup,
Kind: kind,
Name: wl.GetName(),
Object: corev1.ObjectReference{
APIVersion: apiVersion,
Kind: kind,
Name: wl.GetName(),
Namespace: wl.GetNamespace(),
ResourceVersion: wl.GetResourceVersion(),
UID: wl.GetUID(),
},
}
if finished {
Expand Down