Skip to content

Commit

Permalink
Update codebase to make use of kubeflow 1.5.0 training operator
Browse files Browse the repository at this point in the history
  • Loading branch information
JMGaljaard committed Sep 4, 2022
1 parent e880e11 commit 1eaa73e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 25 deletions.
15 changes: 7 additions & 8 deletions fltk/core/distributed/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import time
import uuid
from queue import PriorityQueue
from typing import List, OrderedDict, Dict, Type, Union
from typing import List, OrderedDict, Dict, Type

from jinja2 import Environment, FileSystemLoader
from kubeflow.pytorchjob import PyTorchJobClient
from kubeflow.pytorchjob.constants.constants import PYTORCHJOB_GROUP, PYTORCHJOB_VERSION, PYTORCHJOB_PLURAL
from kubeflow.training import PyTorchJobClient
from kubeflow.training.constants.constants import PYTORCHJOB_GROUP, PYTORCHJOB_VERSION, PYTORCHJOB_PLURAL
from kubernetes import client
from kubernetes.client import V1ConfigMap, V1ObjectMeta

Expand Down Expand Up @@ -54,7 +54,6 @@ def render_template(task: ArrivalTask, tpe: str, replication: int, experiment_pa

def _prepare_experiment_maps(task: ArrivalTask, config: DistributedConfig, u_id: uuid.UUID, replication: int = 1) -> \
(OrderedDict[str, V1ConfigMap], OrderedDict[str, str]):

type_dict = collections.OrderedDict()
name_dict = collections.OrderedDict()
for tpe in task.type_map.keys():
Expand All @@ -79,8 +78,8 @@ def _generate_task(arrival) -> ArrivalTask:
unique_identifier: uuid.UUID = uuid.uuid4()
task_type: Type[ArrivalTask] = get_job_arrival_class(arrival.task.experiment_type)
task = task_type.build(arrival=arrival,
u_id=unique_identifier,
replication=arrival.task.replication)
u_id=unique_identifier,
replication=arrival.task.replication)
return task


Expand Down Expand Up @@ -173,10 +172,10 @@ def run(self, clear: bool = False) -> None:

