a direct way to specify the worker spec #601
As far as I'm aware we aren't really doing this anywhere in the Dask deployment ecosystem. Most cluster managers have settings for node sizes and a way to configure the worker, and leave it as an exercise to the user to fit one into the other. I expect that in the majority of cases folks configure the node size and leave the worker on auto, effectively not tuning the cluster to their problem. The exception may be dask-kubernetes, where the user configures the pod size and Kubernetes handles packing those pods into nodes. Often there is just a 1:1 mapping between the pod and worker settings in that case. This topic is definitely interesting, but I feel like it would add a bunch of complexity that may be hard to wrangle.
Hi @keewis, I confirm what @jacobtomlinson is saying: I'm not aware of anything like that, and if he says so, this must be true :).

Just to make this more concrete with your example: currently you do the maths yourself and come up with something like (here for PBS):

```python
cluster = PBSCluster(cores=14, processes=7, memory="105GiB", resource_spec="ncpus=28:memory=115gb")
cluster.scale(14)
```

I understand that this is somewhat cumbersome or even hard for some users to get right... And if you have various worker specs depending on various workflows, this gets even heavier to handle... What you would like instead is something like:

```python
cluster = PBSCluster(job_cores=28, job_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)
```

Are we agreed on this? If yes, I'd say we don't want to change all the existing classes and APIs, but maybe this could be done at a higher level with a new class on top of the others. The hard point is that not all JobQueueCluster implementations use the same signature to declare job resources... I've not thought about it for more than a few minutes, so maybe we could do something clever, but no nice idea comes to mind. @jacobtomlinson, would dask-ctl or any other tool help for this?
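(For concreteness, the manual maths behind the first snippet is roughly the following. This is just a sketch assuming one job books a full 28-core node with 115 GiB of memory, of which about 105 GiB is handed to the workers.)

```python
# Rough arithmetic behind the manual PBSCluster configuration above.
# Assumption: the job books a full 28-core / 115 GiB node and keeps
# some headroom, so cores=14 and memory="105GiB" go to 7 workers.
from dask.utils import parse_bytes, format_bytes

processes = 7                            # workers per job
threads_per_worker = 14 // processes     # cores=14 -> 2 threads per worker
memory_per_worker = parse_bytes("105GiB") // processes

print(threads_per_worker)                # 2
print(format_bytes(memory_per_worker))   # 15.00 GiB
```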
Essentially, though to be even more specific, what I meant was:

```python
cluster = PBSCluster(job_max_cores=28, job_max_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)
```
So something that translates the high-level spec to the API of the lower-level classes? Without unifying the existing classes?

A draft implementation of the proposed API:

```python
import dask_jobqueue
from dask.utils import parse_bytes, format_bytes


def convert_to_low_level(*, max_cores, max_memory, worker_threads, worker_memory):
    workers = max_memory // worker_memory
    if workers == 0:
        raise ValueError(
            f"can't use more than {format_bytes(max_memory)} per worker (got {format_bytes(worker_memory)})"
        )

    max_threads_per_worker = max_cores // workers
    if worker_threads is not None:
        if worker_threads > max_threads_per_worker:
            raise ValueError(
                f"can't use more than {max_threads_per_worker} threads per worker (total available: {max_cores}, with {workers} workers)"
            )
    else:
        worker_threads = max_threads_per_worker

    memory = worker_memory * workers
    cores = worker_threads * workers

    return {"memory": format_bytes(memory), "cores": cores, "processes": workers}


cluster_implementations = {
    "pbs": dask_jobqueue.PBSCluster,
    "slurm": dask_jobqueue.SLURMCluster,
}
parameter_translations = {
    "pbs": lambda x: x,
    "slurm": lambda x: x,
}


class WorkerSpecCluster:
    def __init__(
        self,
        *,
        kind,
        max_cores,
        max_memory,
        worker_memory,
        worker_threads=None,
        **additional_kwargs,
    ):
        main_kwargs = convert_to_low_level(
            max_cores=max_cores,
            max_memory=parse_bytes(max_memory),
            worker_memory=parse_bytes(worker_memory),
            worker_threads=worker_threads,
        )
        translator = parameter_translations.get(kind, lambda x: x)
        kwargs = additional_kwargs | translator(main_kwargs)

        cluster_implementation = cluster_implementations.get(kind)
        if cluster_implementation is None:
            raise ValueError(
                f"unknown implementation {kind!r}, choose one of {{{', '.join(sorted(cluster_implementations))}}}"
            )
        self._cluster = cluster_implementation(**kwargs)

    def _repr_html_(self):
        return self._cluster._repr_html_()

    def __getattr__(self, name):
        return getattr(self._cluster, name)
```

but when trying it on my local jobqueue, I immediately ran into the issue that …
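For reference, a call to this draft wrapper might look roughly like the following. This is a sketch only: it assumes the 28-core / 115 GiB node from the earlier examples and a PBS scheduler, with any extra keyword arguments forwarded to `PBSCluster`.

```python
# Hypothetical usage of the WorkerSpecCluster draft above, assuming a PBS
# queue whose nodes offer 28 cores and 115 GiB (numbers from this thread).
cluster = WorkerSpecCluster(
    kind="pbs",
    max_cores=28,
    max_memory="115GiB",
    worker_memory="15GiB",
    worker_threads=2,
    # anything else (queue, walltime, ...) is forwarded to PBSCluster
    walltime="01:00:00",
)
# convert_to_low_level turns this into cores=14, processes=7, memory="105 GiB"
cluster.scale(14)  # proxied to the underlying PBSCluster via __getattr__
```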
Yep, this is what I had in mind.
This is also something that could/should be considered. I think we might achieve some level of unification if we wanted to (at least for how to specify job_cores, job_memory, worker_processes/memory/threads), although there were several discussions on whether to use total cores vs. threads per worker at the beginning of dask-jobqueue.
Yep, I agree with this part.
That looks like a good starting point!
You mean your job scheduler wants you to always book a complete compute node? So dask-jobqueue must ask for the max cores? As in:

```python
cluster = PBSCluster(job_cores=28, job_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)
```

Just book this number of cores and this memory with the job scheduling system, and fit as many workers with my spec into it.
I'm frequently confused by the way the API requires me to specify resources for the jobqueue: I need to specify the job size and the number of workers per job (there are quite a few more knobs, of course), and the resources are then evenly distributed across the workers. I can then choose how many jobs to submit.
However, as a user (with admittedly limited knowledge of how HPC systems work, so what I'm describing may be naive), my view is usually something like: I know the worker spec I want and how many workers I need, while the job queue imposes resource limits per job.
This usually leads to me trying to group the workers manually to optimally fit the resource limits (so I don't get de-prioritized by submitting too many jobs).
Instead, I would ideally like an API that allows me to specify (or retrieve) the per-job resource limits of the jobqueue and the desired worker spec. It would then try to optimally distribute the workers and submit the jobs for me (and fail early if the resource limits don't allow the worker spec I requested).
Does something like this exist already? If not, would you be open to adding something like that? Is there anything I'm missing that would inhibit something like this?
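For illustration, such an API might look roughly like this (hypothetical parameter names, loosely following the proposals elsewhere in this thread; `PBSCluster` does not accept these keywords today):

```python
# Hypothetical sketch only: these keyword arguments do not exist in
# dask-jobqueue, the names just illustrate the requested API.
cluster = PBSCluster(
    job_max_cores=28,         # per-job resource limits imposed by the queue
    job_max_memory="115GiB",
    worker_threads=2,         # desired worker spec
    worker_memory="15GiB",
)
cluster.scale(14)  # pack 14 such workers into as few jobs as the limits allow
```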