Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to instantiate named clusters from jobqueue config #604

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions dask_jobqueue/_clusters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Namespace for dask_jobqueue.core.JobQueueCluster implementations."""
from typing import Dict, Type

from .core import JobQueueCluster
from .htcondor import HTCondorJob, HTCondorCluster
from .local import LocalJob, LocalCluster
from .lsf import LSFJob, LSFCluster
from .moab import MoabJob, MoabCluster
from .oar import OARJob, OARCluster
from .pbs import PBSJob, PBSCluster
from .sge import SGEJob, SGECluster
from .slurm import SLURMJob, SLURMCluster

# Maps each job scheduling system's config name (the key used under ``jobqueue``
# in the dask config) to the cluster class that drives it.  The values are the
# classes themselves, not instances, hence ``Type[JobQueueCluster]``.
CLUSTER_CLASSES: Dict[str, Type[JobQueueCluster]] = {
    HTCondorJob.config_name: HTCondorCluster,
    LocalJob.config_name: LocalCluster,
    LSFJob.config_name: LSFCluster,
    MoabJob.config_name: MoabCluster,
    OARJob.config_name: OARCluster,
    PBSJob.config_name: PBSCluster,
    SGEJob.config_name: SGECluster,
    SLURMJob.config_name: SLURMCluster,
}
22 changes: 21 additions & 1 deletion dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
Name of Dask worker. This is typically set by the Cluster
""".strip()


cluster_parameters = """
n_workers : int
Number of workers to start by default. Defaults to 0.
Expand Down Expand Up @@ -830,3 +829,24 @@ def adapt(
if maximum_jobs is not None:
kwargs["maximum"] = maximum_jobs * self._dummy_job.worker_processes
return super().adapt(*args, **kwargs)

@classmethod
def from_name(cls, name: str):
    """Initialise a named Dask cluster specified in the jobqueue config.

    Named clusters should specify the appropriate job scheduling system under the
    'job-scheduling-system' key in the jobqueue config alongside specific keyword
    arguments passed to the JobQueueCluster class on initialisation.

    Parameters
    ----------
    name: str
        Key in the dask jobqueue config specifying the cluster to be initialised.
        If it is the config name of a built-in job scheduling system, that
        system's cluster class is instantiated without extra arguments.

    Returns
    -------
    An instance of the cluster class registered for the job scheduling system.

    Raises
    ------
    KeyError
        If ``jobqueue.<name>`` is not in the config, if it lacks the
        'job-scheduling-system' key, or if that key names an unknown system.
    """
    from ._clusters import CLUSTER_CLASSES  # avoids circular import of subclasses

    if name in CLUSTER_CLASSES:
        return CLUSTER_CLASSES[name]()

    # Work on a shallow copy: ``dask.config.get`` can hand back the live nested
    # mapping, and popping from that would strip 'job-scheduling-system' out of
    # the global config, making a second ``from_name`` call for the same name fail.
    config = dict(dask.config.get(f"jobqueue.{name}"))
    try:
        job_scheduling_system = config.pop("job-scheduling-system")
    except KeyError:
        raise KeyError(
            f"'jobqueue.{name}' must define 'job-scheduling-system' "
            f"(one of {sorted(CLUSTER_CLASSES)})"
        ) from None
    # NOTE(review): remaining keys are forwarded verbatim as keyword arguments,
    # so they presumably need to match the cluster's parameter names - confirm.
    return CLUSTER_CLASSES[job_scheduling_system](**config)
51 changes: 47 additions & 4 deletions dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import re
import psutil
from unittest import mock

import pytest

Expand All @@ -18,6 +19,7 @@
)
from dask_jobqueue.core import Job
from dask_jobqueue.local import LocalCluster
from dask_jobqueue._clusters import CLUSTER_CLASSES

from dask_jobqueue.sge import SGEJob

Expand Down Expand Up @@ -55,7 +57,7 @@ def test_shebang_settings(Cluster, request):
request.node.add_marker(
pytest.mark.xfail(
reason="%s has a peculiar submit script and does not have a shebang"
% type(Cluster).__name__
% type(Cluster).__name__
)
)
default_shebang = "#!/usr/bin/env bash"
Expand Down Expand Up @@ -244,7 +246,6 @@ def get_interface_and_port(index=0):


def test_scheduler_options(Cluster):

interface, port = get_interface_and_port()

with Cluster(
Expand Down Expand Up @@ -321,7 +322,6 @@ def test_import_scheduler_options_from_config(Cluster):
with dask.config.set(
{"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options}
):

with Cluster(cores=2, memory="2GB") as cluster:
scheduler_options = cluster.scheduler_spec["options"]
assert scheduler_options.get("interface") == config_scheduler_interface
Expand Down Expand Up @@ -455,7 +455,8 @@ def test_security_temporary_defaults(EnvSpecificCluster, loop):
memory="500MiB",
security=True,
protocol="tls",
loop=loop, # for some reason (bug?) using the loop fixture requires using a new test case
loop=loop,
# for some reason (bug?) using the loop fixture requires using a new test case
) as cluster:
assert cluster.security
assert cluster.scheduler_spec["options"]["security"] == cluster.security
Expand All @@ -469,3 +470,45 @@ def test_security_temporary_defaults(EnvSpecificCluster, loop):
future = client.submit(lambda x: x + 1, 10)
result = future.result(timeout=30)
assert result == 11


def test_jobqueuecluster_from_name_for_existing_scheduling_systems():
    """``from_name`` with a built-in config name yields that system's cluster class."""
    for scheduling_system, expected_class in CLUSTER_CLASSES.items():
        # NOTE(review): a maintainer asked whether setting this temporary config
        # is really useful to this test.
        temporary_config = {
            f"jobqueue.{scheduling_system}.cores": 8,
            f"jobqueue.{scheduling_system}.memory": "24GB",
            f"jobqueue.{scheduling_system}.disk": "1GB",
        }
        with dask.config.set(temporary_config):
            # Use the cluster as a context manager so it is closed again instead
            # of being left running once the assertion has been made.
            with JobQueueCluster.from_name(scheduling_system) as cluster:
                assert isinstance(cluster, expected_class)


