diff --git a/apis/training/v1alpha1/pytorchjob_defaults.go b/apis/training/v1alpha1/pytorchjob_defaults.go index 652badb4..405c89a8 100644 --- a/apis/training/v1alpha1/pytorchjob_defaults.go +++ b/apis/training/v1alpha1/pytorchjob_defaults.go @@ -125,6 +125,9 @@ func SetDefaults_PyTorchJob(job *PyTorchJob) { // Set default replicas and restart policy. if rType == PyTorchReplicaTypeWorker { setDefaults_PyTorchJobWorkerReplicas(spec) + if job.Spec.EnableElastic && job.Spec.ElasticPolicy != nil { + setDefaults_PyTorchJobPort(&spec.Template.Spec) + } } if rType == PyTorchReplicaTypeMaster { setDefaults_PyTorchJobMasterReplicas(spec) diff --git a/apis/training/v1alpha1/pytorchjob_types.go b/apis/training/v1alpha1/pytorchjob_types.go index 4b3f2787..d6d064d7 100644 --- a/apis/training/v1alpha1/pytorchjob_types.go +++ b/apis/training/v1alpha1/pytorchjob_types.go @@ -46,6 +46,29 @@ type PyTorchJobSpec struct { // CacheBackend is used to configure the cache engine for job // +optional CacheBackend *cachev1alpha1.CacheBackendSpec `json:"cacheBackend"` + + // EnableElastic decides whether torch elastic is enabled for the job. + // +optional + EnableElastic bool `json:"enableElastic"` + + // ElasticPolicy is used to configure the torch elastic-based elastic scaling support for the distributed training job. + // +optional + ElasticPolicy *ElasticPolicy `json:"elasticPolicy,omitempty"` +} + +type ElasticPolicy struct { + // MinReplicas is the lower limit for the number of replicas to which the training job + // can scale down. It defaults to null. + MinReplicas *int32 `json:"minReplicas,omitempty"` + + // MaxReplicas is the upper limit for the number of replicas to which the training job can scale up; it cannot be smaller than MinReplicas and defaults to null. + MaxReplicas *int32 `json:"maxReplicas,omitempty"` + + RDZVBackend string `json:"rdzvBackend,omitempty"` + RdzvEndpoint string `json:"rdzvEndpoint"` + + // NProcPerNode is the number of worker processes to launch on each node. + NProcPerNode *int32 `json:"nProcPerNode,omitempty"` } // PyTorchJobStatus defines the observed state of PyTorchJob diff --git a/apis/training/v1alpha1/zz_generated.deepcopy.go b/apis/training/v1alpha1/zz_generated.deepcopy.go index 5785441b..5b2cb55e 100644 --- a/apis/training/v1alpha1/zz_generated.deepcopy.go +++ b/apis/training/v1alpha1/zz_generated.deepcopy.go @@ -118,6 +118,36 @@ func (in *ElasticDLJobSpec) DeepCopy() *ElasticDLJobSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ElasticPolicy) DeepCopyInto(out *ElasticPolicy) { + *out = *in + if in.MinReplicas != nil { + in, out := &in.MinReplicas, &out.MinReplicas + *out = new(int32) + **out = **in + } + if in.MaxReplicas != nil { + in, out := &in.MaxReplicas, &out.MaxReplicas + *out = new(int32) + **out = **in + } + if in.NProcPerNode != nil { + in, out := &in.NProcPerNode, &out.NProcPerNode + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ElasticPolicy. +func (in *ElasticPolicy) DeepCopy() *ElasticPolicy { + if in == nil { + return nil + } + out := new(ElasticPolicy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *LegacyV1Alpha1) DeepCopyInto(out *LegacyV1Alpha1) { *out = *in @@ -558,6 +588,11 @@ func (in *PyTorchJobSpec) DeepCopyInto(out *PyTorchJobSpec) { *out = new(cachev1alpha1.CacheBackendSpec) (*in).DeepCopyInto(*out) } + if in.ElasticPolicy != nil { + in, out := &in.ElasticPolicy, &out.ElasticPolicy + *out = new(ElasticPolicy) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PyTorchJobSpec. diff --git a/config/crd/bases/training.kubedl.io_elasticdljobs.yaml b/config/crd/bases/training.kubedl.io_elasticdljobs.yaml index effd16fb..e10314e2 100644 --- a/config/crd/bases/training.kubedl.io_elasticdljobs.yaml +++ b/config/crd/bases/training.kubedl.io_elasticdljobs.yaml @@ -3111,6 +3111,24 @@ spec: - type type: object type: array + elasticScaling: + additionalProperties: + properties: + continue: + type: boolean + currentReplicas: + format: int32 + type: integer + lastReplicas: + format: int32 + type: integer + message: + type: string + startTime: + format: date-time + type: string + type: object + type: object lastReconcileTime: format: date-time type: string diff --git a/config/crd/bases/training.kubedl.io_marsjobs.yaml b/config/crd/bases/training.kubedl.io_marsjobs.yaml index 73fbf03c..35b38e02 100644 --- a/config/crd/bases/training.kubedl.io_marsjobs.yaml +++ b/config/crd/bases/training.kubedl.io_marsjobs.yaml @@ -3133,6 +3133,24 @@ spec: - type type: object type: array + elasticScaling: + additionalProperties: + properties: + continue: + type: boolean + currentReplicas: + format: int32 + type: integer + lastReplicas: + format: int32 + type: integer + message: + type: string + startTime: + format: date-time + type: string + type: object + type: object lastReconcileTime: format: date-time type: string diff --git a/config/crd/bases/training.kubedl.io_mpijobs.yaml b/config/crd/bases/training.kubedl.io_mpijobs.yaml index 878a6b6c..c49737f6 100644 --- a/config/crd/bases/training.kubedl.io_mpijobs.yaml +++ b/config/crd/bases/training.kubedl.io_mpijobs.yaml @@ -6156,6 +6156,24 @@ spec: - type type: object type: array + elasticScaling: + additionalProperties: + properties: + continue: + type: boolean + currentReplicas: + format: int32 + type: integer + lastReplicas: + format: int32 + type: integer + message: + type: string + startTime: + format: date-time + type: string + type: object + type: object lastReconcileTime: format: date-time type: string diff --git a/config/crd/bases/training.kubedl.io_pytorchjobs.yaml b/config/crd/bases/training.kubedl.io_pytorchjobs.yaml index 45b1c8dc..98e582e5 100644 --- a/config/crd/bases/training.kubedl.io_pytorchjobs.yaml +++ b/config/crd/bases/training.kubedl.io_pytorchjobs.yaml @@ -112,6 +112,26 @@ spec: required: - schedule type: object + elasticPolicy: + properties: + maxReplicas: + format: int32 + type: integer + minReplicas: + format: int32 + type: integer + nProcPerNode: + format: int32 + type: integer + rdzvBackend: + type: string + rdzvEndpoint: + type: string + required: + - rdzvEndpoint + type: object + enableElastic: + type: boolean modelVersion: properties: createdBy: @@ -3198,6 +3218,24 @@ spec: - type type: object type: array + elasticScaling: + additionalProperties: + properties: + continue: + type: boolean + currentReplicas: + format: int32 + type: integer + lastReplicas: + format: int32 + type: integer + message: + type: string + startTime: + format: date-time + type: string + type: object + type: object lastReconcileTime: format: date-time type: string diff --git 
a/config/crd/bases/training.kubedl.io_tfjobs.yaml b/config/crd/bases/training.kubedl.io_tfjobs.yaml index fa9a5f1b..65272230 100644 --- a/config/crd/bases/training.kubedl.io_tfjobs.yaml +++ b/config/crd/bases/training.kubedl.io_tfjobs.yaml @@ -3200,6 +3200,24 @@ spec: - type type: object type: array + elasticScaling: + additionalProperties: + properties: + continue: + type: boolean + currentReplicas: + format: int32 + type: integer + lastReplicas: + format: int32 + type: integer + message: + type: string + startTime: + format: date-time + type: string + type: object + type: object lastReconcileTime: format: date-time type: string diff --git a/config/crd/bases/training.kubedl.io_xdljobs.yaml b/config/crd/bases/training.kubedl.io_xdljobs.yaml index ff3a405b..d4edc6ca 100644 --- a/config/crd/bases/training.kubedl.io_xdljobs.yaml +++ b/config/crd/bases/training.kubedl.io_xdljobs.yaml @@ -3117,6 +3117,24 @@ spec: - type type: object type: array + elasticScaling: + additionalProperties: + properties: + continue: + type: boolean + currentReplicas: + format: int32 + type: integer + lastReplicas: + format: int32 + type: integer + message: + type: string + startTime: + format: date-time + type: string + type: object + type: object lastReconcileTime: format: date-time type: string diff --git a/config/crd/bases/training.kubedl.io_xgboostjobs.yaml b/config/crd/bases/training.kubedl.io_xgboostjobs.yaml index b248367b..357757d7 100644 --- a/config/crd/bases/training.kubedl.io_xgboostjobs.yaml +++ b/config/crd/bases/training.kubedl.io_xgboostjobs.yaml @@ -3111,6 +3111,24 @@ spec: - type type: object type: array + elasticScaling: + additionalProperties: + properties: + continue: + type: boolean + currentReplicas: + format: int32 + type: integer + lastReplicas: + format: int32 + type: integer + message: + type: string + startTime: + format: date-time + type: string + type: object + type: object lastReconcileTime: format: date-time type: string diff --git a/controllers/pytorch/pytorchjob_controller.go b/controllers/pytorch/pytorchjob_controller.go index f11bb9ce..50d6908c 100644 --- a/controllers/pytorch/pytorchjob_controller.go +++ b/controllers/pytorch/pytorchjob_controller.go @@ -265,6 +265,40 @@ func (r *PytorchJobReconciler) SetClusterSpec(ctx context.Context, job interface } } + desiredReplicas, err := computeDesiredReplicas(pytorchJob) + if err != nil { + return err + } + + // Set default value if minReplicas and maxReplicas are not set + var minReplicas, maxReplicas int32 + if pytorchJob.Spec.ElasticPolicy.MinReplicas != nil { + minReplicas = *pytorchJob.Spec.ElasticPolicy.MinReplicas + } else { + minReplicas = desiredReplicas + } + + if pytorchJob.Spec.ElasticPolicy.MaxReplicas != nil { + maxReplicas = *pytorchJob.Spec.ElasticPolicy.MaxReplicas + } else { + maxReplicas = desiredReplicas + } + + var procPerNode int32 + if pytorchJob.Spec.ElasticPolicy.NProcPerNode != nil { + procPerNode = *pytorchJob.Spec.ElasticPolicy.NProcPerNode + } else { + procPerNode = int32(1) + } + + //Generate torch elastic env args. 
+ launchElasticArgs := []string{ + "--rdzv_backend=" + pytorchJob.Spec.ElasticPolicy.RDZVBackend, + "--rdzv_endpoint=" + pytorchJob.Spec.ElasticPolicy.RdzvEndpoint, + "--rdzv_id=" + pytorchJob.Name, + "--nproc_per_node=" + strconv.Itoa(int(procPerNode)), + "--nnodes=" + strconv.Itoa(int(minReplicas)) + ":" + strconv.Itoa(int(maxReplicas))} + for i := range podTemplate.Spec.Containers { if len(podTemplate.Spec.Containers[i].Env) == 0 { podTemplate.Spec.Containers[i].Env = make([]corev1.EnvVar, 0) @@ -285,6 +319,8 @@ func (r *PytorchJobReconciler) SetClusterSpec(ctx context.Context, job interface Name: "PYTHONUNBUFFERED", Value: "0", }) + podTemplate.Spec.Containers[i].Args = append(launchElasticArgs, podTemplate.Spec.Containers[i].Args...) + if enableElasticScaling && rtype != "aimaster" { // Job enables elastic scaling select value of AnnotationWorldSize as its // WORLD_SIZE env value via field-path, the annotated value will be mutated diff --git a/controllers/pytorch/util.go b/controllers/pytorch/util.go index 6880fb19..12de2221 100644 --- a/controllers/pytorch/util.go +++ b/controllers/pytorch/util.go @@ -16,9 +16,23 @@ limitations under the License. package pytorch -import training "github.com/alibaba/kubedl/apis/training/v1alpha1" +import ( + "fmt" + training "github.com/alibaba/kubedl/apis/training/v1alpha1" + v1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" +) func ContainMasterSpec(job *training.PyTorchJob) bool { _, ok := job.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeMaster] return ok } + +// computeDesiredReplicas retrieves the user's desired replica setting from the Master spec. +func computeDesiredReplicas(elasticJob *training.PyTorchJob) (int32, error) { + masterSpecs, exist := elasticJob.Spec.PyTorchReplicaSpecs[v1.ReplicaType(training.PyTorchReplicaTypeMaster)] + if !exist { + return 0, fmt.Errorf("elasticJob %s doesn't have a %s spec", elasticJob.Name, training.PyTorchReplicaTypeMaster) + } + + return *masterSpecs.Replicas, nil +} diff --git a/controllers/torchelastic/elastic_controller.go b/controllers/torchelastic/elastic_controller.go new file mode 100644 index 00000000..b71cbca6 --- /dev/null +++ b/controllers/torchelastic/elastic_controller.go @@ -0,0 +1,23 @@ +package torchelastic + +import ( + "github.com/alibaba/kubedl/controllers/torchelastic/job" + ctrl "sigs.k8s.io/controller-runtime" +) + +const ( + controllerName = "ElasticScalingController" +) + +func SetupWithManager(mgr ctrl.Manager) error { + // New torch elastic controller. + // period is the interval, in seconds, at which the elastic scaling loop runs. + // count is the number of metric observations collected per replica count before a scaling decision is made. 
+ torchElasticController := job.NewTorchElasticController(mgr, 30, 5) + + if err := torchElasticController.SetupWithManager(mgr); err != nil { + return err + } + return nil + +} diff --git a/controllers/torchelastic/job/job_elastic_controller.go b/controllers/torchelastic/job/job_elastic_controller.go new file mode 100644 index 00000000..b25b94e8 --- /dev/null +++ b/controllers/torchelastic/job/job_elastic_controller.go @@ -0,0 +1,21 @@ +package job + +import ( + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// ElasticController implementations +type ElasticController interface { + SetupWithManager(mgr ctrl.Manager) error +} + +var _ ElasticController = &TorchElasticController{} + +type newJobElasticController func(mgr ctrl.Manager, period, count int) ElasticController + +var ( + log = logf.Log.WithName("job-elastic-controller") + jobElasticCtrlMap = make(map[runtime.Object]newJobElasticController) +) diff --git a/controllers/torchelastic/job/log_util.go b/controllers/torchelastic/job/log_util.go new file mode 100644 index 00000000..fa228382 --- /dev/null +++ b/controllers/torchelastic/job/log_util.go @@ -0,0 +1,105 @@ +package job + +import ( + "bufio" + "context" + "fmt" + logger "github.com/sirupsen/logrus" + "io" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + + "regexp" + "strconv" + "strings" +) + +// Construct a request for getting the logs for a pod and retrieves the logs. +func readRawLogs(client kubernetes.Interface, namespace, podID string, logOptions *v1.PodLogOptions) (string, error) { + readCloser, err := openStream(client, namespace, podID, logOptions) + if err != nil { + return err.Error(), nil + } + + defer func(readCloser io.ReadCloser) { + err := readCloser.Close() + if err != nil { + return + } + }(readCloser) + + reader := bufio.NewReader(readCloser) + line, err := reader.ReadString('\n') + if err != nil { + + return err.Error(), nil + } + + return line, nil +} + +func openStream(client kubernetes.Interface, namespace, podID string, logOptions *v1.PodLogOptions) (io.ReadCloser, error) { + return client.CoreV1().RESTClient().Get(). + Namespace(namespace). + Name(podID). + Resource("pods"). + SubResource("log"). + VersionedParams(logOptions, scheme.ParameterCodec).Stream(context.TODO()) +} + +func podRunning(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodRunning +} + +func GetDefaultWorkerName(pytorchJobName string) string { + return pytorchJobName + "-" + "worker" + "-" + "0" +} + +func read(client *kubernetes.Clientset, namespace, name string) (MetricObservation, error) { + lines := int64(1) + opts := &v1.PodLogOptions{ + TailLines: &lines, + Follow: true, + } + + //Read raw pod log. + detail, err := readRawLogs(client, namespace, name, opts) + if err != nil { + return MetricObservation{}, err + } + //Extract training metrics from raw log. + rawLog := strings.Split(detail, "\t") + epochRule := regexp.MustCompile(`[0-9]{1,2}`) + batchRule := regexp.MustCompile(`[0-9]{2,4}`) + trainRule := regexp.MustCompile(`[0-9]{1,2}.[0-9]{3}`) + accRule := regexp.MustCompile(`[0-9]{1,2}.[0-9]{1,2}`) + matchTrain, err := regexp.MatchString(`Epoch`, rawLog[0]) + + if err != nil { + return MetricObservation{}, err + } + // If current log is a training log. 
+ if matchTrain { + epochNum, _ := strconv.Atoi(epochRule.FindStringSubmatch(rawLog[0])[0]) + batchNum, _ := strconv.Atoi(batchRule.FindStringSubmatch(rawLog[0])[0]) + trainTime, _ := strconv.ParseFloat(trainRule.FindStringSubmatch(rawLog[1])[0], 64) + accuracy, _ := strconv.ParseFloat(accRule.FindStringSubmatch(rawLog[5])[0], 64) + + observation := MetricObservation{ + Accuracy: accuracy, + Epoch: int32(epochNum), + Latency: trainTime, + Batch: int32(batchNum), + } + // drop the inaccurate train data + if trainTime > 1 { + return MetricObservation{}, fmt.Errorf("drop the inaccurate train data") + } + + logger.Infof("epoch: %d batch: %d train_time: %f accuracy: %f", epochNum, batchNum, trainTime, accuracy) + return observation, nil + } + + return MetricObservation{}, fmt.Errorf("current log is not a training log") +} diff --git a/controllers/torchelastic/job/torchelastic_controller.go b/controllers/torchelastic/job/torchelastic_controller.go new file mode 100644 index 00000000..f00e99ac --- /dev/null +++ b/controllers/torchelastic/job/torchelastic_controller.go @@ -0,0 +1,469 @@ +package job + +import ( + "context" + "fmt" + training "github.com/alibaba/kubedl/apis/training/v1alpha1" + apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" + controllerv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" + commonutil "github.com/alibaba/kubedl/pkg/util" + "github.com/alibaba/kubedl/pkg/util/concurrent" + logger "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "reflect" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" + "strings" + "sync" + "time" +) + +const ( + controllerName = "TorchElasticController" + interval = 5 * time.Second + podReadyTimeout = 1 * time.Minute + logTimeout = 1 * time.Minute +) + +type name string +type namespace string + +func init() { + jobElasticCtrlMap[&training.PyTorchJob{}] = NewTorchElasticController +} + +func NewTorchElasticController(mgr ctrl.Manager, period, count int) ElasticController { + metrics := make(map[string]map[int32][]MetricObservation) + torchJobs := make(map[string]TorchElasticJob) + return &TorchElasticController{ + period: period, + metricCount: count, + client: kubernetes.NewForConfigOrDie(mgr.GetConfig()), + metrics: metrics, + torchElasticJobs: torchJobs, + Client: mgr.GetClient(), + } +} + +type TorchElasticController struct { + period int + metricCount int + client *kubernetes.Clientset + client.Client + metrics map[string]map[int32][]MetricObservation + locker sync.Mutex + torchElasticJobs map[string]TorchElasticJob +} + +type TorchElasticJob struct { + Name string + Namespace string + ctx context.Context + cancelFunc context.CancelFunc +} + +type MetricObservation struct { + Epoch int32 `json:"epoch,omitempty"` + Batch int32 `json:"batch,omitempty"` + Accuracy float64 `json:"accuracy,omitempty"` + Latency float64 `json:"latency,omitempty"` +} + +func (ts *TorchElasticController) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) { + pytorchJob := training.PyTorchJob{} + err := ts.Client.Get(context.Background(), types.NamespacedName{ + Namespace: 
req.Namespace, + Name: req.Name, + }, &pytorchJob) + + if err != nil { + if errors.IsNotFound(err) { + log.Info("try to fetch pytorch job but it has been deleted.", "key", req.String()) + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + return ctrl.Result{}, nil +} + +func (ts *TorchElasticController) SetupWithManager(mgr ctrl.Manager) error { + c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: ts}) + if err != nil { + return err + } + // Watch events with pod events-handler. + if err = c.Watch(&source.Kind{Type: &training.PyTorchJob{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{ + CreateFunc: onOwnerCreateFunc(ts), + DeleteFunc: onOwnerDeleteFunc(ts), + }); err != nil { + return err + } + + ctx := context.Background() + go wait.UntilWithContext(ctx, ts.startElasticForAllJobs, time.Duration(ts.period)*(time.Second)) + log.Info("Start Elastic Scaling Controller Loop") + + ctx.Done() + log.Info("Shutting down Elastic Scaling Controller Loop") + return nil +} + +func onOwnerCreateFunc(ts *TorchElasticController) func(e event.CreateEvent) bool { + return func(e event.CreateEvent) bool { + pytorchJob, ok := e.Object.(*training.PyTorchJob) + if !ok { + return true + } + if !pytorchJob.Spec.EnableElastic && pytorchJob.Spec.ElasticPolicy == nil { + return true + } + ctx, cancel := context.WithCancel(context.Background()) + ctx = context.WithValue(ctx, name("job"), pytorchJob.Name) + ctx = context.WithValue(ctx, namespace("namespace"), pytorchJob.Namespace) + logger.Info("Create torch elastic job: ", pytorchJob.Name, " in namespace: ", pytorchJob.Namespace) + ts.torchElasticJobs[makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)] = TorchElasticJob{ + Name: pytorchJob.Name, + Namespace: pytorchJob.Namespace, + ctx: ctx, + cancelFunc: cancel, + } + return true + } +} + +func onOwnerDeleteFunc(ts *TorchElasticController) func(e event.DeleteEvent) bool { + return func(e event.DeleteEvent) bool { + pytorchJob, ok := e.Object.(*training.PyTorchJob) + if !ok { + return true + } + if !pytorchJob.Spec.EnableElastic && pytorchJob.Spec.ElasticPolicy == nil { + return true + } + + logger.Infof("Deleting elastic scaling for pytorch job %s from namespace %s", pytorchJob.Name, pytorchJob.Namespace) + // Delete job infos saved in Torch Elastic controller. + if _, ok := ts.torchElasticJobs[makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)]; ok { + cancel := ts.torchElasticJobs[makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)].cancelFunc + defer cancel() + delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + } + + return true + } +} + +// Start elastic scaling loop for all torch elastic jobs. +func (ts *TorchElasticController) startElasticForAllJobs(ctx context.Context) { + tickets := 100 // max semaphore tickets limited. + if len(ts.torchElasticJobs) < 100 { + tickets = len(ts.torchElasticJobs) + } + sema := concurrent.NewSemaphore(tickets) + for _, torchJob := range ts.torchElasticJobs { + sema.Acquire() + + go func(job TorchElasticJob) { + defer sema.Release() + //Start elastic scaling for each torch elastic job. + ts.start(job.ctx, job.cancelFunc, job.Name, job.Namespace) + }(torchJob) + } + // block until all semaphore is released. + sema.Wait() +} + +func (ts *TorchElasticController) start(ctx context.Context, cancel context.CancelFunc, name, namespace string) { + sharedPytorchJob := &training.PyTorchJob{} + jobName := name + jobNamespace := namespace + + // Create metrics for each torch elastic job. 
+ ts.locker.Lock() + if _, ok := ts.metrics[jobName]; !ok { + ts.metrics[jobName] = make(map[int32][]MetricObservation) + } + ts.locker.Unlock() + + err := ts.Client.Get(ctx, types.NamespacedName{Namespace: jobNamespace, Name: jobName}, sharedPytorchJob) + if err != nil { + logger.Infof("try to get job %s from namespace %s but it has been deleted", jobName, jobNamespace) + // cancel the elastic scaling process context of the deleted job. + defer cancel() + return + } + + pytorchJob := sharedPytorchJob.DeepCopy() + if pytorchJob.Spec.ElasticPolicy.MaxReplicas == nil || pytorchJob.Spec.ElasticPolicy.MinReplicas == nil { + logger.Infof("pytorch job %s does not configure the max or min replicas", pytorchJob.Name) + defer cancel() + delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + return + } + + if pytorchJob.Status.ElasticStatus == nil { + initializeElasticStatuses(&pytorchJob.Status, training.PyTorchReplicaTypeWorker) + pytorchJob.Status.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas + pytorchJob.Status.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = true + now := metav1.Now() + pytorchJob.Status.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now + if err := ts.UpdateJobStatusInApiServer(pytorchJob, &pytorchJob.Status); err != nil { + if errors.IsConflict(err) { + // retry later when update operation violates with etcd concurrency control. + log.Info("fail to update pytorch job") + } + } + return + } + + jobStatus := pytorchJob.Status.DeepCopy() + oldStatus := jobStatus.DeepCopy() + if pytorchJob.Status.CompletionTime != nil || pytorchJob.DeletionTimestamp != nil { + logger.Infof("job %s has been completed or deleted and does not need to do elastic scaling", pytorchJob.Name) + defer cancel() + delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + return + } + + currentReplicas := *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas + // Get all pods for the pytorch job. + pods, err := ts.GetPodsForJob(pytorchJob) + if err != nil { + logger.Warnf("Get Pods For Job error %v", err) + } + hasPendingPod := false + hasFailedPod := false + + // Wait for all pods running with timeout seconds. + waitErr := wait.PollImmediate(interval, podReadyTimeout, func() (bool, error) { + for _, pod := range pods { + if isRunning := podRunning(pod); !isRunning { + return false, nil + } + } + return true, nil + }) + if waitErr != nil { + logger.Info("pods did not reach the running state") + } + + for _, pod := range pods { + if pod.Status.Phase == v1.PodPending { + hasPendingPod = true + break + } + } + for _, pod := range pods { + if pod.Status.Phase == v1.PodFailed { + hasFailedPod = true + break + } + } + + // If job has pending pods and current replicas are more than min replicas, return to the last replicas. + if hasPendingPod && currentReplicas > *pytorchJob.Spec.ElasticPolicy.MinReplicas { + lastReplicas := jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas + *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = lastReplicas + // Return to the last replicas. 
+ if err := ts.Client.Update(ctx, pytorchJob); err != nil { + log.Info("fail to update replicas of pytorch job") + } + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = false + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas = jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = lastReplicas + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "there exists pending pods, return to the last replicas" + now := metav1.Now() + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now + if err := ts.UpdateJobStatusInApiServer(pytorchJob, jobStatus); err != nil { + if errors.IsConflict(err) { + // retry later when update operation violates with etcd concurrency control. + log.Info("fail to update pytorch job") + } + } + return + + // If job has pending pods and current replicas equals to the min replicas, cancel the elastic scaling process context. + } else if (hasPendingPod && currentReplicas == *pytorchJob.Spec.ElasticPolicy.MinReplicas) || hasFailedPod { + defer cancel() + logger.Info("pods did not reach the running state at min replicas or job is failed, so the elastic scaling controller shutdown") + delete(ts.torchElasticJobs, makeElasticJobName(pytorchJob.Name, pytorchJob.Namespace)) + return + } + + // If job does not need to be scaled, return directly. + if !hasPendingPod && jobStatus.ElasticStatus != nil && jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue == false { + log.Info("pytorch job does not need to be scaled") + return + } + + // Read training logs from pytorch pods and save the observation. + observation, err := read(ts.client, jobNamespace, GetDefaultWorkerName(jobName)) + if err != nil { + logger.Infof("fail to read training logs: %s", err) + return + } + + ts.locker.Lock() + defer ts.locker.Unlock() + + // Create metrics for current replicas. + if _, ok := ts.metrics[jobName][currentReplicas]; !ok { + ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) + } + ts.metrics[jobName][currentReplicas] = append(ts.metrics[jobName][currentReplicas], observation) + currentLength := len(ts.metrics[jobName][currentReplicas]) + logger.Infof("Current metric length: %d", currentLength) + // If current metrics have reached the metric count, judge the next scaling replicas. 
+ if currentLength >= ts.metricCount { + if currentReplicas > *pytorchJob.Spec.ElasticPolicy.MinReplicas && currentReplicas <= *pytorchJob.Spec.ElasticPolicy.MaxReplicas { + lastReplicas := jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas + currentLatency := ts.metrics[jobName][currentReplicas][currentLength-1].Latency + lastReplicaLatency := ts.metrics[jobName][lastReplicas][currentLength-1].Latency + log.Info("last latency: ", lastReplicaLatency, "current latency: ", currentLatency) + + if (lastReplicaLatency / float64(lastReplicas)) > (currentLatency / float64(currentReplicas)) { + + if currentReplicas == *pytorchJob.Spec.ElasticPolicy.MaxReplicas { + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "The pytorch job has reached the MaxReplicas" + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = false + ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) + } else { + newReplicas := currentReplicas + 1 + *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = newReplicas + if err := ts.Client.Update(ctx, pytorchJob); err != nil { + log.Info("fail to update pytorch job") + } + + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas = currentReplicas + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = newReplicas + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "continues to scale pytorch job" + now := metav1.Now() + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = true + if _, ok := ts.metrics[jobName][newReplicas]; !ok { + ts.metrics[jobName][newReplicas] = make([]MetricObservation, 0) + } + } + + } else { + *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas + if err := ts.Client.Update(ctx, pytorchJob); err != nil { + log.Info("fail to update pytorch job") + } + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = lastReplicas + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas = currentReplicas + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "The pytorch job does not need to be scaled, then go back to the last choice" + now := metav1.Now() + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = false + ts.metrics[jobName][lastReplicas] = make([]MetricObservation, 0) + ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) + } + + } else if currentReplicas == *pytorchJob.Spec.ElasticPolicy.MinReplicas && currentReplicas < *pytorchJob.Spec.ElasticPolicy.MaxReplicas { + newReplicas := *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas + 1 + *pytorchJob.Spec.PyTorchReplicaSpecs[training.PyTorchReplicaTypeWorker].Replicas = newReplicas + if err := ts.Client.Update(ctx, pytorchJob); err != nil { + log.Info("fail to update pytorch job") + } + + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].CurrentReplicas = newReplicas + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastReplicas = currentReplicas + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "The pytorch job continues to be scaled" + now := metav1.Now() + 
jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].LastUpdateTime = &now + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = true + + if _, ok := ts.metrics[jobName][newReplicas]; !ok { + ts.metrics[jobName][newReplicas] = make([]MetricObservation, 0) + } + + } else if currentReplicas == *pytorchJob.Spec.ElasticPolicy.MaxReplicas { + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Message = "The pytorch job has reached the MaxReplicas" + jobStatus.ElasticStatus[training.PyTorchReplicaTypeWorker].Continue = false + ts.metrics[jobName][currentReplicas] = make([]MetricObservation, 0) + } + } + + // No need to update the job status if the status hasn't changed since last time. + if !reflect.DeepEqual(*oldStatus, *jobStatus) { + if err = ts.UpdateJobStatusInApiServer(pytorchJob, jobStatus); err != nil { + if errors.IsConflict(err) { + // retry later when the update operation conflicts with etcd concurrency control. + logger.Info("fail to update pytorch job status") + return + } + } + } + + return +} + +func (ts *TorchElasticController) GetPodsForJob(job *training.PyTorchJob) ([]*v1.Pod, error) { + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: ts.GenLabels(job.Name), + }) + // List all pods that carry this job's group-name and job-name labels; + // the selector is built from the labels generated for this job. + podList := &v1.PodList{} + err = ts.Client.List(context.Background(), podList, client.MatchingLabelsSelector{Selector: selector}) + if err != nil { + return nil, err + } + return commonutil.ToPodPointerList(podList.Items), nil +} + +func (ts *TorchElasticController) GenLabels(jobName string) map[string]string { + + labelGroupName := apiv1.GroupNameLabel + labelJobName := apiv1.JobNameLabel + groupName := ts.GetGroupNameLabelValue() + return map[string]string{ + labelGroupName: groupName, + labelJobName: strings.Replace(jobName, "/", "-", -1), + } +} + +func (ts *TorchElasticController) GetGroupNameLabelValue() string { + return training.SchemeGroupVersion.Group +} + +func (ts *TorchElasticController) ControllerName() string { + return controllerName +} + +func makeElasticJobName(name, namespace string) string { + return name + "-" + namespace +} + +// UpdateJobStatusInApiServer updates the job status in the API server. +func (ts *TorchElasticController) UpdateJobStatusInApiServer(job interface{}, jobStatus *controllerv1.JobStatus) error { + torchElasticJob, ok := job.(*training.PyTorchJob) + if !ok { + return fmt.Errorf("%+v is not a type of PyTorchJob", job) + } + var jobCpy *training.PyTorchJob + // The job status passed in may differ from the status stored in the job, so update on the basis of the passed-in one. + jobCpy = torchElasticJob.DeepCopy() + jobCpy.Status = *jobStatus.DeepCopy() + return ts.Status().Update(context.Background(), jobCpy) +} + +// initializeElasticStatuses initializes the ElasticStatus entry for the given replica type. 
+func initializeElasticStatuses(jobStatus *apiv1.JobStatus, rtype apiv1.ReplicaType) { + if jobStatus.ElasticStatus == nil { + jobStatus.ElasticStatus = make(map[apiv1.ReplicaType]*apiv1.ElasticScalingStatus) + } + + jobStatus.ElasticStatus[rtype] = &apiv1.ElasticScalingStatus{} +} diff --git a/example/pytorch/torchelastic/etcd.yaml b/example/pytorch/torchelastic/etcd.yaml new file mode 100644 index 00000000..bd787ece --- /dev/null +++ b/example/pytorch/torchelastic/etcd.yaml @@ -0,0 +1,45 @@ +apiVersion: v1 +kind: Service +metadata: + name: etcd-service + namespace: elastic-job +spec: + ports: + - name: etcd-client-port + port: 2379 + protocol: TCP + targetPort: 2379 + selector: + app: etcd + +--- +apiVersion: v1 +kind: Pod +metadata: + labels: + app: etcd + name: etcd + namespace: elastic-job +spec: + containers: + - command: + - /usr/local/bin/etcd + - --data-dir + - /var/lib/etcd + - --enable-v2 + - --listen-client-urls + - http://0.0.0.0:2379 + - --advertise-client-urls + - http://0.0.0.0:2379 + - --initial-cluster-state + - new + image: k8s.gcr.io/etcd:3.5.1-0 + name: etcd + ports: + - containerPort: 2379 + name: client + protocol: TCP + - containerPort: 2380 + name: server + protocol: TCP + restartPolicy: Always diff --git a/example/pytorch/torchelastic/torchelastic.yaml b/example/pytorch/torchelastic/torchelastic.yaml new file mode 100644 index 00000000..938be11e --- /dev/null +++ b/example/pytorch/torchelastic/torchelastic.yaml @@ -0,0 +1,55 @@ +apiVersion: training.kubedl.io/v1alpha1 +kind: "PyTorchJob" +metadata: + name: "resnet" + namespace: elastic-job +spec: + enableElastic: true + elasticPolicy: + rdzvBackend: etcd + rdzvEndpoint: "etcd-service:2379" + minReplicas: 1 + maxReplicas: 3 + nProcPerNode: 1 + pytorchReplicaSpecs: + Master: + replicas: 1 + restartPolicy: ExitCode + template: + spec: + containers: + - name: pytorch + image: kubedl/pytorch-dist-example + imagePullPolicy: Always + Worker: + replicas: 1 + restartPolicy: ExitCode + template: + spec: + volumes: + - name: checkpoint + persistentVolumeClaim: + claimName: pvc-torch-checkpoint + containers: + - name: pytorch + image: wanziyu/imagenet:1.1 + imagePullPolicy: Always + args: + - "/workspace/examples/imagenet/main.py" + - "--arch=resnet50" + - "--epochs=20" + - "--batch-size=64" + - "--print-freq=50" + # number of data loader workers (NOT trainers) + # zero means load the data on the same process as the trainer + # this is set so that the container does not OOM since + # pytorch data loaders use shm + - "--workers=0" + - "/workspace/data/tiny-imagenet-200" + - "--checkpoint-file=/mnt/blob/data/checkpoint.pth.tar" + resources: + limits: + nvidia.com/gpu: 1 + volumeMounts: + - name: checkpoint + mountPath: "/mnt/blob/data" \ No newline at end of file diff --git a/main.go b/main.go index 80472b38..d059ae94 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "github.com/alibaba/kubedl/controllers/torchelastic" "os" "k8s.io/apimachinery/pkg/util/net" @@ -117,6 +118,11 @@ func main() { os.Exit(1) } + if err = torchelastic.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to setup elastic scaling controllers") + os.Exit(1) + } + // Start monitoring for default registry. 
metrics.StartMonitoringForDefaultRegistry(metricsAddr) diff --git a/pkg/job_controller/api/v1/types.go b/pkg/job_controller/api/v1/types.go index 2ac1acb7..f33a40d6 100644 --- a/pkg/job_controller/api/v1/types.go +++ b/pkg/job_controller/api/v1/types.go @@ -41,6 +41,10 @@ type JobStatus struct { // It is represented in RFC3339 form and is in UTC. CompletionTime *metav1.Time `json:"completionTime,omitempty"` + + // ElasticStatus represents the elastic scaling status of the training job, + // keyed by replica type. + ElasticStatus map[ReplicaType]*ElasticScalingStatus `json:"elasticScaling,omitempty"` + // Represents last time when the job was reconciled. It is not guaranteed to // be set in happens-before order across separate operations. // It is represented in RFC3339 form and is in UTC. @@ -57,6 +61,24 @@ type JobStatus struct { // own set of ReplicaTypes. type ReplicaType string +// ElasticScalingStatus represents the current elastic scaling status of the training job. +type ElasticScalingStatus struct { + // The current number of pod replicas for this replica type. + CurrentReplicas int32 `json:"currentReplicas,omitempty"` + + // The number of pod replicas before the last scaling operation. + LastReplicas int32 `json:"lastReplicas,omitempty"` + + // Continue represents whether the job needs to continue scaling. + Continue bool `json:"continue,omitempty"` + + // The last time the elastic scaling status was updated. + LastUpdateTime *metav1.Time `json:"startTime,omitempty"` + + // A human-readable message indicating details about the transition. + Message string `json:"message,omitempty"` +} + // ReplicaStatus represents the current observed state of the replica. type ReplicaStatus struct { // The number of actively running pods. diff --git a/pkg/job_controller/job.go b/pkg/job_controller/job.go index c6319b66..da7e2093 100644 --- a/pkg/job_controller/job.go +++ b/pkg/job_controller/job.go @@ -317,10 +317,15 @@ func (jc *JobController) ReconcileJobs(job interface{}, replicas map[apiv1.Repli continue } - // Service is in need only for Master - if jc.Controller.GetAPIGroupVersionKind().Kind == training.PyTorchJobKind && - rtype != training.PyTorchReplicaTypeMaster { - continue + if jc.Controller.GetAPIGroupVersionKind().Kind == training.PyTorchJobKind { + pytorchJob, ok := job.(*training.PyTorchJob) + if !ok { + log.Warnf("%+v is not a type of PyTorchJob", job) + } + // A service is needed only for the pytorch Master unless elastic mode is enabled. + if !pytorchJob.Spec.EnableElastic && rtype != training.PyTorchReplicaTypeMaster { + continue + } } err = jc.ReconcileServices(ctx, metaObject, services, rtype, spec) diff --git a/pkg/job_controller/service.go b/pkg/job_controller/service.go index f5d652b2..7ac7af6d 100644 --- a/pkg/job_controller/service.go +++ b/pkg/job_controller/service.go @@ -162,11 +162,28 @@ func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, re return result, nil } +// calculateServiceSliceSize compares the max service index with the desired replicas and returns the larger size. +func calculateServiceSliceSize(services []*v1.Service, replicas int) int { + size := 0 + for _, svc := range services { + if _, ok := svc.Labels[apiv1.ReplicaIndexLabel]; !ok { + continue + } + index, err := strconv.Atoi(svc.Labels[apiv1.ReplicaIndexLabel]) + if err != nil { + continue + } + size = maxInt(size, index) + } + // size comes from a zero-based index, so add 1 to get the real size + return maxInt(size+1, replicas) +} + // GetServiceSlices returns a slice, which element is the slice of service. 
// Assume the return object is serviceSlices, then serviceSlices[i] is an // array of pointers to services corresponding to Services for replica i. func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service { - serviceSlices := make([][]*v1.Service, replicas) + serviceSlices := make([][]*v1.Service, calculateServiceSliceSize(services, replicas)) for _, service := range services { if _, ok := service.Labels[apiv1.ReplicaIndexLabel]; !ok { logger.Warning("The service do not have the index label.") diff --git a/pkg/job_controller/util.go b/pkg/job_controller/util.go index 61229548..239fd31f 100644 --- a/pkg/job_controller/util.go +++ b/pkg/job_controller/util.go @@ -75,3 +75,10 @@ func ReplicaTypes(specs map[v1.ReplicaType]*v1.ReplicaSpec) []v1.ReplicaType { } return replicas } + +func maxInt(x, y int) int { + if x < y { + return y + } + return x +}
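For reference, a minimal standalone sketch (not part of the patch) of how the torch elastic launch flags composed in SetClusterSpec come together for the resnet example job above (etcd backend at etcd-service:2379, minReplicas=1, maxReplicas=3, nProcPerNode=1). The buildLaunchArgs helper is hypothetical and exists only for illustration, and the assumption that the worker image's entrypoint is torchrun (python -m torch.distributed.run) is mine, not stated in the patch.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildLaunchArgs mirrors the flag composition in SetClusterSpec: rendezvous
// backend, endpoint and id, plus the per-node process count and the node range.
func buildLaunchArgs(jobName, backend, endpoint string, nproc, minReplicas, maxReplicas int32) []string {
	return []string{
		"--rdzv_backend=" + backend,
		"--rdzv_endpoint=" + endpoint,
		"--rdzv_id=" + jobName,
		"--nproc_per_node=" + strconv.Itoa(int(nproc)),
		"--nnodes=" + strconv.Itoa(int(minReplicas)) + ":" + strconv.Itoa(int(maxReplicas)),
	}
}

func main() {
	// Values taken from example/pytorch/torchelastic/torchelastic.yaml.
	args := buildLaunchArgs("resnet", "etcd", "etcd-service:2379", 1, 1, 3)
	// The controller prepends these flags to the container args, so an elastic
	// entrypoint (assumed here to be torchrun) would receive:
	// --rdzv_backend=etcd --rdzv_endpoint=etcd-service:2379 --rdzv_id=resnet --nproc_per_node=1 --nnodes=1:3
	fmt.Println(strings.Join(args, " "))
}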
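The scale-up rule in torchelastic_controller.go compares latency per replica between the previous and the current replica count and keeps adding workers only while that ratio improves. A compact sketch of that comparison with hypothetical observations follows; the shouldScaleUp helper is illustrative and not part of the patch.

package main

import "fmt"

// shouldScaleUp mirrors the decision in torchelastic_controller.go: keep adding
// workers while latency divided by replica count keeps decreasing.
func shouldScaleUp(lastLatency float64, lastReplicas int32, currentLatency float64, currentReplicas int32) bool {
	return lastLatency/float64(lastReplicas) > currentLatency/float64(currentReplicas)
}

func main() {
	// Hypothetical observations: 0.80s/batch at 2 workers vs. 0.60s/batch at 3 workers.
	fmt.Println(shouldScaleUp(0.80, 2, 0.60, 3)) // true: 0.40 > 0.20, so the controller tries one more worker
}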