diff --git a/dask_jobqueue/_clusters.py b/dask_jobqueue/_clusters.py
new file mode 100644
index 00000000..4635b8ee
--- /dev/null
+++ b/dask_jobqueue/_clusters.py
@@ -0,0 +1,23 @@
+"""Namespace for dask_jobqueue.core.JobQueueCluster implementations."""
+from typing import Dict, Type
+
+from .core import JobQueueCluster
+from .htcondor import HTCondorJob, HTCondorCluster
+from .local import LocalJob, LocalCluster
+from .lsf import LSFJob, LSFCluster
+from .moab import MoabJob, MoabCluster
+from .oar import OARJob, OARCluster
+from .pbs import PBSJob, PBSCluster
+from .sge import SGEJob, SGECluster
+from .slurm import SLURMJob, SLURMCluster
+
+CLUSTER_CLASSES: Dict[str, Type[JobQueueCluster]] = {
+    HTCondorJob.config_name: HTCondorCluster,
+    LocalJob.config_name: LocalCluster,
+    LSFJob.config_name: LSFCluster,
+    MoabJob.config_name: MoabCluster,
+    OARJob.config_name: OARCluster,
+    PBSJob.config_name: PBSCluster,
+    SGEJob.config_name: SGECluster,
+    SLURMJob.config_name: SLURMCluster,
+}
diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py
index 01c1756d..9c22272a 100644
--- a/dask_jobqueue/core.py
+++ b/dask_jobqueue/core.py
@@ -73,7 +73,6 @@
         Name of Dask worker. This is typically set by the Cluster
 """.strip()
 
-
 cluster_parameters = """
     n_workers : int
         Number of workers to start by default. Defaults to 0.
@@ -830,3 +829,32 @@ def adapt(
         if maximum_jobs is not None:
             kwargs["maximum"] = maximum_jobs * self._dummy_job.worker_processes
         return super().adapt(*args, **kwargs)
+
+    @classmethod
+    def from_name(cls, name: str):
+        """Initialise a named Dask cluster specified in the jobqueue config.
+
+        Named clusters should specify the appropriate job scheduling system under the
+        'job-scheduling-system' key in the jobqueue config alongside specific keyword
+        arguments passed to the JobQueue cluster class on initialisation.
+
+        Parameters
+        ----------
+        name: str
+            Key in the dask jobqueue config specifying the cluster to be initialised.
+
+        Returns
+        -------
+        JobQueueCluster
+            Instance of the cluster class registered for ``name`` or for the job
+            scheduling system that the named configuration refers to.
+        """
+        from ._clusters import CLUSTER_CLASSES  # avoids circular import of subclasses
+
+        if name in CLUSTER_CLASSES:
+            return CLUSTER_CLASSES[name]()
+        # Work on a copy: dask.config.get returns the mapping held by the global
+        # config, so popping from it directly would remove the key for later calls.
+        config = dict(dask.config.get(f"jobqueue.{name}"))
+        job_scheduling_system = config.pop("job-scheduling-system")
+        return CLUSTER_CLASSES[job_scheduling_system](**config)
diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py
index 8f743f7e..c70363fa 100644
--- a/dask_jobqueue/tests/test_jobqueue_core.py
+++ b/dask_jobqueue/tests/test_jobqueue_core.py
@@ -4,6 +4,7 @@
 import sys
 import re
 import psutil
+from unittest import mock
 
 import pytest
 
@@ -18,6 +19,7 @@
 )
 from dask_jobqueue.core import Job
 from dask_jobqueue.local import LocalCluster
+from dask_jobqueue._clusters import CLUSTER_CLASSES
 from dask_jobqueue.sge import SGEJob
 
 
@@ -55,7 +57,7 @@ def test_shebang_settings(Cluster, request):
         request.node.add_marker(
             pytest.mark.xfail(
                 reason="%s has a peculiar submit script and does not have a shebang"
-                % type(Cluster).__name__
+                       % type(Cluster).__name__
             )
         )
     default_shebang = "#!/usr/bin/env bash"
@@ -244,7 +246,6 @@ def get_interface_and_port(index=0):
 
 
 def test_scheduler_options(Cluster):
-
     interface, port = get_interface_and_port()
 
     with Cluster(
@@ -321,7 +322,6 @@ def test_import_scheduler_options_from_config(Cluster):
     with dask.config.set(
         {"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options}
     ):
-
         with Cluster(cores=2, memory="2GB") as cluster:
             scheduler_options = cluster.scheduler_spec["options"]
             assert scheduler_options.get("interface") == config_scheduler_interface
@@ -455,7 +455,8 @@ def test_security_temporary_defaults(EnvSpecificCluster, loop):
         memory="500MiB",
         security=True,
         protocol="tls",
-        loop=loop,  # for some reason (bug?) using the loop fixture requires using a new test case
+        loop=loop,
+        # for some reason (bug?) using the loop fixture requires using a new test case
     ) as cluster:
         assert cluster.security
         assert cluster.scheduler_spec["options"]["security"] == cluster.security
@@ -469,3 +470,48 @@ def test_security_temporary_defaults(EnvSpecificCluster, loop):
             future = client.submit(lambda x: x + 1, 10)
             result = future.result(timeout=30)
             assert result == 11
+
+
+def test_jobqueuecluster_from_name_for_existing_scheduling_systems():
+    for scheduling_system in CLUSTER_CLASSES:
+        temporary_config = {
+            f"jobqueue.{scheduling_system}.cores": 8,
+            f"jobqueue.{scheduling_system}.memory": "24GB",
+            f"jobqueue.{scheduling_system}.disk": "1GB",
+        }
+        with dask.config.set(temporary_config):
+            with JobQueueCluster.from_name(scheduling_system) as cluster:
+                assert isinstance(cluster, CLUSTER_CLASSES[scheduling_system])
+
+
+def test_jobqueuecluster_from_name_for_custom_cluster():
+    temporary_config = {
+        "jobqueue.custom-condor-cluster.job-scheduling-system": "htcondor",
+        "jobqueue.htcondor.cores": 8,
+        "jobqueue.htcondor.memory": "24GB",
+        "jobqueue.htcondor.disk": "1GB",
+    }
+    # check for creation of correct cluster class
+    with dask.config.set(temporary_config):
+        with JobQueueCluster.from_name("custom-condor-cluster") as cluster:
+            assert isinstance(cluster, HTCondorCluster)
+        # from_name must leave the shared dask config untouched
+        system_key = "jobqueue.custom-condor-cluster.job-scheduling-system"
+        assert dask.config.get(system_key) == "htcondor"
+
+
+@mock.patch("dask_jobqueue.htcondor.HTCondorCluster.__new__")
+def test_jobqueuecluster_from_name_attribute_override(mock_cluster):
+    temporary_config = {
+        "jobqueue.custom-condor-cluster.job-scheduling-system": "htcondor",
+        "jobqueue.htcondor.cores": 8,  # overridden in custom cluster
+        "jobqueue.custom-condor-cluster.cores": 16,
+        "jobqueue.htcondor.memory": "24GB",
+        "jobqueue.htcondor.disk": "1GB",
+    }
+
+    # check that number of cores defined in the custom cluster overrides the
+    # default specified in htcondor cluster config
+    with dask.config.set(temporary_config):
+        JobQueueCluster.from_name("custom-condor-cluster")
+    mock_cluster.assert_called_with(HTCondorCluster, cores=16)