From b7e1572a21d8f895aff3715b2fecaf9bbc3eef05 Mon Sep 17 00:00:00 2001 From: SimonCqk Date: Mon, 5 Dec 2022 22:37:26 +0800 Subject: [PATCH] Revert "feat: enable data caching cross jobs (#263)" This reverts commit 03a223546ccfc470659fd96f39a0654c966ff0d4. --- apis/cache/v1alpha1/cachebackend_types.go | 34 +--- apis/cache/v1alpha1/zz_generated.deepcopy.go | 27 +-- .../bases/cache.kubedl.io_cachebackends.yaml | 31 +-- .../bases/training.kubedl.io_pytorchjobs.yaml | 8 - .../crd/bases/training.kubedl.io_tfjobs.yaml | 8 - controllers/cache/cachebackend_controller.go | 102 +++------- .../cache/cachebackend_controller_test.go | 88 +-------- docs/cache_backend.md | 184 ++++++++---------- example/cachebackend/cache.yaml | 20 -- example/tf/tf_job_mnist_cache.yaml | 15 +- pkg/cache_backend/fluid/fluidcache_test.go | 2 +- pkg/cache_backend/test/cachebackend.go | 8 +- pkg/job_controller/job.go | 27 +-- pkg/job_controller/job_controller.go | 132 ++++--------- pkg/job_controller/job_test.go | 170 ---------------- 15 files changed, 182 insertions(+), 674 deletions(-) delete mode 100644 example/cachebackend/cache.yaml diff --git a/apis/cache/v1alpha1/cachebackend_types.go b/apis/cache/v1alpha1/cachebackend_types.go index a80db89c..cd2e5c3e 100644 --- a/apis/cache/v1alpha1/cachebackend_types.go +++ b/apis/cache/v1alpha1/cachebackend_types.go @@ -17,8 +17,6 @@ limitations under the License. package v1alpha1 import ( - "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -28,9 +26,6 @@ const ( // CacheBackendSpec defines the desired state of CacheBackend type CacheBackendSpec struct { - // CacheBackendName is equal to ObjectMeta.Name for now - CacheBackendName string `json:"name,omitempty"` - // The location in the container where the dataset should be mounted MountPath string `json:"mountPath,omitempty"` @@ -39,27 +34,15 @@ type CacheBackendSpec struct { // CacheEngine is different kinds of cache engine CacheEngine *CacheEngine `json:"cacheEngine,omitempty"` - - // Options is used to set additional configuration - Options Options `json:"options,omitempty"` } // CacheBackendStatus defines the observed state of CacheBackend type CacheBackendStatus struct { - // CacheEngine is the current cache engine used by CacheBackend - CacheEngine string `json:"cacheEngine,omitempty"` + // JobName indicates the name of the job that consumes the cache + JobName string `json:"jobName,omitempty"` // CacheStatus is used to help understand the status of a caching process CacheStatus CacheStatus `json:"cacheStatus,omitempty"` - - // UsedBy array contains the jobs currently using this cacheBackends - UsedBy []string `json:"usedBy,omitempty"` - - // UsedNum equals to the size of UsedBy array - UsedNum int `json:"usedNum,omitempty"` - - // LastUsedTime is equal to the completion time of the last job that used CacheBackend - LastUsedTime *metav1.Time `json:"lastUsedTime,omitempty"` } // Dataset is used to define where specific data sources are stored and mounted @@ -124,20 +107,13 @@ const ( CacheCreating CacheStatus = "CacheCreating" ) -type Options struct { - // IdleTime means how long cacheBackend is not currently in use - IdleTime time.Duration `json:"idleTime,omitempty"` -} - // +k8s:defaulter-gen=TypeMeta // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object -// +kubebuilder:printcolumn:name="Engine",type=string,JSONPath=`.status.cacheEngine` -// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.cacheStatus` -// +kubebuilder:printcolumn:name="Used-Num",type=integer,JSONPath=`.status.usedNum` -// +kubebuilder:printcolumn:name="Last-Used-Time",type=date,JSONPath=`.status.lastUsedTime` -// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=".metadata.creationTimestamp" +// +kubebuilder:printcolumn:name="Job-Name",type=string,JSONPath=`.status.jobName` +// +kubebuilder:printcolumn:name="Cache-Status",type=string,JSONPath=`.status.cacheStatus` +// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // CacheBackend is the Schema for the cache backends API type CacheBackend struct { diff --git a/apis/cache/v1alpha1/zz_generated.deepcopy.go b/apis/cache/v1alpha1/zz_generated.deepcopy.go index 8229ba7d..7f908b55 100644 --- a/apis/cache/v1alpha1/zz_generated.deepcopy.go +++ b/apis/cache/v1alpha1/zz_generated.deepcopy.go @@ -51,7 +51,7 @@ func (in *CacheBackend) DeepCopyInto(out *CacheBackend) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - in.Status.DeepCopyInto(&out.Status) + out.Status = in.Status } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CacheBackend. @@ -117,7 +117,6 @@ func (in *CacheBackendSpec) DeepCopyInto(out *CacheBackendSpec) { *out = new(CacheEngine) (*in).DeepCopyInto(*out) } - out.Options = in.Options } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CacheBackendSpec. @@ -133,15 +132,6 @@ func (in *CacheBackendSpec) DeepCopy() *CacheBackendSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CacheBackendStatus) DeepCopyInto(out *CacheBackendStatus) { *out = *in - if in.UsedBy != nil { - in, out := &in.UsedBy, &out.UsedBy - *out = make([]string, len(*in)) - copy(*out, *in) - } - if in.LastUsedTime != nil { - in, out := &in.LastUsedTime, &out.LastUsedTime - *out = (*in).DeepCopy() - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CacheBackendStatus. @@ -243,18 +233,3 @@ func (in *Level) DeepCopy() *Level { in.DeepCopyInto(out) return out } - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Options) DeepCopyInto(out *Options) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Options. -func (in *Options) DeepCopy() *Options { - if in == nil { - return nil - } - out := new(Options) - in.DeepCopyInto(out) - return out -} diff --git a/config/crd/bases/cache.kubedl.io_cachebackends.yaml b/config/crd/bases/cache.kubedl.io_cachebackends.yaml index 5e26c47e..bdd1205f 100644 --- a/config/crd/bases/cache.kubedl.io_cachebackends.yaml +++ b/config/crd/bases/cache.kubedl.io_cachebackends.yaml @@ -17,18 +17,12 @@ spec: scope: Namespaced versions: - additionalPrinterColumns: - - jsonPath: .status.cacheEngine - name: Engine + - jsonPath: .status.jobName + name: Job-Name type: string - jsonPath: .status.cacheStatus - name: Status + name: Cache-Status type: string - - jsonPath: .status.usedNum - name: Used-Num - type: integer - - jsonPath: .status.lastUsedTime - name: Last-Used-Time - type: date - jsonPath: .metadata.creationTimestamp name: Age type: date @@ -81,30 +75,13 @@ spec: type: object mountPath: type: string - name: - type: string - options: - properties: - idleTime: - format: int64 - type: integer - type: object type: object status: properties: - cacheEngine: - type: string cacheStatus: type: string - lastUsedTime: - format: date-time + jobName: type: string - usedBy: - items: - type: string - type: array - usedNum: - type: integer type: object type: object served: true diff --git a/config/crd/bases/training.kubedl.io_pytorchjobs.yaml b/config/crd/bases/training.kubedl.io_pytorchjobs.yaml index c12b7dda..45b1c8dc 100644 --- a/config/crd/bases/training.kubedl.io_pytorchjobs.yaml +++ b/config/crd/bases/training.kubedl.io_pytorchjobs.yaml @@ -92,14 +92,6 @@ spec: type: object mountPath: type: string - name: - type: string - options: - properties: - idleTime: - format: int64 - type: integer - type: object type: object cleanPodPolicy: type: string diff --git a/config/crd/bases/training.kubedl.io_tfjobs.yaml b/config/crd/bases/training.kubedl.io_tfjobs.yaml index 38b72489..fa9a5f1b 100644 --- a/config/crd/bases/training.kubedl.io_tfjobs.yaml +++ b/config/crd/bases/training.kubedl.io_tfjobs.yaml @@ -92,14 +92,6 @@ spec: type: object mountPath: type: string - name: - type: string - options: - properties: - idleTime: - format: int64 - type: integer - type: object type: object cleanPodPolicy: type: string diff --git a/controllers/cache/cachebackend_controller.go b/controllers/cache/cachebackend_controller.go index 4205b79b..bce1fc68 100644 --- a/controllers/cache/cachebackend_controller.go +++ b/controllers/cache/cachebackend_controller.go @@ -19,13 +19,10 @@ package controllers import ( "context" "fmt" - "reflect" - "time" "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -58,50 +55,23 @@ type CacheBackendReconciler struct { //+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete func (r *CacheBackendReconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) { - r.Log.Info(fmt.Sprintf("Reconciling for CacheBackend %s", req.Name)) + // Get cacheBackend cacheBackend := &cachev1alpha1.CacheBackend{} err := r.Get(context.Background(), req.NamespacedName, cacheBackend) if err != nil { if errors.IsNotFound(err) { - r.Log.Info("CacheBackend doesn't exist", "name", req.String()) + r.Log.Info("cacheBackend doesn't exist", "name", req.String()) return reconcile.Result{}, nil } - r.Log.Error(err, "Failed to get CacheBackend") + r.Log.Error(err, "fail to get cacheBackend") return reconcile.Result{}, err } - status := cacheBackend.Status - oldStatus := cacheBackend.Status.DeepCopy() - - if len(status.UsedBy) != 0 { - status.LastUsedTime = nil - } - status.UsedNum = len(status.UsedBy) - - r.Log.Info(fmt.Sprintf("Current UsedNum equals %d, UsedBy %s", status.UsedNum, status.UsedBy)) - - if status.CacheStatus == cachev1alpha1.PVCCreated && cacheBackend.Spec.Options.IdleTime != 0 && len(status.UsedBy) == 0 { - idleTime := cacheBackend.Spec.Options.IdleTime - if status.LastUsedTime == nil { - status.UsedNum = len(status.UsedBy) - status.LastUsedTime = &metav1.Time{Time: time.Now()} - err = r.updateCacheBackendStatus(cacheBackend, &status) - if err != nil { - return reconcile.Result{Requeue: true}, err - } - // it means that CacheBackend is into idle time - return reconcile.Result{RequeueAfter: idleTime * time.Second}, nil - } else { - if time.Now().After(status.LastUsedTime.Add(idleTime * time.Second)) { - err = r.Delete(context.Background(), cacheBackend) - if err != nil { - return reconcile.Result{Requeue: true}, err - } - } else { - //return reconcile.Result{Requeue: true}, err - } - } + // When pvc has created, no need for reconcile + if cacheBackend.Status.CacheStatus == cachev1alpha1.PVCCreated { + r.Log.Info(fmt.Sprintf("cacheBackend status: is %s, pvc has created, skip reconcile", cacheBackend.Status.CacheStatus), "cacheBackend", cacheBackend.Name) + return reconcile.Result{}, nil } // Check if pvc has created, if created, then update status to PVCCreated and return @@ -113,76 +83,63 @@ func (r *CacheBackendReconciler) Reconcile(_ context.Context, req ctrl.Request) }, pvc) if err == nil { - r.Log.Info(fmt.Sprintf("PVC %s found", pvc.Name)) - status.CacheStatus = cachev1alpha1.PVCCreated - err = r.updateCacheBackendStatus(cacheBackend, &status) + r.Log.Info(fmt.Sprintf("pvc %s found", pvc.Name)) + err = r.updateCacheBackendStatus(cacheBackend, cachev1alpha1.PVCCreated) if err != nil { - return reconcile.Result{Requeue: true}, err + return ctrl.Result{}, err } - return reconcile.Result{RequeueAfter: 10 * time.Second}, nil + return reconcile.Result{}, nil } // If the pvc is not found, then there are two possible scenarios // 1) Cache job has already been committed and pvc is creating - r.Log.Info(fmt.Sprintf("PVC %s not found, try to create CacheBackend", pvcName)) + r.Log.Info(fmt.Sprintf("pvc %s not found, try to create cache backend", pvcName)) if cacheBackend.Status.CacheStatus == cachev1alpha1.PVCCreating { - r.Log.Info(fmt.Sprintf("PVC %s is creating", pvcName), "CacheBackend", cacheBackend.Name) + r.Log.Info(fmt.Sprintf("pvc %s is creating", pvcName), "cacheBackend", cacheBackend.Name) return reconcile.Result{Requeue: true}, nil } - // 2) Cache job has not created, reconciler will start creating a cache job to generate the pvc + // 2) Cache job has not create, reconciler will start creating a cache job to generate the pvc if cacheBackend.Spec.CacheEngine == nil { - r.Log.Error(err, " CacheEngine is undefined", "CacheBackend", cacheBackend.Name) + r.Log.Error(err, "cacheEngine is undefined", "cache backend", cacheBackend.Name) return reconcile.Result{}, nil } else { // Different cache job are created based on the cacheEngine specified in cacheBackend.spec, which is pluggable cacheBackendName, err := registry.CacheBackendName(cacheBackend.Spec.CacheEngine) - status.CacheEngine = cacheBackendName if err != nil { - r.Log.Error(err, " Failed to get CacheBackend name in registry", "CacheBackend", cacheBackend.Name) - return reconcile.Result{Requeue: true}, err + r.Log.Error(err, "failed to get cache backend name in registry", "cacheBackend", cacheBackend.Name) + return ctrl.Result{}, err } cacheEngine := registry.Get(cacheBackendName) err = cacheEngine.CreateCacheJob(cacheBackend) if err != nil { - r.Log.Error(err, " Failed to create job with cache engine", "CacheBackend", cacheBackend.Name) + r.Log.Error(err, "failed to create job with cache engine", "cacheBackend", cacheBackend.Name) // Update status - status.CacheStatus = cachev1alpha1.CacheFailed - err = r.updateCacheBackendStatus(cacheBackend, &status) + err = r.updateCacheBackendStatus(cacheBackend, cachev1alpha1.CacheFailed) if err != nil { - return reconcile.Result{Requeue: true}, err + return ctrl.Result{}, err } return reconcile.Result{Requeue: true}, err } // Update status - status.CacheStatus = cachev1alpha1.PVCCreating - status.LastUsedTime = nil - err = r.updateCacheBackendStatus(cacheBackend, &status) + err = r.updateCacheBackendStatus(cacheBackend, cachev1alpha1.PVCCreating) if err != nil { - return reconcile.Result{Requeue: true}, err - } - } - - if !reflect.DeepEqual(oldStatus, status) { - err := r.updateCacheBackendStatus(cacheBackend, &status) - if err != nil { - return reconcile.Result{Requeue: true}, err + return ctrl.Result{}, err } } return ctrl.Result{}, nil } -func (r *CacheBackendReconciler) updateCacheBackendStatus(cacheBackend *cachev1alpha1.CacheBackend, - status *cachev1alpha1.CacheBackendStatus) error { - +func (r *CacheBackendReconciler) updateCacheBackendStatus(cacheBackend *cachev1alpha1.CacheBackend, status cachev1alpha1.CacheStatus) error { + cacheBackendStatus := cacheBackend.Status.DeepCopy() cacheCopy := cacheBackend.DeepCopy() - cacheCopy.Status = *status.DeepCopy() - + cacheCopy.Status = *cacheBackendStatus + cacheCopy.Status.CacheStatus = status err := r.Status().Update(context.Background(), cacheCopy) if err != nil { - r.Log.Error(err, "failed to update CacheBackend", "CacheBackend", cacheBackend.Name) + r.Log.Error(err, "failed to update cacheBackend", "cacheBackend", cacheBackend.Name) return err } return nil @@ -194,8 +151,3 @@ func (r *CacheBackendReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&cachev1alpha1.CacheBackend{}). Complete(r) } - -func (r *CacheBackendReconciler) updateUsedStatus(status *cachev1alpha1.CacheBackendStatus) { - status.LastUsedTime = &metav1.Time{Time: time.Now()} - status.UsedNum = len(status.UsedBy) -} diff --git a/controllers/cache/cachebackend_controller_test.go b/controllers/cache/cachebackend_controller_test.go index 05816bc4..c2ed2f52 100644 --- a/controllers/cache/cachebackend_controller_test.go +++ b/controllers/cache/cachebackend_controller_test.go @@ -3,9 +3,6 @@ package controllers import ( "context" "testing" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" @@ -35,7 +32,7 @@ func TestCacheBackendStatus(t *testing.T) { cacheBackend := &testCase - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(cacheBackend).Build() + fakeClient := fake.NewFakeClientWithScheme(scheme, cacheBackend) // register cacheBackend cacheregistry.RegisterCacheBackends(fakeClient) @@ -88,88 +85,5 @@ func TestCacheBackendStatus(t *testing.T) { }, cacheBackend) assert.Equal(t, v1alpha1.PVCCreated, cacheBackend.Status.CacheStatus) } -} - -func TestCachePolicyBasedOnIdleTime(t *testing.T) { - testCases := []struct { - testName string - cacheBackend *v1alpha1.CacheBackend - cacheStatus v1alpha1.CacheStatus - usedBy []string - expectDeleted bool - }{ - { - testName: "fluid", - cacheBackend: testcase.NewFluidCacheBackend("testCacheBackend", "default"), - cacheStatus: v1alpha1.PVCCreated, - usedBy: []string{}, - expectDeleted: true, - }, - { - testName: "fluid", - cacheBackend: testcase.NewFluidCacheBackend("testCacheBackend", "default"), - cacheStatus: v1alpha1.PVCCreating, - usedBy: []string{}, - expectDeleted: false, - }, - { - testName: "fluid", - cacheBackend: testcase.NewFluidCacheBackend("testCacheBackend", "default"), - cacheStatus: v1alpha1.PVCCreated, - usedBy: []string{"test-job-1", "test-job-2"}, - expectDeleted: false, - }, - } - - for _, testCase := range testCases { - - scheme := runtime.NewScheme() - _ = apis.AddToScheme(scheme) - _ = corev1.AddToScheme(scheme) - cacheBackend := testCase.cacheBackend - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(cacheBackend).Build() - cacheregistry.RegisterCacheBackends(fakeClient) - - cacheReconciler := &CacheBackendReconciler{ - Client: fakeClient, - Log: ctrl.Log.WithName("controllers").WithName("CacheBackend"), - Scheme: scheme, - } - - request := reconcile.Request{NamespacedName: types.NamespacedName{ - Namespace: cacheBackend.Namespace, - Name: cacheBackend.Name, - }} - - // 1. At first, CacheBackend can be got successfully - err := cacheReconciler.Get(context.TODO(), types.NamespacedName{ - Namespace: cacheBackend.Namespace, - Name: cacheBackend.Name, - }, cacheBackend) - assert.NoError(t, err) - - // 2. Update the IdleTime to control the longest time of CacheBackend to remain - cacheBackend.Status.LastUsedTime = &metav1.Time{Time: time.Date(2022, 9, 1, 10, 39, 0, 0, time.UTC)} - cacheBackend.Spec.Options.IdleTime = time.Now().Sub(cacheBackend.Status.LastUsedTime.Time) - cacheBackend.Status.CacheStatus = testCase.cacheStatus - cacheBackend.Status.UsedBy = testCase.usedBy - - err = cacheReconciler.Update(context.Background(), cacheBackend) - assert.NoError(t, err) - - err = cacheReconciler.Status().Update(context.Background(), cacheBackend) - assert.NoError(t, err) - - // 3. Controller will delete the infrequently used CacheBackend - _, _ = cacheReconciler.Reconcile(context.Background(), request) - - err = cacheReconciler.Get(context.TODO(), types.NamespacedName{ - Namespace: cacheBackend.Namespace, - Name: cacheBackend.Name, - }, cacheBackend) - isDeleted := err != nil - - assert.Equal(t, testCase.expectDeleted, isDeleted) - } } diff --git a/docs/cache_backend.md b/docs/cache_backend.md index 0377f70c..d6702e91 100644 --- a/docs/cache_backend.md +++ b/docs/cache_backend.md @@ -1,10 +1,9 @@ # CacheBackend ## Background - A deep learning job usually consists of data preprocessing, data loading to GPU memory, and model training. In these processes, I/O of datasets is one of the bottlenecks affecting the training time of deep learning jobs. Improving the access speed of datasets can reduce the time spent in training, improve the utilization rate of GPU and the efficiency of model training. -Kubedl supports caching datasets using a third-party caching engine. Based on this feature, the efficiency of users using KubeDL for deep learning job can be improved. +Kubedl supports caching datasets using a third-party caching engine. Based on this feature, the efficiency of users using KubeDL for deep learning job can be improved. ## How To Use @@ -14,86 +13,9 @@ At present, KubeDL only supports [Fluid](https://github.com/fluid-cloudnative/fl In order to use Fluid as the cache backend to speed up access to the dataset, you need to add some fields to job spec. At the same time, you also need to deploy Fluid in your Kubernetes cluster in advance to provide cache support. Please refer to [here](https://github.com/fluid-cloudnative/fluid/blob/master/docs/en/userguide/install.md) for specific deployment steps -#### Check CacheBackend - -To demonstrate how to use the cache feature, here is a demo CacheBackend: - -```yaml -apiVersion: "cache.kubedl.io/v1alpha1" -kind: "CacheBackend" -metadata: - name: "test-cachebackend" -spec: - dataset: - dataSources: - - location: local:///dataset/mnist # path property can be any legal UFS path acknowledged by Alluxio - subDirName: mnist # dataset needs to specify the name of the file directory in the mount path - cacheEngine: - fluid: - alluxioRuntime: - replicas: 1 - tieredStorage: - - cachePath: /dev/shm - quota: "1Gi" - mediumType: MEM - options: - idleTime: 60 -``` - -This example uses the [MNIST dataset](http://yann.lecun.com/exdb/mnist/), which is a very classic dataset in the field of computer vision. If you want to practice this case yourself, you need to download the MNIST dataset and replace the file path at `spec.cachebackend.cacheengine.fluid.dataset.datasources.path`. Note that the prefix `local://` is reserved, which means to load the source file locally (Or you can replace it with any path that can be recognized by alluxio). - -Dataset and AlluxioRuntime are the basic components of Fluid. These two abstractions are used to define dataset and configure cache parameters.The parameter meanings of these two parts are almost the same as those in Fluid (some parameters may be modified in KubeDL to make naming easier to understand) - -CacheBackend supports simple parameters to support more functionality, it mainly using the `options` field. In the above demo file, we added the `idleTime` to `options`. `idleTime` controls the maximum unused time of a CacheBackend. If a CacheBackend has been unused for more than Idletime, the controller will automatically remove the infrequently used CacheBackend. - -#### Create CacheBackend - -The CacheBackend mentioned in the previous section can be found in `example/cachebackend/cache.yaml`. - -```shell -$ kubectl apply -f example/cachebackend/cache.yaml -cachebackend.cache.kubedl.io/test-cachebackend created -``` - -Check the status of CacheBackend. If enabled cache, KubeDL will create a cachebackend. The status is `PVCCreating`, which means that cachebackend is already requesting the Fluid to create dataset and alluxioruntime. - -The `LAST-USED-TIME` means that current CacheBackend is in inactive status and it is equal to the completion time of the last job that used CacheBackend. The `USED-NUM` is the number of jobs that are currently using CacheBackend. - -```shell -$ kubectl get cacheBackend -NAME ENGINE STATUS USED-NUM LAST-USED-TIME AGE -test-cachebackend fluid PVCCreating 28s -``` - -After a while, can view the status of the dataset and the alluxio runtime. At this moment, the dataset has not been cached. - -```shell -$ kubectl get dataset -NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE AGE -test-cachebackend 11.06MiB 0.00B 1.00GiB 0.0% Bound 75s - -$ kubectl get alluxioruntime -NAME MASTER PHASE WORKER PHASE FUSE PHASE AGE -test-cachebackend Ready Ready Ready 56s -``` - -Kubedl will automatically check whether the PVC is created. If the status of cachebackend is updated to `PVCCreated`, it indicates that the PVC created by Fluid for us has been generated. - -```shell -$ kubectl get cacheBackend -NAME ENGINE STATUS USED-NUM LAST-USED-TIME AGE -test-cachebackend fluid PVCCreated 5s 88s - -$ kubectl get pvc -NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE -test-cachebackend Bound default-test-cachebackend 100Gi ROX fluid 52s -``` - -#### Check Job +#### An Example -Next we will show how to use CacheBackend to speed up jobs in Kubedl. - -To enable CacheBackend, you need to add a CacheBackend object to the Spec field of your job ( line 7 ~ 9 ). It only need to specify which cacheBackend you want to use and which directory you want to mount in Pods. The Controller will automatically bind the PersistVolumeClaim and mount the Volume later. +To demonstrate how to use the cache feature, here is a demo tfjob: ```yaml apiVersion: "training.kubedl.io/v1alpha1" @@ -101,10 +23,21 @@ kind: "TFJob" metadata: name: "tf-cache" spec: - cleanPodPolicy: None -+ cacheBackend: -+ name: test-cachebackend -+ mountPath: "/data" # mountPath is the path to which cached files are mounted to the container + cleanPodPolicy: None + cacheBackend: + mountPath: "/data" # mountPath is the path to which cached files are mounted to the container + dataset: + dataSources: + - location: local:///dataset/mnist # path property can be any legal UFS path acknowledged by Alluxio + subDirName: mnist # dataset needs to specify the name of the file directory in the mount path + cacheEngine: + fluid: + alluxioRuntime: + replicas: 1 + tieredStorage: + - cachePath: /dev/shm + quota: "1Gi" + mediumType: MEM tfReplicaSpecs: Worker: replicas: 1 @@ -128,65 +61,108 @@ spec: requests: cpu: 1024m memory: 1Gi + ``` -#### Create Job +This example uses the [MNIST dataset](http://yann.lecun.com/exdb/mnist/), which is a very classic dataset in the field of computer vision. If you want to practice this case yourself, you need to download the MNIST dataset and replace the file path at `spec.cachebackend.cacheengine.fluid.dataset.datasources.path`. Note that the prefix `local://` is reserved, which means to load the source file locally (Or you can replace it with any path that can be recognized by alluxio). + +Dataset and alluxioruntime are the basic components of Fluid. These two abstractions are used to define dataset and configure cache parameters.The parameter meanings of these two parts are almost the same as those in Fluid (some names may be modified in KubeDL to make naming easier to understand) + +#### Create Job and Check Status + +Create Job ```shell $ kubectl apply -f tf_job_mnist_cache.yaml tfjob.training.kubedl.io/tf-cache created ``` -Check CacheBackend in job status +If enabled cache, KubeDL will first create a cachebackend + +```shell +$ kubectl get cacheBackend +NAME JOB-NAME CACHE-STATUS AGE +cache-tf-cache-9a79c tf-cache CacheCreating 1s +``` + +After that, the status will change to `PVCCreating`. It means that cachebackend is already requesting the Fluid to create dataset and alluxioruntime ```shell -$ kubectl get tfjob -NAME STATE AGE MODEL-VERSION CACHE-BACKEND MAX-LIFETIME TTL-AFTER-FINISHED -tf-cache Running 8s test-cachebackend +$ kubectl get cacheBackend +NAME JOB-NAME CACHE-STATUS AGE +cache-tf-cache-9a79c tf-cache PVCCreating 3s +``` + +After a while, can view the status of the dataset and the alluxio runtime. At this moment, the dataset has not been cached + +```shell +$ kubectl get dataset +NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE AGE +cache-tf-cache-9a79c 11.06MiB NotBound 53s + +$ kubectl get alluxioruntime +NAME MASTER PHASE WORKER PHASE FUSE PHASE AGE +cache-tf-cache-9a79c Ready Ready Ready 59s +``` + +Kubedl will automatically check whether the PVC is created. If the status of cachebackend is updated to `PVCCreated`, it indicates that the PVC created by Fluid for us has been generated + +```shell +$ kubectl get cacheBackend +NAME JOB-NAME CACHE-STATUS AGE +cache-tf-cache-9a79c tf-cache PVCCreated 92s ``` Kubedl will not run the job until PVC is created. When the job is running, PVC has been mounted into the containers ```shell -$ kubectl get po -NAME READY STATUS RESTARTS AGE -tf-cache-worker-0 1/1 Running 0 20s +$ kubectl get po +NAME READY STATUS RESTARTS AGE +cache-tf-cache-9a79c-fuse-z98qb 1/1 Running 0 62s +cache-tf-cache-9a79c-master-0 2/2 Running 0 87s +cache-tf-cache-9a79c-worker-4cq44 2/2 Running 0 62s +tf-cache-worker-0 1/1 Running 0 14s ``` Viewing the status of the dataset, can see that `CACHED PERCENTAGE = 100.0%`, which means that the dataset used in this job has been cached ```shell $ kubectl get dataset -NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE AGE -test-cachebackend 11.06MiB 11.06MiB 1.00GiB 100.0% Bound 2m45s +NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE AGE +cache-tf-cache-9a79c 11.06MiB 11.06MiB 1.00GiB 100.0% Bound 118s ``` Job complete ```shell $ kubectl get po -NAME READY STATUS RESTARTS AGE -tf-cache-worker-0 0/1 Completed 0 2m18s +NAME READY STATUS RESTARTS AGE +cache-tf-cache-9a79c-fuse-z98qb 1/1 Running 0 113s +cache-tf-cache-9a79c-master-0 2/2 Running 0 2m18s +cache-tf-cache-9a79c-worker-4cq44 2/2 Running 0 113s +tf-cache-worker-0 0/1 Completed 0 65s ``` #### Clean Up -Delete Job +Delete job ```shell $ kubectl delete -f tf_job_mnist_cache.yaml tfjob.training.kubedl.io "tf-cache" deleted ``` -Delete CacheBackend. +Delete dataset ```shell -$ kubectl delete cacheBackend test-cachebackend -cachebackend.cache.kubedl.io "test-cachebackend" deleted +$ kubectl delete dataset cache-tf-cache-9a79c +dataset.data.fluid.io "cache-tf-cache-9a79c" deleted ``` -After CacheBackend removed, the `dataset`, `alluxioruntime` and `pvc` will be automatically deleted. +Delete alluxioruntime -## Cache Reuse +```shell +$ kubectl delete alluxioruntime cache-tf-cache-9a79c +alluxioruntime.data.fluid.io "cache-tf-cache-9a79c" deleted +``` -CacheBackend objects have job-independent life cycles. When a job bound to CacheBackend is deleted, CacheBackend and cached datasets will remain in the environment. It means that you can bind multiple jobs to the same CacheBackend or bind the same cacheBackend to different jobs at different times. diff --git a/example/cachebackend/cache.yaml b/example/cachebackend/cache.yaml deleted file mode 100644 index 534e12c8..00000000 --- a/example/cachebackend/cache.yaml +++ /dev/null @@ -1,20 +0,0 @@ -apiVersion: "cache.kubedl.io/v1alpha1" -kind: "CacheBackend" -metadata: - name: "test-cachebackend" -spec: - dataset: - dataSources: - - location: local:///dataset/mnist # path property can be any legal UFS path acknowledged by Alluxio - subDirName: mnist # dataset needs to specify the name of the file directory in the mount path - cacheEngine: - fluid: - alluxioRuntime: - replicas: 1 - tieredStorage: - - cachePath: /dev/shm - quota: "1Gi" - mediumType: MEM - options: - idleTime: 60 - diff --git a/example/tf/tf_job_mnist_cache.yaml b/example/tf/tf_job_mnist_cache.yaml index b29b947a..4499d31f 100755 --- a/example/tf/tf_job_mnist_cache.yaml +++ b/example/tf/tf_job_mnist_cache.yaml @@ -3,10 +3,21 @@ kind: "TFJob" metadata: name: "tf-cache" spec: - cleanPodPolicy: None + cleanPodPolicy: None cacheBackend: - name: test-cachebackend mountPath: "/data" # mountPath is the path to which cached files are mounted to the container + dataset: + dataSources: + - location: local:///dataset/mnist # path property can be any legal UFS path acknowledged by Alluxio + subDirName: mnist # dataset needs to specify the name of the file directory in the mount path + cacheEngine: + fluid: + alluxioRuntime: + replicas: 1 + tieredStorage: + - cachePath: /dev/shm + quota: "1Gi" + mediumType: MEM tfReplicaSpecs: Worker: replicas: 1 diff --git a/pkg/cache_backend/fluid/fluidcache_test.go b/pkg/cache_backend/fluid/fluidcache_test.go index ced5461a..329d3a8f 100644 --- a/pkg/cache_backend/fluid/fluidcache_test.go +++ b/pkg/cache_backend/fluid/fluidcache_test.go @@ -20,7 +20,7 @@ func TestCreateCacheJob(t *testing.T) { _ = apis.AddToScheme(scheme) cacheBackend := testcase.NewFluidCacheBackend("testCacheBackend", "jobName") - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(cacheBackend).Build() + fakeClient := fake.NewFakeClientWithScheme(scheme, cacheBackend) testFluidCache := &Cache{client: fakeClient} // create job diff --git a/pkg/cache_backend/test/cachebackend.go b/pkg/cache_backend/test/cachebackend.go index 767a8515..22f5431f 100644 --- a/pkg/cache_backend/test/cachebackend.go +++ b/pkg/cache_backend/test/cachebackend.go @@ -14,9 +14,8 @@ func NewFluidCacheBackend(name string, namespace string) *v1alpha1.CacheBackend Name: name, }, Spec: v1alpha1.CacheBackendSpec{ - CacheBackendName: name, - MountPath: "/test/mount/path", - Dataset: &v1alpha1.Dataset{DataSources: []v1alpha1.DataSource{}}, + MountPath: "/test/mount/path", + Dataset: &v1alpha1.Dataset{DataSources: []v1alpha1.DataSource{}}, CacheEngine: &v1alpha1.CacheEngine{ Fluid: &v1alpha1.Fluid{ AlluxioRuntime: &v1alpha1.AlluxioRuntime{ @@ -25,10 +24,9 @@ func NewFluidCacheBackend(name string, namespace string) *v1alpha1.CacheBackend }, }, }, - Options: v1alpha1.Options{IdleTime: 60}, }, Status: v1alpha1.CacheBackendStatus{ - CacheEngine: "fluid", + JobName: "testJobName", CacheStatus: v1alpha1.CacheCreating, }, } diff --git a/pkg/job_controller/job.go b/pkg/job_controller/job.go index 4df4811c..3ab8a4db 100644 --- a/pkg/job_controller/job.go +++ b/pkg/job_controller/job.go @@ -70,7 +70,7 @@ func (jc *JobController) deletePodsAndServices(runPolicy *apiv1.RunPolicy, job i // ReconcileJobs checks and updates replicas for each given ReplicaSpec. // It will requeue the job in case of an error while creating/deleting pods/services. func (jc *JobController) ReconcileJobs(job client.Object, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, jobStatus apiv1.JobStatus, - runPolicy *apiv1.RunPolicy, modelVersion *v1alpha1.ModelVersionSpec, cacheBackendSpec *cachev1alpha1.CacheBackendSpec) (result reconcile.Result, err error) { + runPolicy *apiv1.RunPolicy, modelVersion *v1alpha1.ModelVersionSpec, cacheBackend *cachev1alpha1.CacheBackendSpec) (result reconcile.Result, err error) { jobName := job.GetName() jobKey, err := KeyFunc(job) @@ -102,28 +102,26 @@ func (jc *JobController) ReconcileJobs(job client.Object, replicas map[apiv1.Rep err = code_sync.InjectCodeSyncInitContainers(job, replicas) if err != nil { - log.Error(err, " Failed to inject code sync init container") + log.Error(err, "failed to inject code sync init container") return reconcile.Result{}, err } // TODO(SimonCqk): update job conditions failed ? - if cacheBackendSpec != nil && jobStatus.CacheBackendName == "" && !commonutil.IsSucceeded(jobStatus) { - // Check CacheBackend has been created or not - err = jc.checkCache(job, cacheBackendSpec) - + if cacheBackend != nil { + // Create cache backend + // if cache backend has been created, the func also returns nil + err = jc.createCache(job, cacheBackend, &jobStatus) if err != nil { + log.Error(err, " failed to create cacheBackend") return reconcile.Result{Requeue: true}, err } // Next step is to get pvc and inject it to containers - - err = jc.addCachePathToContainer(job, cacheBackendSpec, replicas) + err = jc.addCachePathToContainer(job, cacheBackend, replicas) if err != nil { - log.Error(err, " Failed to inject pvc to containers") + log.Error(err, " failed to inject pvc to containers") return reconcile.Result{Requeue: true}, err } - - jobStatus.CacheBackendName = cacheBackendSpec.CacheBackendName // update to api-server later } pods, err := jc.Controller.GetPodsForJob(job) @@ -229,13 +227,6 @@ func (jc *JobController) ReconcileJobs(job client.Object, replicas map[apiv1.Rep } } - if cacheBackendSpec != nil { - err = jc.updateCacheUsedStatus(job, cacheBackendSpec) - if err != nil { - return result, err - } - } - if !reflect.DeepEqual(*oldStatus, jobStatus) { return result, jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus) } diff --git a/pkg/job_controller/job_controller.go b/pkg/job_controller/job_controller.go index 67bd41f8..8c5374ef 100644 --- a/pkg/job_controller/job_controller.go +++ b/pkg/job_controller/job_controller.go @@ -14,12 +14,12 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller" - "k8s.io/utils/strings/slices" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" cachev1alpha1 "github.com/alibaba/kubedl/apis/cache/v1alpha1" "github.com/alibaba/kubedl/cmd/options" + cachectrl "github.com/alibaba/kubedl/controllers/cache" "github.com/alibaba/kubedl/pkg/gang_schedule" apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" "github.com/alibaba/kubedl/pkg/metrics" @@ -199,9 +199,10 @@ func (jc *JobController) resolveControllerRef(namespace string, controllerRef *m return job } -func (jc *JobController) checkCache(metaObject metav1.Object, cacheBackendSpec *cachev1alpha1.CacheBackendSpec) error { +func (jc *JobController) createCache(metaObject metav1.Object, cacheBackendSpec *cachev1alpha1.CacheBackendSpec, + jobStatus *apiv1.JobStatus) error { cacheBackend := &cachev1alpha1.CacheBackend{} - cacheBackendName := cacheBackendSpec.CacheBackendName + cacheBackendName := cachectrl.GetCacheName(metaObject) cacheBackendNameSpace := metaObject.GetNamespace() err := jc.Client.Get(context.Background(), types.NamespacedName{ Namespace: cacheBackendNameSpace, @@ -209,74 +210,55 @@ func (jc *JobController) checkCache(metaObject metav1.Object, cacheBackendSpec * }, cacheBackend) if err == nil { - log.Infof("CacheBackend %s has been created", cacheBackendName) - if !slices.Contains(cacheBackend.Status.UsedBy, metaObject.GetName()) { - cacheCpy := cacheBackend.DeepCopy() - cacheCpy.Status.UsedBy = append(cacheBackend.Status.UsedBy, metaObject.GetName()) - err = jc.Client.Status().Update(context.Background(), cacheCpy) - if err != nil { - log.Errorf("Update UsedBy status failed") - return err - } - } + // Already been created + log.Infof("cache backend has been created") + return nil } else { if k8serrors.IsNotFound(err) { + log.Infof("cache backend is not exist, start to create %s", cacheBackendName) + // If haven't created yet - if cacheBackendSpec.Dataset == nil || cacheBackendSpec.CacheEngine == nil { - log.Errorf("Information on creating CacheBackend is missing. The creation failed") + cacheBackend = &cachev1alpha1.CacheBackend{} + cacheBackend.Name = cacheBackendName + cacheBackend.Namespace = cacheBackendNameSpace + cacheBackend.Spec = *cacheBackendSpec + controllerRef := jc.GenOwnerReference(metaObject) + cacheBackend.OwnerReferences = append(cacheBackend.OwnerReferences, *controllerRef) + err = jc.Client.Create(context.Background(), cacheBackend) + if err != nil { + log.Errorf("failed to create cache backend %s", cacheBackend.Name) return err } - log.Infof("CacheBackend is not exist, start to create %s", cacheBackendName) - err = jc.createCache(cacheBackendName, cacheBackendNameSpace, cacheBackendSpec) + + // Update job status + jobStatus.CacheBackendName = cacheBackendName + + // Update cache backend status + cacheCopy := cacheBackend.DeepCopy() + cacheCopy.Status.JobName = metaObject.GetName() + cacheCopy.Status.CacheStatus = cachev1alpha1.CacheCreating + err = jc.Client.Status().Update(context.Background(), cacheCopy) if err != nil { - log.Errorf("Failed to create CacheBackend %s", cacheBackendName) + log.Error(err, "failed to update job name", "cacheBackend", cacheBackend.Name) return err } - log.Infof("CacheBackend %s created", cacheBackendName) + } else { - log.Errorf("Failed to get CacheBackend %s", cacheBackendName) + log.Errorf("failed to get cache backend %s", cacheBackend.Name) return err } } - + log.Infof("cache backend %s created", cacheBackendName) return nil } -func (jc *JobController) createCache(name, namespace string, cacheBackendSpec *cachev1alpha1.CacheBackendSpec) error { - cacheBackend := &cachev1alpha1.CacheBackend{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - Spec: *cacheBackendSpec, - Status: cachev1alpha1.CacheBackendStatus{ - CacheStatus: cachev1alpha1.CacheCreating, - UsedBy: []string{name}, - }, - } - - err := jc.Client.Create(context.Background(), cacheBackend) - return err -} - func (jc JobController) addCachePathToContainer(metaObject metav1.Object, cacheBackend *cachev1alpha1.CacheBackendSpec, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error { - cacheObj := &cachev1alpha1.CacheBackend{} - err := jc.Client.Get(context.Background(), types.NamespacedName{ - Namespace: metaObject.GetNamespace(), - Name: cacheBackend.CacheBackendName, - }, cacheObj) - - if err != nil { - log.Errorf("Failed to get cacheBackend instance %s when inject cache to cantainers", cacheBackend.CacheBackendName) - } - // Check whether the PVC has been created pvc := &v1.PersistentVolumeClaim{} - //pvcName := cachectrl.GetCacheName(metaObject) - pvcName := cacheBackend.CacheBackendName - err = jc.Client.Get(context.Background(), types.NamespacedName{ + pvcName := cachectrl.GetCacheName(metaObject) + err := jc.Client.Get(context.Background(), types.NamespacedName{ Namespace: metaObject.GetNamespace(), Name: pvcName, }, pvc) @@ -301,20 +283,20 @@ func (jc JobController) addCachePathToContainer(metaObject metav1.Object, cacheB }) } } - jc.addCacheVolumeToPodSpec(pvcName, cacheBackend.MountPath, &spec.Template) + jc.addCacheVolumeToPodSpec(pvcName, cacheBackend, &spec.Template) } } else { if k8serrors.IsNotFound(err) { - log.Errorf("Cannot find pvc %s, waiting to be created", pvcName) + log.Errorf("cannot find pvc %s, waiting to be created", pvcName) } else { - log.Errorf("Failed to get pvc %s", pvcName) + log.Errorf("fail to get pvc %s", pvcName) } } return err } -func (jc JobController) addCacheVolumeToPodSpec(pvcName, mountPath string, pod *v1.PodTemplateSpec) { +func (jc JobController) addCacheVolumeToPodSpec(pvcName string, cacheBackend *cachev1alpha1.CacheBackendSpec, pod *v1.PodTemplateSpec) { pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{ Name: "cachevolume", @@ -327,45 +309,7 @@ func (jc JobController) addCacheVolumeToPodSpec(pvcName, mountPath string, pod * for i, c := range pod.Spec.Containers { pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, v1.VolumeMount{ - Name: "cachevolume", MountPath: mountPath, + Name: "cachevolume", MountPath: cacheBackend.MountPath, }) } } - -func (jc JobController) updateCacheUsedStatus(metaObject metav1.Object, cacheBackendSpec *cachev1alpha1.CacheBackendSpec) (err error) { - cacheBackend := &cachev1alpha1.CacheBackend{} - cacheBackendName := cacheBackendSpec.CacheBackendName - cacheBackendNameSpace := metaObject.GetNamespace() - err = jc.Client.Get(context.Background(), types.NamespacedName{ - Namespace: cacheBackendNameSpace, - Name: cacheBackendName, - }, cacheBackend) - - if err == nil { - //for i, jobName := range cacheBackend.Status.UsedBy { - // if jobName == metaObject.GetName() { - // cacheBackend.Status.UsedBy = append(cacheBackend.Status.UsedBy[:i], cacheBackend.Status.UsedBy[i+1:]...) - // break - // } - //} - - i := slices.Index(cacheBackend.Status.UsedBy, metaObject.GetName()) - if i != -1 { - cacheBackend.Status.UsedBy = append(cacheBackend.Status.UsedBy[:i], cacheBackend.Status.UsedBy[i+1:]...) - log.Infof("Update CacheBackend %s used status", cacheBackendName) - err = jc.Client.Status().Update(context.Background(), cacheBackend) - if err != nil { - log.Errorf("Update CacheBackend %s used status", cacheBackendName) - return err - } - } - - } else { - if k8serrors.IsNotFound(err) { - log.Errorf("Cannot find CacheBackend %s to update used status", cacheBackendName) - } else { - log.Errorf("Failed to get CacheBackend %s to update used status", cacheBackendName) - } - } - return -} diff --git a/pkg/job_controller/job_test.go b/pkg/job_controller/job_test.go index da3bcc25..01882047 100644 --- a/pkg/job_controller/job_test.go +++ b/pkg/job_controller/job_test.go @@ -18,10 +18,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" appv1 "github.com/alibaba/kubedl/apis/apps/v1alpha1" - cachev1alpha1 "github.com/alibaba/kubedl/apis/cache/v1alpha1" "github.com/alibaba/kubedl/apis/model/v1alpha1" "github.com/alibaba/kubedl/cmd/options" - testcachebackend "github.com/alibaba/kubedl/pkg/cache_backend/test" apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" "github.com/alibaba/kubedl/pkg/metrics" v1 "github.com/alibaba/kubedl/pkg/test_job/v1" @@ -442,171 +440,3 @@ func GetJobByName(mainJobController JobController, namespace string, name string Name: name, }, obj) } - -func TestCreateCache(T *testing.T) { - testCases := []struct { - testName string - cacheBackend *cachev1alpha1.CacheBackend - expectSuccess bool - }{ - { - testName: "fluid (should success)", - cacheBackend: testcachebackend.NewFluidCacheBackend("fluid", "default"), - expectSuccess: true, - }, - } - - for _, testCase := range testCases { - T.Run(testCase.testName, func(T *testing.T) { - - cacheBackend := testCase.cacheBackend - unifiedNamespace := cacheBackend.Namespace - unifiedName := cacheBackend.Name - - job := &v1.TestJob{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: unifiedNamespace, - Name: unifiedName, - }, - } - - scheme := runtime.NewScheme() - _ = appv1.AddToScheme(scheme) - _ = cachev1alpha1.AddToScheme(scheme) - _ = v1.AddToScheme(scheme) - _ = corev1.AddToScheme(scheme) - - fakeClient := fake.NewClientBuilder(). - WithScheme(scheme). - WithRuntimeObjects(job). - Build() - - testJobController := &v1.TestJobController{ - Job: job, - } - - mainJobController := JobController{ - Controller: testJobController, - Client: fakeClient, - } - - // Create CacheBackend - err := mainJobController.createCache(job.Name, job.Namespace, &cacheBackend.Spec) - assert.NoError(T, err) - - // Check - err = fakeClient.Get(context.Background(), types.NamespacedName{ - Namespace: unifiedNamespace, - Name: unifiedName, - }, cacheBackend) - - if assert.NoError(T, err) != testCase.expectSuccess { - T.Errorf("CacheBackend status is unexpected, expected: %v, got: %v", testCase.expectSuccess, assert.NoError(T, err)) - } - }) - } -} - -func newReplica() (replica *apiv1.ReplicaSpec) { - replica = &apiv1.ReplicaSpec{ - RestartPolicy: "Never", - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "fakeName", - Image: "kubedl/fakeImage:fakeImageTag", - }, - }, - }, - }, - } - return -} - -func TestAddCachePathToContainer(T *testing.T) { - testCases := []struct { - testName string - cacheBackend *cachev1alpha1.CacheBackend - expectSuccess bool - }{ - { - testName: "fluid (should success)", - cacheBackend: testcachebackend.NewFluidCacheBackend("fluid", "default"), - expectSuccess: true, - }, - } - - for _, testCase := range testCases { - T.Run(testCase.testName, func(T *testing.T) { - - cacheBackend := testCase.cacheBackend - unifiedNamespace := cacheBackend.Namespace - unifiedName := cacheBackend.Name - - job := &v1.TestJob{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: unifiedNamespace, - Name: unifiedName, - }, - } - - replicas := map[apiv1.ReplicaType]*apiv1.ReplicaSpec{ - "Worker": newReplica(), - } - - scheme := runtime.NewScheme() - _ = appv1.AddToScheme(scheme) - _ = cachev1alpha1.AddToScheme(scheme) - _ = v1.AddToScheme(scheme) - _ = corev1.AddToScheme(scheme) - - fakeClient := fake.NewClientBuilder(). - WithScheme(scheme). - WithRuntimeObjects(job). - Build() - - testJobController := &v1.TestJobController{ - Job: job, - } - - mainJobController := JobController{ - Controller: testJobController, - Client: fakeClient, - } - - // the pvc should be created by reconciler of cacheEngine(e.g. fluid), create it manually for test - pvc := &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: unifiedNamespace, - Name: unifiedName, - }, - } - err := fakeClient.Create(context.Background(), pvc) - - err = fakeClient.Get(context.Background(), types.NamespacedName{ - Namespace: unifiedNamespace, - Name: unifiedName, - }, pvc) - assert.NoError(T, err) - - err = mainJobController.addCachePathToContainer(job, &cacheBackend.Spec, replicas) - assert.NoError(T, err) - - expectedVolume := corev1.Volume{ - Name: "cachevolume", - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "fluid", - }, - }, - } - - result := assert.Contains(T, replicas["Worker"].Template.Spec.Volumes, expectedVolume) - - if result != testCase.expectSuccess { - T.Errorf("Volume status is unexpected, expected: %v, got: %v", testCase.expectSuccess, result) - } - }) - } -}