Update imports and versions of objects for cluster utilities
JMGaljaard committed Sep 4, 2022
1 parent 68bf45a commit a0f06da
Showing 2 changed files with 16 additions and 7 deletions.
1 change: 1 addition & 0 deletions fltk/util/cluster/__init__.py
@@ -0,0 +1 @@
+from .client import ClusterManager
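The new package __init__ re-exports ClusterManager, so callers can import it from the package root instead of reaching into the client submodule. A minimal sketch of what this enables (not code from this commit):

from fltk.util.cluster import ClusterManager  # now equivalent to: from fltk.util.cluster.client import ClusterManager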
22 changes: 15 additions & 7 deletions fltk/util/cluster/client.py
@@ -1,3 +1,4 @@
+from __future__ import annotations
import logging
import time
from collections import defaultdict
@@ -7,17 +8,20 @@
from uuid import UUID

import schedule
-from kubeflow.training import V1ReplicaSpec, KubeflowOrgV1PyTorchJob, KubeflowOrgV1PyTorchJobSpec
+from kubeflow.training import V1ReplicaSpec, KubeflowOrgV1PyTorchJob, KubeflowOrgV1PyTorchJobSpec, V1RunPolicy
from kubernetes import client
from kubernetes.client import V1ObjectMeta, V1ResourceRequirements, V1Container, V1PodTemplateSpec, \
V1VolumeMount, V1Toleration, V1Volume, V1PersistentVolumeClaimVolumeSource, V1ConfigMapVolumeSource

from fltk.util.cluster.conversion import Convert
-from fltk.util.config import DistributedConfig
from fltk.util.singleton import Singleton
from fltk.util.task.config.parameter import SystemResources
from fltk.util.task.task import DistributedArrivalTask, ArrivalTask, FederatedArrivalTask

+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from fltk.util.config import DistributedConfig

@dataclass
class Resource:
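Moving the DistributedConfig import under a TYPE_CHECKING guard keeps it visible to type checkers while deferring it at runtime, a common way to break a circular import; the `from __future__ import annotations` line added above turns annotations into lazily evaluated strings so the guarded name can still appear in signatures. A minimal sketch of the pattern (module and class names here are illustrative, not from this repository):

from __future__ import annotations  # annotations are stored as strings, not evaluated at import time

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only by static type checkers; never executed at runtime.
    from heavy_or_circular_module import HeavyConfig  # hypothetical module

def configure(config: HeavyConfig) -> None:
    # At runtime the annotation is just the string 'HeavyConfig'; no import happens.
    print(type(config).__name__)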
@@ -53,6 +57,8 @@ class ResourceWatchDog:
on GitHub:
https://gist.github.com/gorenje/dff508489c3c8a460433ad709f14b7db
+N.B. this class acts as a starting point for scheduling based on resource availability.
"""
_alive: False
_time: float = -1
@@ -217,11 +223,11 @@ def _generate_command(config: DistributedConfig, task: ArrivalTask) -> List[str]
    federated = isinstance(task, FederatedArrivalTask)
    if federated:
        # Perform Federated Learning experiment.
-       command = ('python3 -m fltk remote experiments/node.config.yaml')
+       command = 'python3 -m fltk remote experiments/node.config.yaml'
    else:
        # TODO: Set correct backend depending on CUDA.
        command = (f'python3 -m fltk client {config.config_path} {task.id} '
-                  f'experiments/node.config.yaml '
-                  f'--backend gloo')
+                  f'experiments/node.config.json --backend gloo')
    return command.split(' ')
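For reference, the command string is split on single spaces, so the federated branch now produces the argument list shown below; this is a small illustration of the return value, not code from the commit:

command = 'python3 -m fltk remote experiments/node.config.yaml'
assert command.split(' ') == ['python3', '-m', 'fltk', 'remote', 'experiments/node.config.yaml']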


@@ -401,7 +407,7 @@ def build_template(self, configmap_name_dict: Optional[Dict[str, str]]) -> None:
                    volumes=volumes,
                    tolerations=self._build_description.tolerations))

-   def build_spec(self, task: ArrivalTask, restart_policy: str = 'Never') -> None:
+   def build_spec(self, task: ArrivalTask, restart_policy: str = 'Never', clean_policy: Optional[str] = None) -> None:
"""
Function to build V1JobSpec object that contains the specifications of the Pods to be spawned in Kubernetes.
Effectively this function creates the replica counts for the `Master` and `Worker` nodes in the train job
@@ -422,7 +428,9 @@ def build_spec(self, task: ArrivalTask, restart_policy: str = 'Never') -> None:
                template=tpe_template)
            pt_rep_spec[tpe] = typed_replica_spec

-       job_spec = KubeflowOrgV1PyTorchJobSpec(pytorch_replica_specs=pt_rep_spec)
+       # N.B. This will not result in deleting pods.
+       job_spec = KubeflowOrgV1PyTorchJobSpec(pytorch_replica_specs=pt_rep_spec,
+                                              run_policy=V1RunPolicy(clean_pod_policy=clean_policy))
        self._build_description.spec = job_spec

def construct(self) -> KubeflowOrgV1PyTorchJob:
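The new clean_policy argument is forwarded into V1RunPolicy(clean_pod_policy=...); the Kubeflow training operator reads this as the job's CleanPodPolicy ('All', 'Running', or 'None'), and the Python default of None leaves the field unset so the operator's own default applies, hence the in-diff note that pods are not deleted. A hedged usage sketch, where builder stands for an instance of the class defining build_spec and task for a prepared ArrivalTask (both placeholders):

builder.build_spec(task)                       # field left unset; operator default applies, pods are kept
builder.build_spec(task, clean_policy='All')   # request clean-up of all pods once the job finishes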
