From 1eaa73e9b6b59258befeab7c8174dbd6e412c6b2 Mon Sep 17 00:00:00 2001 From: JMGaljaard Date: Tue, 23 Aug 2022 12:09:08 +0200 Subject: [PATCH] Update codebase to make use of kubeflow 1.5.0 training operator --- fltk/core/distributed/orchestrator.py | 15 +++++++-------- fltk/util/cluster/client.py | 24 +++++++++--------------- requirements-cpu.txt | 2 +- requirements-gpu.txt | 2 +- 4 files changed, 18 insertions(+), 25 deletions(-) diff --git a/fltk/core/distributed/orchestrator.py b/fltk/core/distributed/orchestrator.py index 38b56abc..3a7c0fa3 100644 --- a/fltk/core/distributed/orchestrator.py +++ b/fltk/core/distributed/orchestrator.py @@ -3,11 +3,11 @@ import time import uuid from queue import PriorityQueue -from typing import List, OrderedDict, Dict, Type, Union +from typing import List, OrderedDict, Dict, Type from jinja2 import Environment, FileSystemLoader -from kubeflow.pytorchjob import PyTorchJobClient -from kubeflow.pytorchjob.constants.constants import PYTORCHJOB_GROUP, PYTORCHJOB_VERSION, PYTORCHJOB_PLURAL +from kubeflow.training import PyTorchJobClient +from kubeflow.training.constants.constants import PYTORCHJOB_GROUP, PYTORCHJOB_VERSION, PYTORCHJOB_PLURAL from kubernetes import client from kubernetes.client import V1ConfigMap, V1ObjectMeta @@ -54,7 +54,6 @@ def render_template(task: ArrivalTask, tpe: str, replication: int, experiment_pa def _prepare_experiment_maps(task: ArrivalTask, config: DistributedConfig, u_id: uuid.UUID, replication: int = 1) -> \ (OrderedDict[str, V1ConfigMap], OrderedDict[str, str]): - type_dict = collections.OrderedDict() name_dict = collections.OrderedDict() for tpe in task.type_map.keys(): @@ -79,8 +78,8 @@ def _generate_task(arrival) -> ArrivalTask: unique_identifier: uuid.UUID = uuid.uuid4() task_type: Type[ArrivalTask] = get_job_arrival_class(arrival.task.experiment_type) task = task_type.build(arrival=arrival, - u_id=unique_identifier, - replication=arrival.task.replication) + u_id=unique_identifier, + 
replication=arrival.task.replication) return task @@ -173,10 +172,10 @@ def run(self, clear: bool = False) -> None: def run_batch(self, clear: bool = False) -> None: """ - Main loop of the Orchestrator. + Main loop of the Orchestrator for processing a configuration as a batch, i.e. deploy one-after-another without + any scheduling or simulation applied. @param clear: Boolean indicating whether a previous deployment needs to be cleaned up (i.e. lingering jobs that were deployed by the previous run). - @type clear: bool @return: None @rtype: None diff --git a/fltk/util/cluster/client.py b/fltk/util/cluster/client.py index 341f794e..5883bb6a 100644 --- a/fltk/util/cluster/client.py +++ b/fltk/util/cluster/client.py @@ -1,4 +1,5 @@ import logging +import time from collections import defaultdict from dataclasses import dataclass from multiprocessing.pool import ThreadPool @@ -6,8 +7,7 @@ from uuid import UUID import schedule -import time -from kubeflow.pytorchjob import V1PyTorchJob, V1ReplicaSpec, V1PyTorchJobSpec +from kubeflow.training import V1ReplicaSpec, KubeflowOrgV1PyTorchJob, KubeflowOrgV1PyTorchJobSpec from kubernetes import client from kubernetes.client import V1ObjectMeta, V1ResourceRequirements, V1Container, V1PodTemplateSpec, \ V1VolumeMount, V1Toleration, V1Volume, V1PersistentVolumeClaimVolumeSource, V1ConfigMapVolumeSource @@ -42,7 +42,7 @@ class BuildDescription: typed_containers = OrderedDict[str, V1Container]() typed_templates = OrderedDict[str, V1PodTemplateSpec]() id: Optional[UUID] = None # pylint: disable=invalid-name - spec: Optional[V1PyTorchJobSpec] = None + spec: Optional[KubeflowOrgV1PyTorchJobSpec] = None tolerations: Optional[List[V1Toleration]] = None @@ -420,14 +420,12 @@ def build_spec(self, task: ArrivalTask, restart_policy: str = 'Never') -> None: replicas=task.typed_replica_count(tpe), restart_policy=restart_policy, template=tpe_template) - typed_replica_spec.openapi_types = typed_replica_spec.swagger_types pt_rep_spec[tpe] = 
typed_replica_spec - job_spec = V1PyTorchJobSpec(pytorch_replica_specs=pt_rep_spec) - job_spec.openapi_types = job_spec.swagger_types + job_spec = KubeflowOrgV1PyTorchJobSpec(pytorch_replica_specs=pt_rep_spec) self._build_description.spec = job_spec - def construct(self) -> V1PyTorchJob: + def construct(self) -> KubeflowOrgV1PyTorchJob: """ Contruct V1PyTorch object following the description of the building process. Note that V1PyTorchJob differs slightly from a V1Job object in Kubernetes. Refer to the kubeflow documentation for more information on the @@ -435,7 +433,7 @@ def construct(self) -> V1PyTorchJob: @return: V1PyTorchJob object that was dynamically constructed. @rtype: V1PyTorchJob """ - job = V1PyTorchJob( + job = KubeflowOrgV1PyTorchJob( api_version="kubeflow.org/v1", kind="PyTorchJob", metadata=V1ObjectMeta(name=f'trainjob-{self._build_description.id}', namespace='test'), @@ -454,7 +452,7 @@ def create_identifier(self, task: ArrivalTask): def construct_job(conf: DistributedConfig, task: ArrivalTask, - configmap_name_dict: Optional[Dict[str, str]] = None) -> V1PyTorchJob: + configmap_name_dict: Optional[Dict[str, str]] = None) -> KubeflowOrgV1PyTorchJob: """ Function to build a Job, based on the specifications of an ArrivalTask, and the general configuration of the DistributedConfig. @@ -464,10 +462,9 @@ def construct_job(conf: DistributedConfig, task: ArrivalTask, @type task: DistributedArrivalTask @param configmap_name_dict: Mapping of pod names to their respective K8s configMap names. @type configmap_name_dict: Optional[Dict[str, str]] - @return: KubeFlow compatible PyTorchJob description to create a Job with the requested system and hyper-parameters. - @rtype: V1PyTorchJob + @return: KubeFlow compatible KubeflowOrgV1PyTorchJob description to create a Job with the requested system and hyper-parameters. 
+ @rtype: KubeflowOrgV1PyTorchJob """ - dp_builder = DeploymentBuilder() dp_builder.create_identifier(task) dp_builder.build_resources(task) @@ -476,7 +473,4 @@ def construct_job(conf: DistributedConfig, task: ArrivalTask, dp_builder.build_template(configmap_name_dict) dp_builder.build_spec(task) job = dp_builder.construct() - # Fix to deploy on more up-to-date Kubernetes clusters. See if needed for KubeFlow operator release. - # This can be removed under Kubeflow v1.5 - job.openapi_types = job.swagger_types return job diff --git a/requirements-cpu.txt b/requirements-cpu.txt index 7571784e..f0128902 100644 --- a/requirements-cpu.txt +++ b/requirements-cpu.txt @@ -20,7 +20,7 @@ iniconfig==1.1.1 iteration-utilities==0.11.0 Jinja2==3.1.1 joblib==1.0.1 -kubeflow-pytorchjob==0.1.3 +kubeflow-training==1.5.0 kubernetes==17.17.0 Markdown==3.3.4 MarkupSafe==2.1.1 diff --git a/requirements-gpu.txt b/requirements-gpu.txt index 177faa1d..12803b69 100644 --- a/requirements-gpu.txt +++ b/requirements-gpu.txt @@ -20,7 +20,7 @@ iniconfig==1.1.1 iteration-utilities==0.11.0 Jinja2==3.1.1 joblib==1.0.1 -kubeflow-pytorchjob==0.1.3 +kubeflow-training==1.5.0 kubernetes==17.17.0 Markdown==3.3.4 MarkupSafe==2.1.1