diff --git a/apis/training/v1alpha1/pytorchjob_types.go b/apis/training/v1alpha1/pytorchjob_types.go index d6d064d7..c0bececc 100644 --- a/apis/training/v1alpha1/pytorchjob_types.go +++ b/apis/training/v1alpha1/pytorchjob_types.go @@ -49,7 +49,7 @@ type PyTorchJobSpec struct { // EnableElastic decides whether torch elastic is enabled for job. // +optional - EnableElastic bool `json:"enableElastic"` + EnableElastic bool `json:"enableElastic,omitempty"` // ElasticPolicy is used to configure the torch elastic-based elastic scaling support for distributed training job. // +optional @@ -64,7 +64,7 @@ type ElasticPolicy struct { // upper limit for the number of pods that can be set by the autoscaler; cannot be smaller than MinReplicas, defaults to null. MaxReplicas *int32 `json:"maxReplicas,omitempty"` - RDZVBackend string `json:"rdzvBackend,omitempty"` + RDZVBackend string `json:"rdzvBackend"` RdzvEndpoint string `json:"rdzvEndpoint"` // Number of workers per node; supported values: [auto, cpu, gpu, int]. diff --git a/config/crd/bases/training.kubedl.io_elasticdljobs.yaml b/config/crd/bases/training.kubedl.io_elasticdljobs.yaml index e10314e2..4ba30e4e 100644 --- a/config/crd/bases/training.kubedl.io_elasticdljobs.yaml +++ b/config/crd/bases/training.kubedl.io_elasticdljobs.yaml @@ -3119,6 +3119,8 @@ spec: currentReplicas: format: int32 type: integer + elasticCondition: + type: string lastReplicas: format: int32 type: integer diff --git a/config/crd/bases/training.kubedl.io_marsjobs.yaml b/config/crd/bases/training.kubedl.io_marsjobs.yaml index 35b38e02..79fadc23 100644 --- a/config/crd/bases/training.kubedl.io_marsjobs.yaml +++ b/config/crd/bases/training.kubedl.io_marsjobs.yaml @@ -3141,6 +3141,8 @@ spec: currentReplicas: format: int32 type: integer + elasticCondition: + type: string lastReplicas: format: int32 type: integer diff --git a/config/crd/bases/training.kubedl.io_mpijobs.yaml b/config/crd/bases/training.kubedl.io_mpijobs.yaml index c49737f6..e283f7dc 100644 --- a/config/crd/bases/training.kubedl.io_mpijobs.yaml +++ b/config/crd/bases/training.kubedl.io_mpijobs.yaml @@ -6164,6 +6164,8 @@ spec: currentReplicas: format: int32 type: integer + elasticCondition: + type: string lastReplicas: format: int32 type: integer diff --git a/config/crd/bases/training.kubedl.io_pytorchjobs.yaml b/config/crd/bases/training.kubedl.io_pytorchjobs.yaml index 98e582e5..3238995d 100644 --- a/config/crd/bases/training.kubedl.io_pytorchjobs.yaml +++ b/config/crd/bases/training.kubedl.io_pytorchjobs.yaml @@ -128,6 +128,7 @@ spec: rdzvEndpoint: type: string required: + - rdzvBackend - rdzvEndpoint type: object enableElastic: @@ -3226,6 +3227,8 @@ spec: currentReplicas: format: int32 type: integer + elasticCondition: + type: string lastReplicas: format: int32 type: integer diff --git a/config/crd/bases/training.kubedl.io_tfjobs.yaml b/config/crd/bases/training.kubedl.io_tfjobs.yaml index 65272230..5bd61575 100644 --- a/config/crd/bases/training.kubedl.io_tfjobs.yaml +++ b/config/crd/bases/training.kubedl.io_tfjobs.yaml @@ -3208,6 +3208,8 @@ spec: currentReplicas: format: int32 type: integer + elasticCondition: + type: string lastReplicas: format: int32 type: integer diff --git a/config/crd/bases/training.kubedl.io_xdljobs.yaml b/config/crd/bases/training.kubedl.io_xdljobs.yaml index d4edc6ca..a9bb6cd0 100644 --- a/config/crd/bases/training.kubedl.io_xdljobs.yaml +++ b/config/crd/bases/training.kubedl.io_xdljobs.yaml @@ -3125,6 +3125,8 @@ spec: currentReplicas: format: int32 type: integer + elasticCondition: + type: string lastReplicas: format: int32 type: integer diff --git a/config/crd/bases/training.kubedl.io_xgboostjobs.yaml b/config/crd/bases/training.kubedl.io_xgboostjobs.yaml index 357757d7..c74e55e5 100644 --- a/config/crd/bases/training.kubedl.io_xgboostjobs.yaml +++ b/config/crd/bases/training.kubedl.io_xgboostjobs.yaml @@ -3119,6 +3119,8 @@ spec: currentReplicas: format: int32 type: integer + elasticCondition: + type: string lastReplicas: format: int32 type: integer diff --git a/controllers/pytorch/elastic_scale.go b/controllers/pytorch/elastic_scale.go index 06364e23..bce51efc 100644 --- a/controllers/pytorch/elastic_scale.go +++ b/controllers/pytorch/elastic_scale.go @@ -35,6 +35,7 @@ const ( AnnotationCheckpointRequestedVersion = v1.KubeDLPrefix + "/ckpt-requested-version" AnnotationCheckpointCompletedVersion = v1.KubeDLPrefix + "/ckpt-completed-version" AnnotationReadyToStartWorker = v1.KubeDLPrefix + "/ready-to-start-worker" + AnnotationReadyToRestartWorker = v1.KubeDLPrefix + "/ready-to-restart-worker" AnnotationImmediatelyStartWorker = v1.KubeDLPrefix + "/immediately-start-worker" AnnotationWorldSize = v1.KubeDLPrefix + "/world-size" ) diff --git a/controllers/pytorch/pytorchjob_controller.go b/controllers/pytorch/pytorchjob_controller.go index 50d6908c..ac51a030 100644 --- a/controllers/pytorch/pytorchjob_controller.go +++ b/controllers/pytorch/pytorchjob_controller.go @@ -319,7 +319,10 @@ func (r *PytorchJobReconciler) SetClusterSpec(ctx context.Context, job interface Name: "PYTHONUNBUFFERED", Value: "0", }) - podTemplate.Spec.Containers[i].Args = append(launchElasticArgs, podTemplate.Spec.Containers[i].Args...) + + if pytorchJob.Spec.EnableElastic && pytorchJob.Spec.ElasticPolicy != nil { + podTemplate.Spec.Containers[i].Args = append(launchElasticArgs, podTemplate.Spec.Containers[i].Args...) + } if enableElasticScaling && rtype != "aimaster" { // Job enables elastic scaling select value of AnnotationWorldSize as its diff --git a/controllers/torchelastic/elastic.go b/controllers/torchelastic/elastic.go new file mode 100644 index 00000000..d1c7d90f --- /dev/null +++ b/controllers/torchelastic/elastic.go @@ -0,0 +1,223 @@ +/* +Copyright 2022 The Alibaba Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package torchelastic + +import ( + "context" + training "github.com/alibaba/kubedl/apis/training/v1alpha1" + apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" + logger "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "reflect" +) + +func (ts *TorchElasticController) start(ctx context.Context, cancel context.CancelFunc, name, namespace string) { + sharedPytorchJob := &training.PyTorchJob{} + jobName := name + jobNamespace := namespace + + // Create metrics for each torch elastic job. + ts.locker.Lock() + if _, ok := ts.metrics[jobName]; !ok { + ts.metrics[jobName] = make(map[int32][]MetricObservation) + } + ts.locker.Unlock() + + err := ts.Client.Get(ctx, types.NamespacedName{Namespace: jobNamespace, Name: jobName}, sharedPytorchJob) + if err != nil { + logger.Infof("try to get job %s from namespace %s but it has been deleted", jobName, jobNamespace) + // cancel the elastic scaling process context of the deleted job. + defer cancel() + return + } + + pytorchJob := sharedPytorchJob.DeepCopy() + if pytorchJob.Spec.ElasticPolicy.MaxReplicas == nil || pytorchJob.Spec.ElasticPolicy.MinReplicas == nil { + logger.Infof("pytorch job %s does not configure the max or min replicas", pytorchJob.Name) + defer cancel() + delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + return + } + + if pytorchJob.Status.ElasticStatus == nil { + initializeElasticStatuses(pytorchJob, training.PyTorchReplicaTypeWorker) + if err := ts.UpdateJobStatusInApiServer(pytorchJob, &pytorchJob.Status); err != nil { + if errors.IsConflict(err) { + // retry later when update operation violates with etcd concurrency control. + log.Info("fail to update pytorch job") + } + } + return + } + + jobStatus := pytorchJob.Status.DeepCopy() + oldStatus := jobStatus.DeepCopy() + if pytorchJob.Status.CompletionTime != nil || pytorchJob.DeletionTimestamp != nil { + logger.Infof("job %s has been completed or deleted and does not need to do elastic scaling", pytorchJob.Name) + defer cancel() + delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + delete(ts.metrics, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + return + } + + currentReplicas := *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas + + // Wait for all pods running and judge whether there exists pending or failed pods. + hasPendingPod, hasFailedPod := ts.waitForAllPodsRunning(pytorchJob) + + // If job has pending pods and current replicas are more than min replicas, return to the last replicas. + if hasPendingPod && currentReplicas > *pytorchJob.Spec.ElasticPolicy.MinReplicas { + lastReplicas := jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas + *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = lastReplicas + // Return to the last replicas. + if err := ts.Client.Update(ctx, pytorchJob); err != nil { + log.Info("fail to update replicas of pytorch job") + } + + updateElasticStatusForPendingJob(pytorchJob, lastReplicas, training.PyTorchReplicaTypeWorker) + if err := ts.UpdateJobStatusInApiServer(pytorchJob, &pytorchJob.Status); err != nil { + if errors.IsConflict(err) { + // retry later when update operation violates with etcd concurrency control. + log.Info("fail to update pytorch job") + } + } + return + + // If job has pending pods and current replicas equals to the min replicas, cancel the elastic scaling process context. + } else if (hasPendingPod && currentReplicas == *pytorchJob.Spec.ElasticPolicy.MinReplicas) || hasFailedPod { + defer cancel() + logger.Info("pods did not reach the running state at min replicas or job is failed, so the elastic scaling controller shutdown") + delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + return + } + + if !hasPendingPod && jobStatus.ElasticStatus != nil && jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue == false { + // If job metrics have reached the max, restart stale pods. + if jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].ElasticCondition == apiv1.ElasticMaxMetric { + pods, err := ts.GetPodsForJob(pytorchJob) + if err != nil { + logger.Warnf("Get Pods For Job error %v", err) + } + // Restart stale torch elastic pods. + complete := ts.restartStalePytorchPods(pods, pytorchJob) + if !complete { + logger.Info("restart pods does not complete") + return + } + logger.Info("restart pods has completed") + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].ElasticCondition = apiv1.ElasticStop + if err = ts.UpdateJobStatusInApiServer(pytorchJob, jobStatus); err != nil { + if errors.IsConflict(err) { + // retry later when update operation violates with etcd concurrency control. + logger.Info("fail to update pytorch job status") + return + } + } + return + // If current replicas reach the defined max replicas or elastic condition is stopped, return directly. + } else if jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].ElasticCondition == apiv1.ElasticStop || jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].ElasticCondition == apiv1.ElasticMaxReplica { + log.Info("Pytorch job does not need to be scaled") + return + } + } + + // Read training logs from pytorch pods and save the observation. + observation, err := read(ts.client, jobNamespace, GetDefaultWorkerName(jobName)) + if err != nil { + logger.Infof("fail to read training logs: %v", err) + return + } + + ts.locker.Lock() + defer ts.locker.Unlock() + + // Create metrics for current replicas. + if _, ok := ts.metrics[jobName][currentReplicas]; !ok { + ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) + } + ts.metrics[jobName][currentReplicas] = append(ts.metrics[jobName][currentReplicas], observation) + currentLength := len(ts.metrics[jobName][currentReplicas]) + logger.Infof("Current metric length: %d", currentLength) + + // If current metrics have reached the metric count, judge the next scaling replicas. + if currentLength >= ts.metricCount { + if currentReplicas > *pytorchJob.Spec.ElasticPolicy.MinReplicas && currentReplicas <= *pytorchJob.Spec.ElasticPolicy.MaxReplicas { + lastReplicas := jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas + + if ts.IsSatisfyElasticContinue(jobName, currentReplicas, lastReplicas) { + if currentReplicas == *pytorchJob.Spec.ElasticPolicy.MaxReplicas { + updateElasticStatusForMaxReplicaJob(pytorchJob, training.PyTorchReplicaTypeWorker) + ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) + } else { + newReplicas := computeNewReplicas(currentReplicas) + *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = newReplicas + if err := ts.Client.Update(ctx, pytorchJob); err != nil { + log.Info("fail to update pytorch job") + } + + updateElasticStatusForContinueJob(pytorchJob, currentReplicas, newReplicas, training.PyTorchReplicaTypeWorker) + if _, ok := ts.metrics[jobName][newReplicas]; !ok { + ts.metrics[jobName][newReplicas] = make([]MetricObservation, 0) + } + } + + } else { + *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = lastReplicas + if err := ts.Client.Update(ctx, pytorchJob); err != nil { + log.Info("fail to update pytorch job") + } + + updateElasticStatusForMaxMetricJob(pytorchJob, currentReplicas, lastReplicas, training.PyTorchReplicaTypeWorker) + ts.metrics[jobName][lastReplicas] = make([]MetricObservation, 0) + ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) + } + + } else if currentReplicas == *pytorchJob.Spec.ElasticPolicy.MinReplicas && currentReplicas < *pytorchJob.Spec.ElasticPolicy.MaxReplicas { + newReplicas := computeNewReplicas(currentReplicas) + *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = newReplicas + if err := ts.Client.Update(ctx, pytorchJob); err != nil { + log.Info("fail to update pytorch job") + } + + updateElasticStatusForContinueJob(pytorchJob, currentReplicas, newReplicas, training.PyTorchReplicaTypeWorker) + if _, ok := ts.metrics[jobName][newReplicas]; !ok { + ts.metrics[jobName][newReplicas] = make([]MetricObservation, 0) + } + + } else if currentReplicas == *pytorchJob.Spec.ElasticPolicy.MaxReplicas { + updateElasticStatusForMaxReplicaJob(pytorchJob, training.PyTorchReplicaTypeWorker) + if _, ok := ts.metrics[jobName][currentReplicas]; ok { + ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) + } + + } + } + + // No need to update the job status if the status hasn't changed since last time. + if !reflect.DeepEqual(*oldStatus, pytorchJob.Status) { + if err = ts.UpdateJobStatusInApiServer(pytorchJob, &pytorchJob.Status); err != nil { + if errors.IsConflict(err) { + // retry later when update operation violates with etcd concurrency control. + logger.Info("fail to update pytorch job status") + return + } + } + } + + return +} diff --git a/controllers/torchelastic/elastic_controller.go b/controllers/torchelastic/elastic_controller.go index b71cbca6..c65204b5 100644 --- a/controllers/torchelastic/elastic_controller.go +++ b/controllers/torchelastic/elastic_controller.go @@ -1,19 +1,30 @@ +/* +Copyright 2022 The Alibaba Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package torchelastic import ( - "github.com/alibaba/kubedl/controllers/torchelastic/job" ctrl "sigs.k8s.io/controller-runtime" ) -const ( - controllerName = "ElasticScalingController" -) - func SetupWithManager(mgr ctrl.Manager) error { // New torch elastic controller. // period represents the time elastic scaling loop repeats. - // count represents the length of training metrics collection for each scale replicas. - torchElasticController := job.NewTorchElasticController(mgr, 30, 5) + // count represents the length of training metrics collection for each replica. + torchElasticController := NewTorchElasticController(mgr, 30, 5) if err := torchElasticController.SetupWithManager(mgr); err != nil { return err diff --git a/controllers/torchelastic/job.go b/controllers/torchelastic/job.go new file mode 100644 index 00000000..3817a7a9 --- /dev/null +++ b/controllers/torchelastic/job.go @@ -0,0 +1,126 @@ +/* +Copyright 2022 The Alibaba Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package torchelastic + +import ( + "context" + "fmt" + training "github.com/alibaba/kubedl/apis/training/v1alpha1" + apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" + commonutil "github.com/alibaba/kubedl/pkg/util" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func makeElasticJobName(name, namespace string) string { + return name + "-" + namespace +} + +// UpdateJobStatusInApiServer updates the job status in API server +func (ts *TorchElasticController) UpdateJobStatusInApiServer(job interface{}, jobStatus *apiv1.JobStatus) error { + torchElasticJob, ok := job.(*training.PyTorchJob) + if !ok { + return fmt.Errorf("%+v is not a type of PytorchJob", torchElasticJob) + } + var jobCpy *training.PyTorchJob + // Job status passed in differs with status in job, update in basis of the passed in one. + jobCpy = torchElasticJob.DeepCopy() + jobCpy.Status = *jobStatus.DeepCopy() + return ts.Status().Update(context.Background(), jobCpy) +} + +// initializeReplicaStatuses initializes the ElasticStatuses for replica. +func initializeElasticStatuses(pytorchJob *training.PyTorchJob, rtype apiv1.ReplicaType) { + jobStatus := &pytorchJob.Status + if jobStatus.ElasticStatus == nil { + jobStatus.ElasticStatus = make(map[apiv1.ReplicaType]*apiv1.ElasticScalingStatus) + } + + jobStatus.ElasticStatus[rtype] = &apiv1.ElasticScalingStatus{ElasticCondition: apiv1.ElasticStart} + jobStatus.ElasticStatus[rtype].CurrentReplicas = *pytorchJob.Spec.PyTorchReplicaSpecs[rtype].Replicas + jobStatus.ElasticStatus[rtype].Continue = true + now := metav1.Now() + jobStatus.ElasticStatus[rtype].LastUpdateTime = &now +} + +func updateElasticStatusForPendingJob(pytorchJob *training.PyTorchJob, lastReplicas int32, rtype apiv1.ReplicaType) { + jobStatus := &pytorchJob.Status + jobStatus.ElasticStatus[rtype].Continue = false + jobStatus.ElasticStatus[rtype].LastReplicas = jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas + jobStatus.ElasticStatus[rtype].CurrentReplicas = lastReplicas + jobStatus.ElasticStatus[rtype].Message = "There exists pending pods, return to the last replicas" + now := metav1.Now() + jobStatus.ElasticStatus[rtype].LastUpdateTime = &now + jobStatus.ElasticStatus[rtype].ElasticCondition = apiv1.ElasticStop +} + +func updateElasticStatusForContinueJob(pytorchJob *training.PyTorchJob, currentReplicas, newReplicas int32, rtype apiv1.ReplicaType) { + jobStatus := &pytorchJob.Status + jobStatus.ElasticStatus[rtype].LastReplicas = currentReplicas + jobStatus.ElasticStatus[rtype].CurrentReplicas = newReplicas + jobStatus.ElasticStatus[rtype].Message = "Pytorch job continues to be scaled" + now := metav1.Now() + jobStatus.ElasticStatus[rtype].LastUpdateTime = &now + jobStatus.ElasticStatus[rtype].Continue = true + jobStatus.ElasticStatus[rtype].ElasticCondition = apiv1.ElasticContinue +} + +func updateElasticStatusForMaxReplicaJob(pytorchJob *training.PyTorchJob, rtype apiv1.ReplicaType) { + jobStatus := &pytorchJob.Status + jobStatus.ElasticStatus[rtype].Message = "Pytorch job has reached the max replicas" + jobStatus.ElasticStatus[rtype].Continue = false + jobStatus.ElasticStatus[rtype].ElasticCondition = apiv1.ElasticMaxReplica +} + +func updateElasticStatusForMaxMetricJob(pytorchJob *training.PyTorchJob, currentReplicas, lastReplicas int32, rtype apiv1.ReplicaType) { + jobStatus := &pytorchJob.Status + jobStatus.ElasticStatus[rtype].CurrentReplicas = lastReplicas + jobStatus.ElasticStatus[rtype].LastReplicas = currentReplicas + jobStatus.ElasticStatus[rtype].Message = "Pytorch job has reached the max metrics" + now := metav1.Now() + jobStatus.ElasticStatus[rtype].LastUpdateTime = &now + jobStatus.ElasticStatus[rtype].Continue = false + jobStatus.ElasticStatus[rtype].ElasticCondition = apiv1.ElasticMaxMetric +} + +func (ts *TorchElasticController) IsSatisfyElasticContinue(jobName string, currentReplicas, lastReplicas int32) bool { + currentLength := ts.metricCount + currentLatency := ts.metrics[jobName][currentReplicas][currentLength-1].Latency + lastReplicaLatency := ts.metrics[jobName][lastReplicas][currentLength-1].Latency + //Decide whether the elastic scaling can continue by the ratio of batch training latency and replicas. + return lastReplicaLatency/float64(lastReplicas) > currentLatency/float64(currentReplicas) +} + +func computeNewReplicas(currentReplicas int32) int32 { + // Double the replicas in the next elastic scaling loop. + return currentReplicas * 2 +} + +func (ts *TorchElasticController) GetPodsForJob(job *training.PyTorchJob) ([]*v1.Pod, error) { + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: ts.GenLabels(job.Name), + }) + // List all pods to include those that don't match the selector anymore + // but have a ControllerRef pointing to this controller. + podList := &v1.PodList{} + err = ts.Client.List(context.Background(), podList, client.MatchingLabelsSelector{Selector: selector}) + if err != nil { + return nil, err + } + return commonutil.ToPodPointerList(podList.Items), nil +} diff --git a/controllers/torchelastic/job/job_elastic_controller.go b/controllers/torchelastic/job/job_elastic_controller.go deleted file mode 100644 index b25b94e8..00000000 --- a/controllers/torchelastic/job/job_elastic_controller.go +++ /dev/null @@ -1,21 +0,0 @@ -package job - -import ( - "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - logf "sigs.k8s.io/controller-runtime/pkg/log" -) - -// ElasticController implementations -type ElasticController interface { - SetupWithManager(mgr ctrl.Manager) error -} - -var _ ElasticController = &TorchElasticController{} - -type newJobElasticController func(mgr ctrl.Manager, period, count int) ElasticController - -var ( - log = logf.Log.WithName("job-elastic-controller") - jobElasticCtrlMap = make(map[runtime.Object]newJobElasticController) -) diff --git a/controllers/torchelastic/job/torchelastic_controller.go b/controllers/torchelastic/job/torchelastic_controller.go deleted file mode 100644 index f00e99ac..00000000 --- a/controllers/torchelastic/job/torchelastic_controller.go +++ /dev/null @@ -1,469 +0,0 @@ -package job - -import ( - "context" - "fmt" - training "github.com/alibaba/kubedl/apis/training/v1alpha1" - apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" - controllerv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" - commonutil "github.com/alibaba/kubedl/pkg/util" - "github.com/alibaba/kubedl/pkg/util/concurrent" - logger "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "reflect" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/source" - "strings" - "sync" - "time" -) - -const ( - controllerName = "TorchElasticController" - interval = 5 * time.Second - podReadyTimeout = 1 * time.Minute - logTimeout = 1 * time.Minute -) - -type name string -type namespace string - -func init() { - jobElasticCtrlMap[&training.PyTorchJob{}] = NewTorchElasticController -} - -func NewTorchElasticController(mgr ctrl.Manager, period, count int) ElasticController { - metrics := make(map[string]map[int32][]MetricObservation) - torchJobs := make(map[string]TorchElasticJob) - return &TorchElasticController{ - period: period, - metricCount: count, - client: kubernetes.NewForConfigOrDie(mgr.GetConfig()), - metrics: metrics, - torchElasticJobs: torchJobs, - Client: mgr.GetClient(), - } -} - -type TorchElasticController struct { - period int - metricCount int - client *kubernetes.Clientset - client.Client - metrics map[string]map[int32][]MetricObservation - locker sync.Mutex - torchElasticJobs map[string]TorchElasticJob -} - -type TorchElasticJob struct { - Name string - Namespace string - ctx context.Context - cancelFunc context.CancelFunc -} - -type MetricObservation struct { - Epoch int32 `json:"epoch,omitempty"` - Batch int32 `json:"batch,omitempty"` - Accuracy float64 `json:"accuracy,omitempty"` - Latency float64 `json:"latency,omitempty"` -} - -func (ts *TorchElasticController) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) { - pytorchJob := training.PyTorchJob{} - err := ts.Client.Get(context.Background(), types.NamespacedName{ - Namespace: req.Namespace, - Name: req.Name, - }, &pytorchJob) - - if err != nil { - if errors.IsNotFound(err) { - log.Info("try to fetch pytorch job but it has been deleted.", "key", req.String()) - return ctrl.Result{}, nil - } - return ctrl.Result{}, err - } - return ctrl.Result{}, nil -} - -func (ts *TorchElasticController) SetupWithManager(mgr ctrl.Manager) error { - c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: ts}) - if err != nil { - return err - } - // Watch events with pod events-handler. - if err = c.Watch(&source.Kind{Type: &training.PyTorchJob{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{ - CreateFunc: onOwnerCreateFunc(ts), - DeleteFunc: onOwnerDeleteFunc(ts), - }); err != nil { - return err - } - - ctx := context.Background() - go wait.UntilWithContext(ctx, ts.startElasticForAllJobs, time.Duration(ts.period)*(time.Second)) - log.Info("Start Elastic Scaling Controller Loop") - - ctx.Done() - log.Info("Shutting down Elastic Scaling Controller Loop") - return nil -} - -func onOwnerCreateFunc(ts *TorchElasticController) func(e event.CreateEvent) bool { - return func(e event.CreateEvent) bool { - pytorchJob, ok := e.Object.(*training.PyTorchJob) - if !ok { - return true - } - if !pytorchJob.Spec.EnableElastic && pytorchJob.Spec.ElasticPolicy == nil { - return true - } - ctx, cancel := context.WithCancel(context.Background()) - ctx = context.WithValue(ctx, name("job"), pytorchJob.Name) - ctx = context.WithValue(ctx, namespace("namespace"), pytorchJob.Namespace) - logger.Info("Create torch elastic job: ", pytorchJob.Name, " in namespace: ", pytorchJob.Namespace) - ts.torchElasticJobs[makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)] = TorchElasticJob{ - Name: pytorchJob.Name, - Namespace: pytorchJob.Namespace, - ctx: ctx, - cancelFunc: cancel, - } - return true - } -} - -func onOwnerDeleteFunc(ts *TorchElasticController) func(e event.DeleteEvent) bool { - return func(e event.DeleteEvent) bool { - pytorchJob, ok := e.Object.(*training.PyTorchJob) - if !ok { - return true - } - if !pytorchJob.Spec.EnableElastic && pytorchJob.Spec.ElasticPolicy == nil { - return true - } - - logger.Infof("Deleting elastic scaling for pytorch job %s from namespace %s", pytorchJob.Name, pytorchJob.Namespace) - // Delete job infos saved in Torch Elastic controller. - if _, ok := ts.torchElasticJobs[makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)]; ok { - cancel := ts.torchElasticJobs[makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)].cancelFunc - defer cancel() - delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) - } - - return true - } -} - -// Start elastic scaling loop for all torch elastic jobs. -func (ts *TorchElasticController) startElasticForAllJobs(ctx context.Context) { - tickets := 100 // max semaphore tickets limited. - if len(ts.torchElasticJobs) < 100 { - tickets = len(ts.torchElasticJobs) - } - sema := concurrent.NewSemaphore(tickets) - for _, torchJob := range ts.torchElasticJobs { - sema.Acquire() - - go func(job TorchElasticJob) { - defer sema.Release() - //Start elastic scaling for each torch elastic job. - ts.start(job.ctx, job.cancelFunc, job.Name, job.Namespace) - }(torchJob) - } - // block until all semaphore is released. - sema.Wait() -} - -func (ts *TorchElasticController) start(ctx context.Context, cancel context.CancelFunc, name, namespace string) { - sharedPytorchJob := &training.PyTorchJob{} - jobName := name - jobNamespace := namespace - - // Create metrics for each torch elastic job. - ts.locker.Lock() - if _, ok := ts.metrics[jobName]; !ok { - ts.metrics[jobName] = make(map[int32][]MetricObservation) - } - ts.locker.Unlock() - - err := ts.Client.Get(ctx, types.NamespacedName{Namespace: jobNamespace, Name: jobName}, sharedPytorchJob) - if err != nil { - logger.Infof("try to get job %s from namespace %s but it has been deleted", jobName, jobNamespace) - // cancel the elastic scaling process context of the deleted job. - defer cancel() - return - } - - pytorchJob := sharedPytorchJob.DeepCopy() - if pytorchJob.Spec.ElasticPolicy.MaxReplicas == nil || pytorchJob.Spec.ElasticPolicy.MinReplicas == nil { - logger.Infof("pytorch job %s does not configure the max or min replicas", pytorchJob.Name) - defer cancel() - delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) - return - } - - if pytorchJob.Status.ElasticStatus == nil { - initializeElasticStatuses(&pytorchJob.Status, training.PyTorchReplicaTypeWorker) - pytorchJob.Status.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas - pytorchJob.Status.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = true - now := metav1.Now() - pytorchJob.Status.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now - if err := ts.UpdateJobStatusInApiServer(pytorchJob, &pytorchJob.Status); err != nil { - if errors.IsConflict(err) { - // retry later when update operation violates with etcd concurrency control. - log.Info("fail to update pytorch job") - } - } - return - } - - jobStatus := pytorchJob.Status.DeepCopy() - oldStatus := jobStatus.DeepCopy() - if pytorchJob.Status.CompletionTime != nil || pytorchJob.DeletionTimestamp != nil { - logger.Infof("job %s has been completed or deleted and does not need to do elastic scaling", pytorchJob.Name) - defer cancel() - delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) - return - } - - currentReplicas := *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas - // Get all pods for the pytorch job. - pods, err := ts.GetPodsForJob(pytorchJob) - if err != nil { - logger.Warnf("Get Pods For Job error %v", err) - } - hasPendingPod := false - hasFailedPod := false - - // Wait for all pods running with timeout seconds. - waitErr := wait.PollImmediate(interval, podReadyTimeout, func() (bool, error) { - for _, pod := range pods { - if isRunning := podRunning(pod); !isRunning { - return false, nil - } - } - return true, nil - }) - if waitErr != nil { - logger.Info("pods did not reach the running state") - } - - for _, pod := range pods { - if pod.Status.Phase == v1.PodPending { - hasPendingPod = true - break - } - } - for _, pod := range pods { - if pod.Status.Phase == v1.PodFailed { - hasFailedPod = true - break - } - } - - // If job has pending pods and current replicas are more than min replicas, return to the last replicas. - if hasPendingPod && currentReplicas > *pytorchJob.Spec.ElasticPolicy.MinReplicas { - lastReplicas := jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas - *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = lastReplicas - // Return to the last replicas. - if err := ts.Client.Update(ctx, pytorchJob); err != nil { - log.Info("fail to update replicas of pytorch job") - } - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = false - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas = jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = lastReplicas - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "there exists pending pods, return to the last replicas" - now := metav1.Now() - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now - if err := ts.UpdateJobStatusInApiServer(pytorchJob, jobStatus); err != nil { - if errors.IsConflict(err) { - // retry later when update operation violates with etcd concurrency control. - log.Info("fail to update pytorch job") - } - } - return - - // If job has pending pods and current replicas equals to the min replicas, cancel the elastic scaling process context. - } else if (hasPendingPod && currentReplicas == *pytorchJob.Spec.ElasticPolicy.MinReplicas) || hasFailedPod { - defer cancel() - logger.Info("pods did not reach the running state at min replicas or job is failed, so the elastic scaling controller shutdown") - delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) - return - } - - // If job does not need to be scaled, return directly. - if !hasPendingPod && jobStatus.ElasticStatus != nil && jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue == false { - log.Info("pytorch job does not need to be scaled") - return - } - - // Read training logs from pytorch pods and save the observation. - observation, err := read(ts.client, jobNamespace, GetDefaultWorkerName(jobName)) - if err != nil { - logger.Infof("fail to read training logs: %s", err) - return - } - - ts.locker.Lock() - defer ts.locker.Unlock() - - // Create metrics for current replicas. - if _, ok := ts.metrics[jobName][currentReplicas]; !ok { - ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) - } - ts.metrics[jobName][currentReplicas] = append(ts.metrics[jobName][currentReplicas], observation) - currentLength := len(ts.metrics[jobName][currentReplicas]) - logger.Infof("Current metric length: %d", currentLength) - // If current metrics have reached the metric count, judge the next scaling replicas. - if currentLength >= ts.metricCount { - if currentReplicas > *pytorchJob.Spec.ElasticPolicy.MinReplicas && currentReplicas <= *pytorchJob.Spec.ElasticPolicy.MaxReplicas { - lastReplicas := jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas - currentLatency := ts.metrics[jobName][currentReplicas][currentLength-1].Latency - lastReplicaLatency := ts.metrics[jobName][lastReplicas][currentLength-1].Latency - log.Info("last latency: ", lastReplicaLatency, "current latency: ", currentLatency) - - if (lastReplicaLatency / float64(lastReplicas)) > (currentLatency / float64(currentReplicas)) { - - if currentReplicas == *pytorchJob.Spec.ElasticPolicy.MaxReplicas { - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "The pytorch job has reached the MaxReplicas" - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = false - ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) - } else { - newReplicas := currentReplicas + 1 - *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = newReplicas - if err := ts.Client.Update(ctx, pytorchJob); err != nil { - log.Info("fail to update pytorch job") - } - - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas = currentReplicas - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = newReplicas - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "continues to scale pytorch job" - now := metav1.Now() - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = true - if _, ok := ts.metrics[jobName][newReplicas]; !ok { - ts.metrics[jobName][newReplicas] = make([]MetricObservation, 0) - } - } - - } else { - *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas - if err := ts.Client.Update(ctx, pytorchJob); err != nil { - log.Info("fail to update pytorch job") - } - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = lastReplicas - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas = currentReplicas - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "The pytorch job does not need to be scaled, then go back to the last choice" - now := metav1.Now() - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = false - ts.metrics[jobName][lastReplicas] = make([]MetricObservation, 0) - ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) - } - - } else if currentReplicas == *pytorchJob.Spec.ElasticPolicy.MinReplicas && currentReplicas < *pytorchJob.Spec.ElasticPolicy.MaxReplicas { - newReplicas := *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas + 1 - *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = newReplicas - if err := ts.Client.Update(ctx, pytorchJob); err != nil { - log.Info("fail to update pytorch job") - } - - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = newReplicas - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas = currentReplicas - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "The pytorch job continues to be scaled" - now := metav1.Now() - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = true - - if _, ok := ts.metrics[jobName][newReplicas]; !ok { - ts.metrics[jobName][newReplicas] = make([]MetricObservation, 0) - } - - } else if currentReplicas == *pytorchJob.Spec.ElasticPolicy.MaxReplicas { - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "The pytorch job has reached the MaxReplicas" - jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = false - ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) - } - } - - // No need to update the job status if the status hasn't changed since last time. - if !reflect.DeepEqual(*oldStatus, jobStatus) { - if err = ts.UpdateJobStatusInApiServer(pytorchJob, jobStatus); err != nil { - if errors.IsConflict(err) { - // retry later when update operation violates with etcd concurrency control. - logger.Info("fail to update pytorch job status") - return - } - } - } - - return -} - -func (ts *TorchElasticController) GetPodsForJob(job *training.PyTorchJob) ([]*v1.Pod, error) { - selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: ts.GenLabels(job.Name), - }) - // List all pods to include those that don't match the selector anymore - // but have a ControllerRef pointing to this controller. - podList := &v1.PodList{} - err = ts.Client.List(context.Background(), podList, client.MatchingLabelsSelector{Selector: selector}) - if err != nil { - return nil, err - } - return commonutil.ToPodPointerList(podList.Items), nil -} - -func (ts *TorchElasticController) GenLabels(jobName string) map[string]string { - - labelGroupName := apiv1.GroupNameLabel - labelJobName := apiv1.JobNameLabel - groupName := ts.GetGroupNameLabelValue() - return map[string]string{ - labelGroupName: groupName, - labelJobName: strings.Replace(jobName, "/", "-", -1), - } -} - -func (ts *TorchElasticController) GetGroupNameLabelValue() string { - return training.SchemeGroupVersion.Group -} - -func (ts *TorchElasticController) ControllerName() string { - return controllerName -} - -func makeElasticJobName(name, namespace string) string { - return name + "-" + namespace -} - -// UpdateJobStatusInApiServer updates the job status in API server -func (ts *TorchElasticController) UpdateJobStatusInApiServer(job interface{}, jobStatus *controllerv1.JobStatus) error { - torchElasticJob, ok := job.(*training.PyTorchJob) - if !ok { - return fmt.Errorf("%+v is not a type of PytorchJob", torchElasticJob) - } - var jobCpy *training.PyTorchJob - // Job status passed in differs with status in job, update in basis of the passed in one. - jobCpy = torchElasticJob.DeepCopy() - jobCpy.Status = *jobStatus.DeepCopy() - return ts.Status().Update(context.Background(), jobCpy) -} - -// initializeReplicaStatuses initializes the ElasticStatuses for replica. -func initializeElasticStatuses(jobStatus *apiv1.JobStatus, rtype apiv1.ReplicaType) { - if jobStatus.ElasticStatus == nil { - jobStatus.ElasticStatus = make(map[apiv1.ReplicaType]*apiv1.ElasticScalingStatus) - } - - jobStatus.ElasticStatus[rtype] = &apiv1.ElasticScalingStatus{} -} diff --git a/controllers/torchelastic/job_elastic_controller.go b/controllers/torchelastic/job_elastic_controller.go new file mode 100644 index 00000000..2a88db2b --- /dev/null +++ b/controllers/torchelastic/job_elastic_controller.go @@ -0,0 +1,37 @@ +/* +Copyright 2022 The Alibaba Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package torchelastic + +import ( + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// ElasticController implementations +type ElasticController interface { + SetupWithManager(mgr ctrl.Manager) error +} + +var _ ElasticController = &TorchElasticController{} + +type newJobElasticController func(mgr ctrl.Manager, period, count int) ElasticController + +var ( + log = logf.Log.WithName("job-elastic-controller") + jobElasticCtrlMap = make(map[runtime.Object]newJobElasticController) +) diff --git a/controllers/torchelastic/job/log_util.go b/controllers/torchelastic/log_util.go similarity index 99% rename from controllers/torchelastic/job/log_util.go rename to controllers/torchelastic/log_util.go index fa228382..37c86899 100644 --- a/controllers/torchelastic/job/log_util.go +++ b/controllers/torchelastic/log_util.go @@ -1,4 +1,4 @@ -package job +package torchelastic import ( "bufio" diff --git a/controllers/torchelastic/pod.go b/controllers/torchelastic/pod.go new file mode 100644 index 00000000..70d7141d --- /dev/null +++ b/controllers/torchelastic/pod.go @@ -0,0 +1,269 @@ +/* +Copyright 2022 The Alibaba Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package torchelastic + +import ( + "context" + trainingv1alpha1 "github.com/alibaba/kubedl/apis/training/v1alpha1" + "github.com/alibaba/kubedl/controllers/pytorch" + v1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" + "github.com/alibaba/kubedl/pkg/util/concurrent" + "github.com/alibaba/kubedl/pkg/util/k8sutil" + patchutil "github.com/alibaba/kubedl/pkg/util/patch" + kruisev1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + logger "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/pointer" + "strconv" + "strings" +) + +func (ts *TorchElasticController) recreatePodContainers(job *trainingv1alpha1.PyTorchJob, pod *corev1.Pod, generation string) error { + crr := kruisev1alpha1.ContainerRecreateRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + Labels: map[string]string{ + v1.LabelGeneration: generation, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: pod.Name, + UID: pod.UID, + Controller: pointer.BoolPtr(false), + BlockOwnerDeletion: pointer.BoolPtr(true), + }, + { + APIVersion: job.APIVersion, + Kind: job.Kind, + Name: job.Name, + UID: job.UID, + Controller: pointer.BoolPtr(false), + BlockOwnerDeletion: pointer.BoolPtr(true), + }, + }, + }, + Spec: kruisev1alpha1.ContainerRecreateRequestSpec{ + PodName: pod.Name, + Strategy: &kruisev1alpha1.ContainerRecreateRequestStrategy{OrderedRecreate: false}, + }, + } + + for ci := range pod.Spec.Containers { + container := &pod.Spec.Containers[ci] + crr.Spec.Containers = append(crr.Spec.Containers, kruisev1alpha1.ContainerRecreateRequestContainer{Name: container.Name}) + } + return ts.Client.Create(context.Background(), &crr) +} + +func (ts *TorchElasticController) restartStaleWorker(job *trainingv1alpha1.PyTorchJob, pod *corev1.Pod, worldSize, generation int64) (completed bool, err error) { + expectedWorldSize := strconv.FormatInt(worldSize, 10) + expectedGeneration := strconv.FormatInt(generation, 10) + podKey := pod.Namespace + "/" + pod.Name + + if job.Annotations[pytorch.AnnotationReadyToRestartWorker] == "true" && !k8sutil.IsPodActive(pod) { + err = ts.Client.Delete(context.Background(), pod) + return err == nil, err + } + + if pod.Labels[v1.LabelGeneration] == expectedGeneration { + return true, nil + } + + log.Info("refresh stale pod to latest generation", "pod", podKey, "generation", generation) + + completed, err = ts.restartWorkerInKruiseProtocol(job, pod, expectedWorldSize, expectedGeneration) + if !completed { + return false, err + } + + // Finally, incremental generation for current worker and mark refreshment done. + patch := patchutil.NewStrategicPatch() + patch.InsertLabel(v1.LabelGeneration, expectedGeneration) + err = ts.Client.Patch(context.Background(), pod, patch) + if err != nil { + return false, err + } + logger.Infof("succeed to refresh pod to generation: %v", generation) + return true, nil +} + +func (ts *TorchElasticController) restartWorkerInKruiseProtocol(job *trainingv1alpha1.PyTorchJob, pod *corev1.Pod, expectedWorldSize, expectedGeneration string) (completed bool, err error) { + podKey := pod.Namespace + "/" + pod.Name + crr := kruisev1alpha1.ContainerRecreateRequest{} + if curWorldSize, ok := pod.Annotations[pytorch.AnnotationWorldSize]; !ok || curWorldSize != expectedWorldSize { + log.Info("update latest world size of pytorch", + "key", podKey, "current world size", curWorldSize, "target world size", expectedWorldSize) + patch := patchutil.NewStrategicPatch() + patch.InsertAnnotation(pytorch.AnnotationWorldSize, expectedWorldSize) + if err = ts.Client.Patch(context.Background(), pod, patch); err != nil { + log.Error(err, "failed to refresh world-size of stale worker", "pod", podKey, "world size", expectedWorldSize) + return false, err + } + return false, nil + } + + if err = ts.Client.Get(context.Background(), types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, &crr); err != nil { + if errors.IsNotFound(err) { + logger.Info("Not found ContainerRecreateRequest") + return false, ts.recreatePodContainers(job, pod, expectedGeneration) + } + + log.Error(err, "failed to get latest container-recreate-request for stale worker", + "pod", podKey) + return false, err + } + // crr created in previous round, clean it. + if crr.Labels[v1.LabelGeneration] != expectedGeneration { + if err = ts.Client.Delete(context.Background(), &crr); err != nil { + return false, err + } + return false, ts.recreatePodContainers(job, pod, expectedGeneration) + } + + if crr.Status.Phase == kruisev1alpha1.ContainerRecreateRequestFailed { + logger.Infof("failed to restart containers of pod %s/%s, fallback to recreate pod", pod.Namespace, pod.Name) + err = ts.Client.Delete(context.Background(), pod) + return err == nil, err + } + + recreateDone := crr.Status.Phase == kruisev1alpha1.ContainerRecreateRequestCompleted || crr.Status.Phase == kruisev1alpha1.ContainerRecreateRequestSucceeded + if !recreateDone { + logger.Error("container recreate request has not completed yet", "pod", podKey) + return false, nil + } + + // Finalize container-recreate-request object once it completes, because elastic scaling is repeatable + // and 'crr' request will be re-initiated. + defer ts.Client.Delete(context.Background(), &crr) + + logger.Info("ContainerRecreateSucceed", "succeed to recreate containers in stale worker: %s", podKey) + return true, nil +} + +func FilterRunningPods(pods []*corev1.Pod) []*corev1.Pod { + var result []*corev1.Pod + for _, p := range pods { + if podRunning(p) { + result = append(result, p) + } else { + deletionTimeStamp := "N/A" + if p.DeletionTimestamp != nil { + deletionTimeStamp = p.DeletionTimestamp.String() + } + logger.Infof("Ignoring inactive pod %v/%v in state %v, deletion time %s", + p.Namespace, p.Name, p.Status.Phase, deletionTimeStamp) + } + } + return result +} + +func (ts *TorchElasticController) restartStalePytorchPods(pods []*corev1.Pod, pytorchJob *trainingv1alpha1.PyTorchJob) (completed bool) { + + runningPods := FilterRunningPods(pods) + _, stalePods := k8sutil.FilterStalePodsByReplicaType(runningPods, pytorchJob.Generation, strings.ToLower(string(v1.JobReplicaTypeAIMaster))) + staleWorkers := stalePods[strings.ToLower(string(trainingv1alpha1.PyTorchReplicaTypeWorker))] + totalReplicas := len(stalePods) + workerNums := len(staleWorkers) + logger.Infof("worker nums: %d", workerNums) + + if pytorchJob.Annotations[pytorch.AnnotationReadyToRestartWorker] == "false" { + log.Info("PytorchJob does not need to restart workers") + return false + } + + tickets := 100 // max semaphore tickets limited. + if len(staleWorkers) < 100 { + tickets = len(staleWorkers) + } + sema := concurrent.NewSemaphore(tickets) + for _, pod := range staleWorkers { + sema.Acquire() + + go func(worker *corev1.Pod) { + defer sema.Release() + if completed, err := ts.restartStaleWorker(pytorchJob, worker, int64(totalReplicas), pytorchJob.Generation); err != nil { + logger.Warnf("Restart worker %s failed becasue error %v", worker.Name, err) + } else if completed { + workerNums-- + } + }(pod) + } + // block until all semaphore is released. + sema.Wait() + if workerNums != 0 { + log.Info("refresh stale workers has not completed yet", "key", pytorchJob.Namespace+"/"+pytorchJob.Name) + return false + } + + if len(stalePods) == 0 || workerNums == 0 { + log.Info("all pods are in latest generation, mark ready-to-start-worker as false") + patch := patchutil.NewMergePatch() + patch.InsertAnnotation(pytorch.AnnotationReadyToRestartWorker, "false") + + if err := ts.Client.Patch(context.Background(), pytorchJob, patch); err != nil { + logger.Infof("fail to patch pytorchJob: %v", err) + return false + } + logger.Infof("pytorch job %s/%s elastic scaling successfully finished, total replicas: %v", pytorchJob.Namespace, pytorchJob.Name, totalReplicas) + completed = true + } + + return completed + +} + +func (ts *TorchElasticController) waitForAllPodsRunning(pytorchJob *trainingv1alpha1.PyTorchJob) (hasPendingPod, hasFailedPod bool) { + pods, err := ts.GetPodsForJob(pytorchJob) + if err != nil { + logger.Warnf("Get Pods For Job error %v", err) + } + + // Wait for all pods running with timeout seconds. + waitErr := wait.PollImmediate(interval, podReadyTimeout, func() (bool, error) { + for _, pod := range pods { + if isRunning := podRunning(pod); !isRunning { + return false, nil + } + } + return true, nil + }) + + if waitErr != nil { + logger.Info("pods did not reach the running state") + } + + for _, pod := range pods { + if pod.Status.Phase == corev1.PodPending { + hasPendingPod = true + break + } + } + for _, pod := range pods { + if pod.Status.Phase == corev1.PodFailed { + hasFailedPod = true + break + } + } + return +} diff --git a/controllers/torchelastic/torchelastic_controller.go b/controllers/torchelastic/torchelastic_controller.go new file mode 100644 index 00000000..d1c52070 --- /dev/null +++ b/controllers/torchelastic/torchelastic_controller.go @@ -0,0 +1,216 @@ +/* +Copyright 2022 The Alibaba Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package torchelastic + +import ( + "context" + training "github.com/alibaba/kubedl/apis/training/v1alpha1" + apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" + "github.com/alibaba/kubedl/pkg/util/concurrent" + logger "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" + "strings" + "sync" + "time" +) + +const ( + controllerName = "TorchElasticController" + interval = 5 * time.Second + podReadyTimeout = 1 * time.Minute +) + +type name string +type namespace string + +func init() { + jobElasticCtrlMap[&training.PyTorchJob{}] = NewTorchElasticController +} + +func NewTorchElasticController(mgr ctrl.Manager, period, count int) ElasticController { + metrics := make(map[string]map[int32][]MetricObservation) + torchJobs := make(map[string]TorchElasticJob) + return &TorchElasticController{ + period: period, + metricCount: count, + client: kubernetes.NewForConfigOrDie(mgr.GetConfig()), + metrics: metrics, + torchElasticJobs: torchJobs, + Client: mgr.GetClient(), + } +} + +type TorchElasticController struct { + period int + metricCount int + client *kubernetes.Clientset + client.Client + locker sync.Mutex + // metrics stores observations collected from running pods + metrics map[string]map[int32][]MetricObservation + // torchElasticJobs stores torch-elastic jobs infos. + torchElasticJobs map[string]TorchElasticJob +} + +// TorchElasticJob represents one elastic job. +type TorchElasticJob struct { + Name string + Namespace string + ctx context.Context + cancelFunc context.CancelFunc +} + +// MetricObservation represents one metric set collected from training pods. +type MetricObservation struct { + Epoch int32 `json:"epoch,omitempty"` + Batch int32 `json:"batch,omitempty"` + Accuracy float64 `json:"accuracy,omitempty"` + Latency float64 `json:"latency,omitempty"` +} + +func (ts *TorchElasticController) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) { + pytorchJob := training.PyTorchJob{} + err := ts.Client.Get(context.Background(), types.NamespacedName{ + Namespace: req.Namespace, + Name: req.Name, + }, &pytorchJob) + + if err != nil { + if errors.IsNotFound(err) { + log.Info("try to fetch pytorch job but it has been deleted.", "key", req.String()) + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + return ctrl.Result{}, nil +} + +func (ts *TorchElasticController) SetupWithManager(mgr ctrl.Manager) error { + c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: ts}) + if err != nil { + return err + } + // Watch events with pod events-handler. + if err = c.Watch(&source.Kind{Type: &training.PyTorchJob{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{ + CreateFunc: onOwnerCreateFunc(ts), + DeleteFunc: onOwnerDeleteFunc(ts), + }); err != nil { + return err + } + + ctx := context.Background() + go wait.UntilWithContext(ctx, ts.startElasticForAllJobs, time.Duration(ts.period)*(time.Second)) + log.Info("Start Elastic Scaling Controller Loop") + + ctx.Done() + log.Info("Shutting down Elastic Scaling Controller Loop") + return nil +} + +func onOwnerCreateFunc(ts *TorchElasticController) func(e event.CreateEvent) bool { + return func(e event.CreateEvent) bool { + pytorchJob, ok := e.Object.(*training.PyTorchJob) + if !ok { + return true + } + if !pytorchJob.Spec.EnableElastic && pytorchJob.Spec.ElasticPolicy == nil { + return true + } + ctx, cancel := context.WithCancel(context.Background()) + ctx = context.WithValue(ctx, name("job"), pytorchJob.Name) + ctx = context.WithValue(ctx, namespace("namespace"), pytorchJob.Namespace) + logger.Info("Create torch elastic job: ", pytorchJob.Name, " in namespace: ", pytorchJob.Namespace) + ts.torchElasticJobs[makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)] = TorchElasticJob{ + Name: pytorchJob.Name, + Namespace: pytorchJob.Namespace, + ctx: ctx, + cancelFunc: cancel, + } + return true + } +} + +func onOwnerDeleteFunc(ts *TorchElasticController) func(e event.DeleteEvent) bool { + return func(e event.DeleteEvent) bool { + pytorchJob, ok := e.Object.(*training.PyTorchJob) + if !ok { + return true + } + if !pytorchJob.Spec.EnableElastic && pytorchJob.Spec.ElasticPolicy == nil { + return true + } + + logger.Infof("Deleting elastic scaling for pytorch job %s from namespace %s", pytorchJob.Name, pytorchJob.Namespace) + // Delete job infos saved in Torch Elastic controller. + if _, ok := ts.torchElasticJobs[makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)]; ok { + cancel := ts.torchElasticJobs[makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)].cancelFunc + defer cancel() + delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + delete(ts.metrics, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + } + + return true + } +} + +// Start elastic scaling loop for all torch elastic jobs. +func (ts *TorchElasticController) startElasticForAllJobs(ctx context.Context) { + tickets := 100 // max semaphore tickets limited. + if len(ts.torchElasticJobs) < 100 { + tickets = len(ts.torchElasticJobs) + } + sema := concurrent.NewSemaphore(tickets) + for _, torchJob := range ts.torchElasticJobs { + sema.Acquire() + + go func(job TorchElasticJob) { + defer sema.Release() + //Start elastic scaling for each torch elastic job. + ts.start(job.ctx, job.cancelFunc, job.Name, job.Namespace) + }(torchJob) + } + // block until all semaphore is released. + sema.Wait() +} + +func (ts *TorchElasticController) GenLabels(jobName string) map[string]string { + labelGroupName := apiv1.GroupNameLabel + labelJobName := apiv1.JobNameLabel + groupName := ts.GetGroupNameLabelValue() + return map[string]string{ + labelGroupName: groupName, + labelJobName: strings.Replace(jobName, "/", "-", -1), + } +} + +func (ts *TorchElasticController) GetGroupNameLabelValue() string { + return training.SchemeGroupVersion.Group +} + +func (ts *TorchElasticController) ControllerName() string { + return controllerName +} diff --git a/pkg/job_controller/api/v1/types.go b/pkg/job_controller/api/v1/types.go index f33a40d6..cf920dc1 100644 --- a/pkg/job_controller/api/v1/types.go +++ b/pkg/job_controller/api/v1/types.go @@ -62,16 +62,20 @@ type JobStatus struct { type ReplicaType string // ElasticScalingStatus represents the current elastic scaling status of the training job. +// +k8s:deepcopy-gen=true type ElasticScalingStatus struct { + // Type of elastic scaling condition. + ElasticCondition ElasticConditionType `json:"elasticCondition,omitempty"` + + // Continue represents whether the job needs to continue scaling. + Continue bool `json:"continue,omitempty"` + // The number of current scaling pod replicas. CurrentReplicas int32 `json:"currentReplicas,omitempty"` // The number of last scaling pod replicas. LastReplicas int32 `json:"lastReplicas,omitempty"` - // Continue represents whether the job needs to continue scaling. - Continue bool `json:"continue,omitempty"` - // The time this elastic scaling loop was started. LastUpdateTime *metav1.Time `json:"startTime,omitempty"` @@ -153,6 +157,23 @@ type JobCondition struct { LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` } +// ElasticConditionType defines all kinds of elastic scaling conditions. +type ElasticConditionType string + +const ( + ElasticJobPending ElasticConditionType = "HasPendingPod" + // ElasticStart means the elastic scaling has been started. + ElasticStart ElasticConditionType = "Start" + // ElasticStop means the elastic scaling has been stopped. + ElasticStop ElasticConditionType = "Stop" + // ElasticContinue means the elastic scaling continues. + ElasticContinue ElasticConditionType = "Continue" + // ElasticMaxMetric means the training metrics have reached the max. + ElasticMaxMetric ElasticConditionType = "ReachMaxMetric" + // ElasticMaxReplica means the replicas have reached the maxReplicas. + ElasticMaxReplica ElasticConditionType = "ReachMaxReplicas" +) + // JobConditionType defines all kinds of types of JobStatus. type JobConditionType string