def test_jobqueuecluster_from_name_for_custom_cluster():
    """A custom named cluster resolves to the class of its 'job-scheduling-system'."""
    # NOTE(review): a maintainer pointed out that the custom config itself is not
    # checked here, and asked whether this was meant to check that the htcondor
    # default config is not taken into account.
    temporary_config = {
        "jobqueue.custom-condor-cluster.job-scheduling-system": "htcondor",
        "jobqueue.htcondor.cores": 8,
        "jobqueue.htcondor.memory": "24GB",
        "jobqueue.htcondor.disk": "1GB",
    }
    # check for creation of correct cluster class
    with dask.config.set(temporary_config):
        # Context manager so the created cluster is closed after the check.
        with JobQueueCluster.from_name("custom-condor-cluster") as cluster:
            assert isinstance(cluster, HTCondorCluster)


@mock.patch("dask_jobqueue.htcondor.HTCondorCluster.__new__")
def test_jobqueuecluster_from_name_attribute_override(mock_cluster):
    """Settings of the custom cluster are what ``from_name`` passes to the class."""
    htcondor_defaults = {
        "jobqueue.htcondor.cores": 8,  # overridden in custom cluster
        "jobqueue.htcondor.memory": "24GB",
        "jobqueue.htcondor.disk": "1GB",
    }
    custom_cluster_settings = {
        "jobqueue.custom-condor-cluster.job-scheduling-system": "htcondor",
        "jobqueue.custom-condor-cluster.cores": 16,
    }

    # The number of cores defined for the custom cluster must win over the
    # default given in the htcondor cluster config; ``__new__`` is mocked so the
    # arguments of the instantiation can be inspected.
    with dask.config.set({**htcondor_defaults, **custom_cluster_settings}):
        JobQueueCluster.from_name("custom-condor-cluster")
        mock_cluster.assert_called_with(HTCondorCluster, cores=16)