From ac41ef0354362c41dfd1586f8bfef98df0df9b4e Mon Sep 17 00:00:00 2001 From: jian-he Date: Wed, 16 Sep 2020 15:03:53 -0700 Subject: [PATCH] Add ut for tfjob --- controllers/tensorflow/pod.go | 2 +- .../tensorflow/tfjob_controller_test.go | 248 ++++++++++++++++++ pkg/util/k8sutil/k8sutil.go | 8 +- pkg/util/k8sutil/k8sutil.go.orig | 160 +++++++++++ pkg/util/status.go | 12 +- 5 files changed, 421 insertions(+), 9 deletions(-) create mode 100644 controllers/tensorflow/tfjob_controller_test.go create mode 100644 pkg/util/k8sutil/k8sutil.go.orig diff --git a/controllers/tensorflow/pod.go b/controllers/tensorflow/pod.go index 1ebde659..5c2a757b 100644 --- a/controllers/tensorflow/pod.go +++ b/controllers/tensorflow/pod.go @@ -52,7 +52,7 @@ func (r *TFJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, error) // If any adoptions are attempted, we should first recheck for deletion // with an uncached quorum read sometime after listing Pods (see #42639). canAdoptFunc := job_controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := r.GetJobFromAPIClient(job.GetNamespace(), job.GetName()) + fresh, err := r.ctrl.Controller.GetJobFromAPIClient(job.GetNamespace(), job.GetName()) if err != nil { return nil, err } diff --git a/controllers/tensorflow/tfjob_controller_test.go b/controllers/tensorflow/tfjob_controller_test.go new file mode 100644 index 00000000..d2d949dd --- /dev/null +++ b/controllers/tensorflow/tfjob_controller_test.go @@ -0,0 +1,248 @@ +package tensorflow + +import ( + "context" + "flag" + "strings" + "testing" + + "github.com/alibaba/kubedl/api" + tfv1 "github.com/alibaba/kubedl/api/tensorflow/v1" + "github.com/alibaba/kubedl/pkg/gang_schedule/registry" + "github.com/alibaba/kubedl/pkg/job_controller" + "github.com/alibaba/kubedl/pkg/job_controller/api/v1" + "github.com/alibaba/kubedl/pkg/metrics" + "github.com/alibaba/kubedl/pkg/util" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + k8scontroller "k8s.io/kubernetes/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +func init() { + // Enable klog which is used in dependencies + _ = flag.Set("logtostderr", "true") + _ = flag.Set("v", "10") +} + +type TFJobReconcilerTest struct { + TFJobReconciler +} + +func (r *TFJobReconcilerTest) GetJobFromAPIClient(namespace, name string) (metav1.Object, error) { + job := &tfv1.TFJob{} + err := r.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job) + return job, err +} + +func (r *TFJobReconcilerTest) satisfiedExpectations(tfJob *tfv1.TFJob) bool { + // during unit test, no watch events will happen, hence always return true to trigger reconcile + return true +} + +type FakeJobExpectations struct { + *k8scontroller.ControllerExpectations +} + +func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool { + // alwasys return true, so that, reconcile loop can always trigger sync, + return true +} + +// NewReconciler returns a new reconcile.Reconciler +func NewReconcilerTest(client client.Client, scheme *runtime.Scheme, + recorder record.EventRecorder, + config job_controller.JobControllerConfiguration) *TFJobReconcilerTest { + r := &TFJobReconcilerTest{ + TFJobReconciler{ + Client: client, + scheme: scheme, + }, + } + r.recorder = recorder + // Initialize pkg job controller with components we only need. + r.ctrl = job_controller.JobController{ + Controller: r, + Expectations: k8scontroller.NewControllerExpectations(), + Config: config, + BackoffStatesQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + Recorder: r.recorder, + Metrics: metrics.NewJobMetrics(tfv1.Kind, r.Client), + } + if r.ctrl.Config.EnableGangScheduling { + r.ctrl.GangScheduler = registry.Get(r.ctrl.Config.GangSchedulerName) + } + r.ctrl.Expectations = FakeJobExpectations{ControllerExpectations: k8scontroller.NewControllerExpectations()} + return r +} + +// Test Scenario: check the job is succeeded only if all workers are succeeded +// 1. Create a job with 2 replicas +// 2. Mark the 2 pods as running, and check the job is running +// 3. Mark worker0 as succeeded, the job should still be running, because the successPolicy is AllWorkers +// 4. Mark worker1 as succeeded, now assert the job should be succeeded +func TestAllWorkersSuccessPolicy(t *testing.T) { + scheme := runtime.NewScheme() + _ = api.AddToScheme(scheme) + _ = corev1.AddToScheme(scheme) + + // a job with 2 replicas + tfjob := createTFJob("job1", 2) + fakeClient := fake.NewFakeClientWithScheme(scheme, tfjob) + jobControllerConfig := job_controller.JobControllerConfiguration{} + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "broadcast-controller"}) + tfJobReconciler := NewReconcilerTest(fakeClient, scheme, recorder, jobControllerConfig) + + jobRequest := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "job1", + Namespace: "default", + }, + } + // reconcile the job, it should create 2 replicas + _, _ = tfJobReconciler.Reconcile(jobRequest) + + // mark two pods running + markPodStatus("job1-worker-0", corev1.PodRunning, tfJobReconciler) + markPodStatus("job1-worker-1", corev1.PodRunning, tfJobReconciler) + + // Reconcile again, the job should go into Running state + _, _ = tfJobReconciler.Reconcile(jobRequest) + _ = tfJobReconciler.Get(context.TODO(), jobRequest.NamespacedName, tfjob) + assert.True(t, util.HasCondition(tfjob.Status, v1.JobRunning)) + + // make job1-worker-0 succeed + markPodStatus("job1-worker-0", corev1.PodSucceeded, tfJobReconciler) + + // reconcile again + _, _ = tfJobReconciler.Reconcile(jobRequest) + // one worker succeeded, because of AllWorker SuccessPolicy, the job is still running + _ = tfJobReconciler.Get(context.TODO(), jobRequest.NamespacedName, tfjob) + assert.True(t, util.HasCondition(tfjob.Status, v1.JobRunning)) + + // mark job1-worker-0 succeed too + markPodStatus("job1-worker-1", corev1.PodSucceeded, tfJobReconciler) + + // reconcile again + _, _ = tfJobReconciler.Reconcile(jobRequest) + + // two workers succeeded, the jobs is succeeded + _ = tfJobReconciler.Get(context.TODO(), jobRequest.NamespacedName, tfjob) + assert.True(t, util.HasCondition(tfjob.Status, v1.JobSucceeded)) +} + +func markPodStatus(podName string, status corev1.PodPhase, tfJobReconciler *TFJobReconcilerTest) { + worker := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: podName, + Namespace: "default", + }, + } + pod := &corev1.Pod{} + _ = tfJobReconciler.Get(context.TODO(), worker.NamespacedName, pod) + + var containerState corev1.ContainerState + switch status { + case corev1.PodSucceeded: + containerState = corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 0, + }, + } + case corev1.PodRunning: + containerState = corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{ + StartedAt: metav1.Now(), + }, + } + } + + pod.Status.ContainerStatuses = []corev1.ContainerStatus{ + { + Name: "tensorflow", + State: containerState, + }, + } + pod.Status.Phase = status + if status == corev1.PodRunning { + now := metav1.Now() + pod.Status.StartTime = &now + } + _ = tfJobReconciler.Status().Update(context.Background(), pod) +} + +func createTFJob(jobName string, replicas int32) *tfv1.TFJob { + successPolicy := tfv1.SuccessPolicyAllWorkers + tfjob1 := &tfv1.TFJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: "default", + UID: "12345", + }, + + Spec: tfv1.TFJobSpec{ + TFReplicaSpecs: map[v1.ReplicaType]*v1.ReplicaSpec{ + "Worker": { + Replicas: &replicas, + RestartPolicy: "Never", + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "tensorflow", + Image: "kubedl/tf-mnist-with-summaries:1.0", + }, + }, + }, + }, + }, + }, + SuccessPolicy: &successPolicy, + }, + Status: v1.JobStatus{}, + } + return tfjob1 +} + +// not used, maybe using later.. +func createPodForJob(podName string, job *tfv1.TFJob) *corev1.Pod { + labelGroupName := v1.GroupNameLabel + labelJobName := v1.JobNameLabel + groupName := tfv1.GroupName + labels := map[string]string{ + labelGroupName: groupName, + labelJobName: strings.Replace(job.Name, "/", "-", -1), + } + labels[v1.ReplicaTypeLabel] = "Worker" + labels[v1.ReplicaIndexLabel] = "0" + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Labels: labels, + Namespace: "default", + DeletionTimestamp: nil, + }, + + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "tensorflow", + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 0, + }, + }, + }, + }, + Phase: corev1.PodSucceeded, + }, + } +} diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 7f07ab79..9981f01c 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -99,8 +99,12 @@ func FilterActivePods(pods []*v1.Pod) []*v1.Pod { if IsPodActive(p) { result = append(result, p) } else { - log.Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", - p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp) + deletionTimeStamp := "N/A" + if p.DeletionTimestamp != nil { + deletionTimeStamp = p.DeletionTimestamp.String() + } + log.Infof("Ignoring inactive pod %v/%v in state %v, deletion time %s", + p.Namespace, p.Name, p.Status.Phase, deletionTimeStamp) } } return result diff --git a/pkg/util/k8sutil/k8sutil.go.orig b/pkg/util/k8sutil/k8sutil.go.orig new file mode 100644 index 00000000..7f07ab79 --- /dev/null +++ b/pkg/util/k8sutil/k8sutil.go.orig @@ -0,0 +1,160 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8sutil + +import ( + "net" + "os" + + apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" + log "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // for gcp auth + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +// RecommendedConfigPathEnvVar is a environment variable for path configuration +const RecommendedConfigPathEnvVar = "KUBECONFIG" + +// MustNewKubeClient returns new kubernetes client for cluster configuration +func MustNewKubeClient() kubernetes.Interface { + cfg, err := GetClusterConfig() + if err != nil { + log.Fatal(err) + } + return kubernetes.NewForConfigOrDie(cfg) +} + +// GetClusterConfig obtain the config from the Kube configuration used by kubeconfig, or from k8s cluster. +func GetClusterConfig() (*rest.Config, error) { + if len(os.Getenv(RecommendedConfigPathEnvVar)) > 0 { + // use the current context in kubeconfig + // This is very useful for running locally. + return clientcmd.BuildConfigFromFlags("", os.Getenv(RecommendedConfigPathEnvVar)) + } + + // Work around https://github.com/kubernetes/kubernetes/issues/40973 + // See https://github.com/coreos/etcd-operator/issues/731#issuecomment-283804819 + if len(os.Getenv("KUBERNETES_SERVICE_HOST")) == 0 { + addrs, err := net.LookupHost("kubernetes.default.svc") + if err != nil { + panic(err) + } + if err := os.Setenv("KUBERNETES_SERVICE_HOST", addrs[0]); err != nil { + return nil, err + } + } + if len(os.Getenv("KUBERNETES_SERVICE_PORT")) == 0 { + if err := os.Setenv("KUBERNETES_SERVICE_PORT", "443"); err != nil { + panic(err) + } + } + return rest.InClusterConfig() +} + +// IsKubernetesResourceAlreadyExistError throws error when kubernetes resources already exist. +func IsKubernetesResourceAlreadyExistError(err error) bool { + return apierrors.IsAlreadyExists(err) +} + +// IsKubernetesResourceNotFoundError throws error when there is no kubernetes resource found. +func IsKubernetesResourceNotFoundError(err error) bool { + return apierrors.IsNotFound(err) +} + +// TODO(jlewi): CascadeDeletOptions are part of garbage collection policy. +// CascadeDeleteOptions deletes the workload after the grace period +// Do we want to use this? See +// https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/ +func CascadeDeleteOptions(gracePeriodSeconds int64) *metav1.DeleteOptions { + return &metav1.DeleteOptions{ + GracePeriodSeconds: func(t int64) *int64 { return &t }(gracePeriodSeconds), + PropagationPolicy: func() *metav1.DeletionPropagation { + foreground := metav1.DeletePropagationForeground + return &foreground + }(), + } +} + +// FilterActivePods returns pods that have not terminated. +func FilterActivePods(pods []*v1.Pod) []*v1.Pod { + var result []*v1.Pod + for _, p := range pods { + if IsPodActive(p) { + result = append(result, p) + } else { + log.Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", + p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp) + } + } + return result +} + +func IsPodActive(p *v1.Pod) bool { + return v1.PodSucceeded != p.Status.Phase && + v1.PodFailed != p.Status.Phase && + p.DeletionTimestamp == nil +} + +// filterPodCount returns pods based on their phase. +func FilterPodCount(pods []*v1.Pod, phase v1.PodPhase) int32 { + var result int32 + for i := range pods { + if phase == pods[i].Status.Phase { + result++ + } + } + return result +} + +func GetTotalReplicas(replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) int32 { + jobReplicas := int32(0) + for _, r := range replicas { + jobReplicas += *r.Replicas + } + return jobReplicas +} + +func GetTotalFailedReplicas(replicas map[apiv1.ReplicaType]*apiv1.ReplicaStatus) int32 { + totalFailedReplicas := int32(0) + for _, status := range replicas { + totalFailedReplicas += status.Failed + } + return totalFailedReplicas +} + +func GetTotalAvtiveReplicas(replicas map[apiv1.ReplicaType]*apiv1.ReplicaStatus) int32 { + totalActiveReplicas := int32(0) + for _, status := range replicas { + totalActiveReplicas += status.Active + } + return totalActiveReplicas +} + +func ResolveDependentOwner(metaObj metav1.Object) (id, name string) { + if controllerRef := metav1.GetControllerOf(metaObj); controllerRef != nil { + return string(controllerRef.UID), controllerRef.Name + } + return "", "" +} + +func GetReplicaType(pod *v1.Pod) (rtype string, ok bool) { + rtype, ok = pod.Labels[apiv1.ReplicaTypeLabel] + return +} diff --git a/pkg/util/status.go b/pkg/util/status.go index 7e1a3d36..3cfeacbd 100644 --- a/pkg/util/status.go +++ b/pkg/util/status.go @@ -24,27 +24,27 @@ const ( // IsSucceeded checks if the job is succeeded. func IsSucceeded(status apiv1.JobStatus) bool { - return hasCondition(status, apiv1.JobSucceeded) + return HasCondition(status, apiv1.JobSucceeded) } // IsFailed checks if the job is failed. func IsFailed(status apiv1.JobStatus) bool { - return hasCondition(status, apiv1.JobFailed) + return HasCondition(status, apiv1.JobFailed) } // IsRunning checks if the job is running. func IsRunning(status apiv1.JobStatus) bool { - return hasCondition(status, apiv1.JobRunning) + return HasCondition(status, apiv1.JobRunning) } // IsCreated checks if the job has created. func IsCreated(status apiv1.JobStatus) bool { - return hasCondition(status, apiv1.JobCreated) + return HasCondition(status, apiv1.JobCreated) } // IsRestart checks if the job is restarting. func IsRestarting(status apiv1.JobStatus) bool { - return hasCondition(status, apiv1.JobRestarting) + return HasCondition(status, apiv1.JobRestarting) } // UpdateJobConditions adds to the jobStatus a new condition if needed, with the conditionType, reason, and message. @@ -64,7 +64,7 @@ func GetCondition(status apiv1.JobStatus, condType apiv1.JobConditionType) *apiv return nil } -func hasCondition(status apiv1.JobStatus, condType apiv1.JobConditionType) bool { +func HasCondition(status apiv1.JobStatus, condType apiv1.JobConditionType) bool { for _, condition := range status.Conditions { if condition.Type == condType && condition.Status == v1.ConditionTrue { return true