def run_batch(self, clear: bool = False) -> None:
"""
Main loop of the Orchestrator.
Main loop of the Orchestrator for processing a configuration as a batch, i.e. deploy one-after-another without
any scheduling or simulation applied.
@param clear: Boolean indicating whether a previous deployment needs to be cleaned up (i.e. lingering jobs that
were deployed by the previous run).
@type clear: bool
@return: None
@rtype: None
Expand Down
24 changes: 9 additions & 15 deletions fltk/util/cluster/client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging
import time
from collections import defaultdict
from dataclasses import dataclass
from multiprocessing.pool import ThreadPool
from typing import Dict, List, Tuple, Optional, OrderedDict, Union
from uuid import UUID

import schedule
import time
from kubeflow.pytorchjob import V1PyTorchJob, V1ReplicaSpec, V1PyTorchJobSpec
from kubeflow.training import V1ReplicaSpec, KubeflowOrgV1PyTorchJob, KubeflowOrgV1PyTorchJobSpec
from kubernetes import client
from kubernetes.client import V1ObjectMeta, V1ResourceRequirements, V1Container, V1PodTemplateSpec, \
V1VolumeMount, V1Toleration, V1Volume, V1PersistentVolumeClaimVolumeSource, V1ConfigMapVolumeSource
Expand Down Expand Up @@ -42,7 +42,7 @@ class BuildDescription:
typed_containers = OrderedDict[str, V1Container]()
typed_templates = OrderedDict[str, V1PodTemplateSpec]()
id: Optional[UUID] = None # pylint: disable=invalid-name
spec: Optional[V1PyTorchJobSpec] = None
spec: Optional[KubeflowOrgV1PyTorchJobSpec] = None
tolerations: Optional[List[V1Toleration]] = None


Expand Down Expand Up @@ -420,22 +420,20 @@ def build_spec(self, task: ArrivalTask, restart_policy: str = 'Never') -> None:
replicas=task.typed_replica_count(tpe),
restart_policy=restart_policy,
template=tpe_template)
typed_replica_spec.openapi_types = typed_replica_spec.swagger_types
pt_rep_spec[tpe] = typed_replica_spec

job_spec = V1PyTorchJobSpec(pytorch_replica_specs=pt_rep_spec)
job_spec.openapi_types = job_spec.swagger_types
job_spec = KubeflowOrgV1PyTorchJobSpec(pytorch_replica_specs=pt_rep_spec)
self._build_description.spec = job_spec

def construct(self) -> V1PyTorchJob:
def construct(self) -> KubeflowOrgV1PyTorchJob:
"""
Contruct V1PyTorch object following the description of the building process. Note that V1PyTorchJob differs
slightly from a V1Job object in Kubernetes. Refer to the kubeflow documentation for more information on the
PV1PyTorchJob object.
@return: V1PyTorchJob object that was dynamically constructed.
@rtype: V1PyTorchJob
"""
job = V1PyTorchJob(
job = KubeflowOrgV1PyTorchJob(
api_version="kubeflow.org/v1",
kind="PyTorchJob",
metadata=V1ObjectMeta(name=f'trainjob-{self._build_description.id}', namespace='test'),
Expand All @@ -454,7 +452,7 @@ def create_identifier(self, task: ArrivalTask):


def construct_job(conf: DistributedConfig, task: ArrivalTask,
configmap_name_dict: Optional[Dict[str, str]] = None) -> V1PyTorchJob:
configmap_name_dict: Optional[Dict[str, str]] = None) -> KubeflowOrgV1PyTorchJob:
"""
Function to build a Job, based on the specifications of an ArrivalTask, and the general configuration of the
DistributedConfig.
Expand All @@ -464,10 +462,9 @@ def construct_job(conf: DistributedConfig, task: ArrivalTask,
@type task: DistributedArrivalTask
@param configmap_name_dict: Mapping of pod names to their respective K8s configMap names.
@type configmap_name_dict: Optional[Dict[str, str]]
@return: KubeFlow compatible PyTorchJob description to create a Job with the requested system and hyper-parameters.
@rtype: V1PyTorchJob
@return: KubeFlow compatible KubeflowOrgV1PyTorchJob description to create a Job with the requested system and hyper-parameters.
@rtype: KubeflowOrgV1PyTorchJob
"""

dp_builder = DeploymentBuilder()
dp_builder.create_identifier(task)
dp_builder.build_resources(task)
Expand All @@ -476,7 +473,4 @@ def construct_job(conf: DistributedConfig, task: ArrivalTask,
dp_builder.build_template(configmap_name_dict)
dp_builder.build_spec(task)
job = dp_builder.construct()
# Fix to deploy on more up-to-date Kubernetes clusters. See if needed for KubeFlow operator release.
# This can be removed under Kubeflow v1.5
job.openapi_types = job.swagger_types
return job
2 changes: 1 addition & 1 deletion requirements-cpu.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ iniconfig==1.1.1
iteration-utilities==0.11.0
Jinja2==3.1.1
joblib==1.0.1
kubeflow-pytorchjob==0.1.3
kubeflow-training==1.5.0
kubernetes==17.17.0
Markdown==3.3.4
MarkupSafe==2.1.1
Expand Down
2 changes: 1 addition & 1 deletion requirements-gpu.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ iniconfig==1.1.1
iteration-utilities==0.11.0
Jinja2==3.1.1
joblib==1.0.1
kubeflow-pytorchjob==0.1.3
kubeflow-training==0.1.3
kubernetes==17.17.0
Markdown==3.3.4
MarkupSafe==2.1.1
Expand Down

0 comments on commit 1eaa73e

Please sign in to comment.