-
-
Notifications
You must be signed in to change notification settings - Fork 143
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to instantiate named clusters from jobqueue config #604
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
"""Namespace for dask_jobqueue.core.JobQueueCluster implementations.""" | ||
from typing import Dict | ||
|
||
from .core import JobQueueCluster | ||
from .htcondor import HTCondorJob, HTCondorCluster | ||
from .local import LocalJob, LocalCluster | ||
from .lsf import LSFJob, LSFCluster | ||
from .moab import MoabJob, MoabCluster | ||
from .oar import OARJob, OARCluster | ||
from .pbs import PBSJob, PBSCluster | ||
from .sge import SGEJob, SGECluster | ||
from .slurm import SLURMJob, SLURMCluster | ||
|
||
CLUSTER_CLASSES: Dict[str, JobQueueCluster] = { | ||
HTCondorJob.config_name: HTCondorCluster, | ||
LocalJob.config_name: LocalCluster, | ||
LSFJob.config_name: LSFCluster, | ||
MoabJob.config_name: MoabCluster, | ||
OARJob.config_name: OARCluster, | ||
PBSJob.config_name: PBSCluster, | ||
SGEJob.config_name: SGECluster, | ||
SLURMJob.config_name: SLURMCluster, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
import sys | ||
import re | ||
import psutil | ||
from unittest import mock | ||
|
||
import pytest | ||
|
||
|
@@ -18,6 +19,7 @@ | |
) | ||
from dask_jobqueue.core import Job | ||
from dask_jobqueue.local import LocalCluster | ||
from dask_jobqueue._clusters import CLUSTER_CLASSES | ||
|
||
from dask_jobqueue.sge import SGEJob | ||
|
||
|
@@ -55,7 +57,7 @@ def test_shebang_settings(Cluster, request): | |
request.node.add_marker( | ||
pytest.mark.xfail( | ||
reason="%s has a peculiar submit script and does not have a shebang" | ||
% type(Cluster).__name__ | ||
% type(Cluster).__name__ | ||
) | ||
) | ||
default_shebang = "#!/usr/bin/env bash" | ||
|
@@ -244,7 +246,6 @@ def get_interface_and_port(index=0): | |
|
||
|
||
def test_scheduler_options(Cluster): | ||
|
||
interface, port = get_interface_and_port() | ||
|
||
with Cluster( | ||
|
@@ -321,7 +322,6 @@ def test_import_scheduler_options_from_config(Cluster): | |
with dask.config.set( | ||
{"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options} | ||
): | ||
|
||
with Cluster(cores=2, memory="2GB") as cluster: | ||
scheduler_options = cluster.scheduler_spec["options"] | ||
assert scheduler_options.get("interface") == config_scheduler_interface | ||
|
@@ -455,7 +455,8 @@ def test_security_temporary_defaults(EnvSpecificCluster, loop): | |
memory="500MiB", | ||
security=True, | ||
protocol="tls", | ||
loop=loop, # for some reason (bug?) using the loop fixture requires using a new test case | ||
loop=loop, | ||
# for some reason (bug?) using the loop fixture requires using a new test case | ||
) as cluster: | ||
assert cluster.security | ||
assert cluster.scheduler_spec["options"]["security"] == cluster.security | ||
|
@@ -469,3 +470,45 @@ def test_security_temporary_defaults(EnvSpecificCluster, loop): | |
future = client.submit(lambda x: x + 1, 10) | ||
result = future.result(timeout=30) | ||
assert result == 11 | ||
|
||
|
||
def test_jobqueuecluster_from_name_for_existing_scheduling_systems():
    """``JobQueueCluster.from_name`` resolves every built-in scheduling
    system name to an instance of its concrete cluster class."""
    # Iterate (name, class) pairs directly instead of looping over keys
    # and re-looking the class up inside the loop body.
    for scheduling_system, cluster_class in CLUSTER_CLASSES.items():
        # Minimal resource config so each cluster can be instantiated
        # without a real scheduler being present.
        temporary_config = {
            f"jobqueue.{scheduling_system}.cores": 8,
            f"jobqueue.{scheduling_system}.memory": "24GB",
            f"jobqueue.{scheduling_system}.disk": "1GB",
        }
        with dask.config.set(temporary_config):
            cluster = JobQueueCluster.from_name(scheduling_system)
            assert isinstance(cluster, cluster_class)
|
||
|
||
def test_jobqueuecluster_from_name_for_custom_cluster():
    """A user-defined config section whose ``job-scheduling-system`` points
    at an existing system yields that system's cluster class."""
    temporary_config = {
        "jobqueue.custom-condor-cluster.job-scheduling-system": "htcondor",
        "jobqueue.htcondor.cores": 8,
        "jobqueue.htcondor.memory": "24GB",
        "jobqueue.htcondor.disk": "1GB",
    }
    # check for creation of correct cluster class
    with dask.config.set(temporary_config):
        cluster = JobQueueCluster.from_name("custom-condor-cluster")
        assert isinstance(cluster, HTCondorCluster)
|
||
|
||
@mock.patch("dask_jobqueue.htcondor.HTCondorCluster.__new__")
def test_jobqueuecluster_from_name_attribute_override(mock_cluster):
    """Attributes set in the custom config section take precedence over the
    defaults of the underlying scheduling system's section."""
    temporary_config = {
        "jobqueue.custom-condor-cluster.job-scheduling-system": "htcondor",
        "jobqueue.htcondor.cores": 8,  # overridden in custom cluster
        "jobqueue.custom-condor-cluster.cores": 16,
        "jobqueue.htcondor.memory": "24GB",
        "jobqueue.htcondor.disk": "1GB",
    }

    # check that the number of cores defined in the custom cluster overrides
    # the default specified in the htcondor cluster config
    with dask.config.set(temporary_config):
        JobQueueCluster.from_name("custom-condor-cluster")
        mock_cluster.assert_called_with(HTCondorCluster, cores=16)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.