From 26ed6679f1e78f7184455861754a71f15abd8028 Mon Sep 17 00:00:00 2001 From: alisterburt Date: Fri, 3 Mar 2023 15:08:15 +0000 Subject: [PATCH 1/4] add namespaces for cluster classes and job classes --- dask_jobqueue/_clusters.py | 10 ++++++++++ dask_jobqueue/_jobs.py | 10 ++++++++++ 2 files changed, 20 insertions(+) create mode 100644 dask_jobqueue/_clusters.py create mode 100644 dask_jobqueue/_jobs.py diff --git a/dask_jobqueue/_clusters.py b/dask_jobqueue/_clusters.py new file mode 100644 index 00000000..b1253a74 --- /dev/null +++ b/dask_jobqueue/_clusters.py @@ -0,0 +1,10 @@ +"""Namespace for dask_jobqueue.core.JobQueueCluster implementations.""" + +from .htcondor import HTCondorJob +from .local import LocalJob +from .lsf import LSFJob +from .moab import MoabJob +from .oar import OARJob +from .pbs import PBSJob +from .sge import SGEJob +from .slurm import SLURMJob diff --git a/dask_jobqueue/_jobs.py b/dask_jobqueue/_jobs.py new file mode 100644 index 00000000..520e8542 --- /dev/null +++ b/dask_jobqueue/_jobs.py @@ -0,0 +1,10 @@ +"""Namespace for dask_jobqueue.core.Job implementations.""" + +from .htcondor import HTCondorJob +from .local import LocalJob +from .lsf import LSFJob +from .moab import MoabJob +from .oar import OARJob +from .pbs import PBSJob +from .sge import SGEJob +from .slurm import SLURMJob From 54e4927452e2ad98bf70b0258058b8abbf75d2bd Mon Sep 17 00:00:00 2001 From: alisterburt Date: Mon, 6 Mar 2023 12:46:23 +0000 Subject: [PATCH 2/4] implement class method for JobQueueCluster initialisation from name --- dask_jobqueue/_clusters.py | 29 +++++++++++++++++++++-------- dask_jobqueue/_jobs.py | 10 ---------- dask_jobqueue/core.py | 22 +++++++++++++++++++++- 3 files changed, 42 insertions(+), 19 deletions(-) delete mode 100644 dask_jobqueue/_jobs.py diff --git a/dask_jobqueue/_clusters.py b/dask_jobqueue/_clusters.py index b1253a74..4635b8ee 100644 --- a/dask_jobqueue/_clusters.py +++ b/dask_jobqueue/_clusters.py @@ -1,10 +1,23 @@ """Namespace for 
dask_jobqueue.core.JobQueueCluster implementations.""" +from typing import Dict, Type -from .htcondor import HTCondorJob -from .local import LocalJob -from .lsf import LSFJob -from .moab import MoabJob -from .oar import OARJob -from .pbs import PBSJob -from .sge import SGEJob -from .slurm import SLURMJob +from .core import JobQueueCluster +from .htcondor import HTCondorJob, HTCondorCluster +from .local import LocalJob, LocalCluster +from .lsf import LSFJob, LSFCluster +from .moab import MoabJob, MoabCluster +from .oar import OARJob, OARCluster +from .pbs import PBSJob, PBSCluster +from .sge import SGEJob, SGECluster +from .slurm import SLURMJob, SLURMCluster + +CLUSTER_CLASSES: Dict[str, Type[JobQueueCluster]] = { + HTCondorJob.config_name: HTCondorCluster, + LocalJob.config_name: LocalCluster, + LSFJob.config_name: LSFCluster, + MoabJob.config_name: MoabCluster, + OARJob.config_name: OARCluster, + PBSJob.config_name: PBSCluster, + SGEJob.config_name: SGECluster, + SLURMJob.config_name: SLURMCluster, +} diff --git a/dask_jobqueue/_jobs.py b/dask_jobqueue/_jobs.py deleted file mode 100644 index 520e8542..00000000 --- a/dask_jobqueue/_jobs.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Namespace for dask_jobqueue.core.Job implementations.""" - -from .htcondor import HTCondorJob -from .local import LocalJob -from .lsf import LSFJob -from .moab import MoabJob -from .oar import OARJob -from .pbs import PBSJob -from .sge import SGEJob -from .slurm import SLURMJob diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 01c1756d..9c22272a 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -73,7 +73,6 @@ Name of Dask worker. This is typically set by the Cluster """.strip() - cluster_parameters = """ n_workers : int Number of workers to start by default. Defaults to 0. 
@@ -830,3 +829,24 @@ def adapt( if maximum_jobs is not None: kwargs["maximum"] = maximum_jobs * self._dummy_job.worker_processes return super().adapt(*args, **kwargs) + + @classmethod + def from_name(cls, name: str): + """Initialise a named Dask cluster specified in the jobqueue config. + + Named clusters should specify the appropriate job scheduling system under the + 'job-scheduling-system' key in the jobqueue config alongside specific keyword + arguments passed to the JobQueue cluster class on initialisation. + + Parameters + ---------- + name: str + Key in the dask jobqueue config specifying the cluster to be initialised. + """ + from ._clusters import CLUSTER_CLASSES # avoids circular import of subclasses + if name in CLUSTER_CLASSES: + return CLUSTER_CLASSES[name]() + else: + config = dask.config.get(f"jobqueue.{name}") + job_scheduling_system = config.pop("job-scheduling-system") + return CLUSTER_CLASSES[job_scheduling_system](**config) From 407da63b5d059f54d8cb505667b417c1e0b94c45 Mon Sep 17 00:00:00 2001 From: alisterburt Date: Mon, 6 Mar 2023 14:50:42 +0000 Subject: [PATCH 3/4] add tests --- dask_jobqueue/tests/test_jobqueue_core.py | 52 +++++++++++++++++++++-- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 8f743f7e..12dd3f5e 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -4,6 +4,7 @@ import sys import re import psutil +from unittest import mock import pytest @@ -12,12 +13,14 @@ from distributed.security import Security from distributed import Client +import dask_jobqueue from dask_jobqueue import ( JobQueueCluster, HTCondorCluster, ) from dask_jobqueue.core import Job from dask_jobqueue.local import LocalCluster +from dask_jobqueue._clusters import CLUSTER_CLASSES from dask_jobqueue.sge import SGEJob @@ -55,7 +58,7 @@ def test_shebang_settings(Cluster, request): request.node.add_marker( 
pytest.mark.xfail( reason="%s has a peculiar submit script and does not have a shebang" - % type(Cluster).__name__ + % type(Cluster).__name__ ) ) default_shebang = "#!/usr/bin/env bash" @@ -244,7 +247,6 @@ def get_interface_and_port(index=0): def test_scheduler_options(Cluster): - interface, port = get_interface_and_port() with Cluster( @@ -321,7 +323,6 @@ def test_import_scheduler_options_from_config(Cluster): with dask.config.set( {"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options} ): - with Cluster(cores=2, memory="2GB") as cluster: scheduler_options = cluster.scheduler_spec["options"] assert scheduler_options.get("interface") == config_scheduler_interface @@ -455,7 +456,8 @@ def test_security_temporary_defaults(EnvSpecificCluster, loop): memory="500MiB", security=True, protocol="tls", - loop=loop, # for some reason (bug?) using the loop fixture requires using a new test case + loop=loop, + # for some reason (bug?) using the loop fixture requires using a new test case ) as cluster: assert cluster.security assert cluster.scheduler_spec["options"]["security"] == cluster.security @@ -469,3 +471,45 @@ def test_security_temporary_defaults(EnvSpecificCluster, loop): future = client.submit(lambda x: x + 1, 10) result = future.result(timeout=30) assert result == 11 + + +def test_jobqueuecluster_from_name_for_existing_scheduling_systems(): + for scheduling_system in CLUSTER_CLASSES.keys(): + temporary_config = { + f"jobqueue.{scheduling_system}.cores": 8, + f"jobqueue.{scheduling_system}.memory": "24GB", + f"jobqueue.{scheduling_system}.disk": "1GB", + } + with dask.config.set(temporary_config): + cluster = JobQueueCluster.from_name(scheduling_system) + assert isinstance(cluster, CLUSTER_CLASSES[scheduling_system]) + + +def test_jobqueuecluster_from_name_for_custom_cluster(): + temporary_config = { + "jobqueue.custom-condor-cluster.job-scheduling-system": 'htcondor', + "jobqueue.htcondor.cores": 8, + "jobqueue.htcondor.memory": "24GB", + 
"jobqueue.htcondor.disk": "1GB", + } + # check for creation of correct cluster class + with dask.config.set(temporary_config): + cluster = JobQueueCluster.from_name("custom-condor-cluster") + assert isinstance(cluster, HTCondorCluster) + + +@mock.patch('dask_jobqueue.htcondor.HTCondorCluster.__new__') +def test_jobqueuecluster_from_name_attribute_override(mock_cluster): + temporary_config = { + "jobqueue.custom-condor-cluster.job-scheduling-system": 'htcondor', + "jobqueue.htcondor.cores": 8, # overridden in custom cluster + "jobqueue.custom-condor-cluster.cores": 16, + "jobqueue.htcondor.memory": "24GB", + "jobqueue.htcondor.disk": "1GB", + } + + # check that number of cores defined in the custom cluster overrides the + # default specified in htcondor cluster config + with dask.config.set(temporary_config): + JobQueueCluster.from_name("custom-condor-cluster") + mock_cluster.assert_called_with(HTCondorCluster, cores=16) From 5085eaa5a11819dc908fb03c99ada7ee9f0bc2f7 Mon Sep 17 00:00:00 2001 From: alisterburt Date: Sun, 12 Mar 2023 13:26:29 +0000 Subject: [PATCH 4/4] fix F401 --- dask_jobqueue/tests/test_jobqueue_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 12dd3f5e..c70363fa 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -13,7 +13,6 @@ from distributed.security import Security from distributed import Client -import dask_jobqueue from dask_jobqueue import ( JobQueueCluster, HTCondorCluster,