Skip to content

Commit

Permalink
Revert "feat: enable data caching cross jobs (#263)"
Browse files Browse the repository at this point in the history
This reverts commit 03a2235.
  • Loading branch information
SimonCqk committed Dec 5, 2022
1 parent 1cd4514 commit b7e1572
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 674 deletions.
34 changes: 5 additions & 29 deletions apis/cache/v1alpha1/cachebackend_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package v1alpha1

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -28,9 +26,6 @@ const (

// CacheBackendSpec defines the desired state of CacheBackend
type CacheBackendSpec struct {
// CacheBackendName is equal to ObjectMeta.Name for now
CacheBackendName string `json:"name,omitempty"`

// The location in the container where the dataset should be mounted
MountPath string `json:"mountPath,omitempty"`

Expand All @@ -39,27 +34,15 @@ type CacheBackendSpec struct {

// CacheEngine is different kinds of cache engine
CacheEngine *CacheEngine `json:"cacheEngine,omitempty"`

// Options is used to set additional configuration
Options Options `json:"options,omitempty"`
}

// CacheBackendStatus defines the observed state of CacheBackend
type CacheBackendStatus struct {
// CacheEngine is the current cache engine used by CacheBackend
CacheEngine string `json:"cacheEngine,omitempty"`
// JobName indicates the name of the job that consumes the cache
JobName string `json:"jobName,omitempty"`

// CacheStatus is used to help understand the status of a caching process
CacheStatus CacheStatus `json:"cacheStatus,omitempty"`

// UsedBy lists the jobs currently using this CacheBackend
UsedBy []string `json:"usedBy,omitempty"`

// UsedNum equals the size of the UsedBy array
UsedNum int `json:"usedNum,omitempty"`

// LastUsedTime is equal to the completion time of the last job that used CacheBackend
LastUsedTime *metav1.Time `json:"lastUsedTime,omitempty"`
}

// Dataset is used to define where specific data sources are stored and mounted
Expand Down Expand Up @@ -124,20 +107,13 @@ const (
CacheCreating CacheStatus = "CacheCreating"
)

type Options struct {
// IdleTime means how long cacheBackend is not currently in use
IdleTime time.Duration `json:"idleTime,omitempty"`
}

// +k8s:defaulter-gen=TypeMeta
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:printcolumn:name="Engine",type=string,JSONPath=`.status.cacheEngine`
// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.cacheStatus`
// +kubebuilder:printcolumn:name="Used-Num",type=integer,JSONPath=`.status.usedNum`
// +kubebuilder:printcolumn:name="Last-Used-Time",type=date,JSONPath=`.status.lastUsedTime`
// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=".metadata.creationTimestamp"
// +kubebuilder:printcolumn:name="Job-Name",type=string,JSONPath=`.status.jobName`
// +kubebuilder:printcolumn:name="Cache-Status",type=string,JSONPath=`.status.cacheStatus`
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// CacheBackend is the Schema for the cache backends API
type CacheBackend struct {
Expand Down
27 changes: 1 addition & 26 deletions apis/cache/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 4 additions & 27 deletions config/crd/bases/cache.kubedl.io_cachebackends.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .status.cacheEngine
name: Engine
- jsonPath: .status.jobName
name: Job-Name
type: string
- jsonPath: .status.cacheStatus
name: Status
name: Cache-Status
type: string
- jsonPath: .status.usedNum
name: Used-Num
type: integer
- jsonPath: .status.lastUsedTime
name: Last-Used-Time
type: date
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
Expand Down Expand Up @@ -81,30 +75,13 @@ spec:
type: object
mountPath:
type: string
name:
type: string
options:
properties:
idleTime:
format: int64
type: integer
type: object
type: object
status:
properties:
cacheEngine:
type: string
cacheStatus:
type: string
lastUsedTime:
format: date-time
jobName:
type: string
usedBy:
items:
type: string
type: array
usedNum:
type: integer
type: object
type: object
served: true
Expand Down
8 changes: 0 additions & 8 deletions config/crd/bases/training.kubedl.io_pytorchjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,6 @@ spec:
type: object
mountPath:
type: string
name:
type: string
options:
properties:
idleTime:
format: int64
type: integer
type: object
type: object
cleanPodPolicy:
type: string
Expand Down
8 changes: 0 additions & 8 deletions config/crd/bases/training.kubedl.io_tfjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,6 @@ spec:
type: object
mountPath:
type: string
name:
type: string
options:
properties:
idleTime:
format: int64
type: integer
type: object
type: object
cleanPodPolicy:
type: string
Expand Down
102 changes: 27 additions & 75 deletions controllers/cache/cachebackend_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package controllers
import (
"context"
"fmt"
"reflect"
"time"

"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -58,50 +55,23 @@ type CacheBackendReconciler struct {
//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete

func (r *CacheBackendReconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Log.Info(fmt.Sprintf("Reconciling for CacheBackend %s", req.Name))

// Get cacheBackend
cacheBackend := &cachev1alpha1.CacheBackend{}
err := r.Get(context.Background(), req.NamespacedName, cacheBackend)
if err != nil {
if errors.IsNotFound(err) {
r.Log.Info("CacheBackend doesn't exist", "name", req.String())
r.Log.Info("cacheBackend doesn't exist", "name", req.String())
return reconcile.Result{}, nil
}
r.Log.Error(err, "Failed to get CacheBackend")
r.Log.Error(err, "fail to get cacheBackend")
return reconcile.Result{}, err
}

status := cacheBackend.Status
oldStatus := cacheBackend.Status.DeepCopy()

if len(status.UsedBy) != 0 {
status.LastUsedTime = nil
}
status.UsedNum = len(status.UsedBy)

r.Log.Info(fmt.Sprintf("Current UsedNum equals %d, UsedBy %s", status.UsedNum, status.UsedBy))

if status.CacheStatus == cachev1alpha1.PVCCreated && cacheBackend.Spec.Options.IdleTime != 0 && len(status.UsedBy) == 0 {
idleTime := cacheBackend.Spec.Options.IdleTime
if status.LastUsedTime == nil {
status.UsedNum = len(status.UsedBy)
status.LastUsedTime = &metav1.Time{Time: time.Now()}
err = r.updateCacheBackendStatus(cacheBackend, &status)
if err != nil {
return reconcile.Result{Requeue: true}, err
}
// the CacheBackend has just entered its idle period
return reconcile.Result{RequeueAfter: idleTime * time.Second}, nil
} else {
if time.Now().After(status.LastUsedTime.Add(idleTime * time.Second)) {
err = r.Delete(context.Background(), cacheBackend)
if err != nil {
return reconcile.Result{Requeue: true}, err
}
} else {
//return reconcile.Result{Requeue: true}, err
}
}
// Once the pvc has been created, there is no need to reconcile
if cacheBackend.Status.CacheStatus == cachev1alpha1.PVCCreated {
r.Log.Info(fmt.Sprintf("cacheBackend status: is %s, pvc has created, skip reconcile", cacheBackend.Status.CacheStatus), "cacheBackend", cacheBackend.Name)
return reconcile.Result{}, nil
}

// Check whether the pvc has been created; if so, update the status to PVCCreated and return
Expand All @@ -113,76 +83,63 @@ func (r *CacheBackendReconciler) Reconcile(_ context.Context, req ctrl.Request)
}, pvc)

if err == nil {
r.Log.Info(fmt.Sprintf("PVC %s found", pvc.Name))
status.CacheStatus = cachev1alpha1.PVCCreated
err = r.updateCacheBackendStatus(cacheBackend, &status)
r.Log.Info(fmt.Sprintf("pvc %s found", pvc.Name))
err = r.updateCacheBackendStatus(cacheBackend, cachev1alpha1.PVCCreated)
if err != nil {
return reconcile.Result{Requeue: true}, err
return ctrl.Result{}, err
}
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
return reconcile.Result{}, nil
}

// If the pvc is not found, then there are two possible scenarios
// 1) Cache job has already been committed and pvc is creating
r.Log.Info(fmt.Sprintf("PVC %s not found, try to create CacheBackend", pvcName))
r.Log.Info(fmt.Sprintf("pvc %s not found, try to create cache backend", pvcName))
if cacheBackend.Status.CacheStatus == cachev1alpha1.PVCCreating {
r.Log.Info(fmt.Sprintf("PVC %s is creating", pvcName), "CacheBackend", cacheBackend.Name)
r.Log.Info(fmt.Sprintf("pvc %s is creating", pvcName), "cacheBackend", cacheBackend.Name)
return reconcile.Result{Requeue: true}, nil
}

// 2) Cache job has not created, reconciler will start creating a cache job to generate the pvc
// 2) Cache job has not been created yet, so the reconciler creates a cache job to generate the pvc
if cacheBackend.Spec.CacheEngine == nil {
r.Log.Error(err, " CacheEngine is undefined", "CacheBackend", cacheBackend.Name)
r.Log.Error(err, "cacheEngine is undefined", "cache backend", cacheBackend.Name)
return reconcile.Result{}, nil
} else {
// Different cache jobs are created based on the cacheEngine specified in cacheBackend.spec, which is pluggable
cacheBackendName, err := registry.CacheBackendName(cacheBackend.Spec.CacheEngine)
status.CacheEngine = cacheBackendName
if err != nil {
r.Log.Error(err, " Failed to get CacheBackend name in registry", "CacheBackend", cacheBackend.Name)
return reconcile.Result{Requeue: true}, err
r.Log.Error(err, "failed to get cache backend name in registry", "cacheBackend", cacheBackend.Name)
return ctrl.Result{}, err
}
cacheEngine := registry.Get(cacheBackendName)
err = cacheEngine.CreateCacheJob(cacheBackend)
if err != nil {
r.Log.Error(err, " Failed to create job with cache engine", "CacheBackend", cacheBackend.Name)
r.Log.Error(err, "failed to create job with cache engine", "cacheBackend", cacheBackend.Name)
// Update status
status.CacheStatus = cachev1alpha1.CacheFailed
err = r.updateCacheBackendStatus(cacheBackend, &status)
err = r.updateCacheBackendStatus(cacheBackend, cachev1alpha1.CacheFailed)
if err != nil {
return reconcile.Result{Requeue: true}, err
return ctrl.Result{}, err
}
return reconcile.Result{Requeue: true}, err
}

// Update status
status.CacheStatus = cachev1alpha1.PVCCreating
status.LastUsedTime = nil
err = r.updateCacheBackendStatus(cacheBackend, &status)
err = r.updateCacheBackendStatus(cacheBackend, cachev1alpha1.PVCCreating)
if err != nil {
return reconcile.Result{Requeue: true}, err
}
}

if !reflect.DeepEqual(oldStatus, status) {
err := r.updateCacheBackendStatus(cacheBackend, &status)
if err != nil {
return reconcile.Result{Requeue: true}, err
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

func (r *CacheBackendReconciler) updateCacheBackendStatus(cacheBackend *cachev1alpha1.CacheBackend,
status *cachev1alpha1.CacheBackendStatus) error {

func (r *CacheBackendReconciler) updateCacheBackendStatus(cacheBackend *cachev1alpha1.CacheBackend, status cachev1alpha1.CacheStatus) error {
cacheBackendStatus := cacheBackend.Status.DeepCopy()
cacheCopy := cacheBackend.DeepCopy()
cacheCopy.Status = *status.DeepCopy()

cacheCopy.Status = *cacheBackendStatus
cacheCopy.Status.CacheStatus = status
err := r.Status().Update(context.Background(), cacheCopy)
if err != nil {
r.Log.Error(err, "failed to update CacheBackend", "CacheBackend", cacheBackend.Name)
r.Log.Error(err, "failed to update cacheBackend", "cacheBackend", cacheBackend.Name)
return err
}
return nil
Expand All @@ -194,8 +151,3 @@ func (r *CacheBackendReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&cachev1alpha1.CacheBackend{}).
Complete(r)
}

func (r *CacheBackendReconciler) updateUsedStatus(status *cachev1alpha1.CacheBackendStatus) {
status.LastUsedTime = &metav1.Time{Time: time.Now()}
status.UsedNum = len(status.UsedBy)
}
Loading

0 comments on commit b7e1572

Please sign in to comment.