
a direct way to specify the worker spec #601

Open
keewis opened this issue Jan 6, 2023 · 4 comments

keewis commented Jan 6, 2023

I'm frequently confused by the way the API requires me to specify resources for the jobqueue: I need to specify the job size and the number of workers per job (there are quite a few more knobs, of course), and it then distributes the resources evenly across the workers. After that I can choose how many jobs to submit.

However, as a user (with admittedly limited knowledge of how HPC systems work, so what I'm describing may be naive), my view is usually something like:

  • my local jobqueue allows individual jobs to request up to 115 GB of memory, 28 cores, and a certain walltime, and for some queues there are also minimum resource limits
  • I want to have about 14 workers, with about 15 GB and 2 threads each, where the concrete worker specs are often somewhat arbitrary and depend on my knowledge of the problem I'm trying to compute

This usually leads to me trying to group the workers manually to optimally fit the resource limits (so I don't get de-prioritized by submitting too many jobs).

Instead, I would ideally like an API that allows me to specify (or retrieve) the jobqueue's per-job resource limits and the desired worker spec. It would then try to distribute the workers optimally and submit the jobs for me (and fail early if the resource limits don't allow the worker spec I requested).

Does something like this exist already? If not, would you be open to adding something like that? Is there anything I'm missing that would inhibit something like this?

@jacobtomlinson
Member

As far as I'm aware we aren't really doing this anywhere in the Dask deployment ecosystem. Most cluster managers have settings for node sizes and a way to configure the worker, and leave it as an exercise to the user to fit one into the other.

I expect that in the majority of cases folks configure the node size and leave the worker on auto, effectively not tuning the cluster to their problem.

The exception may be dask-kubernetes where the user configures the pod size and Kubernetes handles packing those pods into nodes. Often there is just a 1:1 mapping between the pod and worker settings in that case.

This topic is definitely interesting, but I feel like it would add a bunch of complexity that may be hard to wrangle.

@guillaumeeb
Member

Hi @keewis, I can confirm what @jacobtomlinson is saying: I'm not aware of anything like that either, and if he says so, it must be true :).

Just to make this more concrete, with your example, currently you do the maths and come up with something like (here for PBS):

cluster = PBSCluster(cores=14, processes=7, memory="105GiB", resource_spec="select=1:ncpus=28:mem=115GB")
cluster.scale(14)
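
For reference, here is a rough sketch of the arithmetic behind those numbers (using the limits from your original post):

import math
from dask.utils import parse_bytes

job_memory = parse_bytes("115GiB")    # per-job memory limit of the queue
worker_memory = parse_bytes("15GiB")  # desired memory per worker

workers_per_job = job_memory // worker_memory     # -> 7, becomes processes=7
cores_per_job = 2 * workers_per_job               # -> 14, becomes cores=14
memory_per_job = workers_per_job * worker_memory  # -> 105 GiB, becomes memory="105GiB"
jobs = math.ceil(14 / workers_per_job)            # -> 2 jobs submitted by cluster.scale(14)

Note that cluster.scale(14) requests 14 workers, not 14 jobs; with processes=7 per job that translates into two job submissions.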

I understand that this is somewhat cumbersome, or even hard for some users to get right... And if you have different worker specs depending on the workflow, this gets even more unwieldy to handle...

And what you would like is something like:

cluster = PBSCluster(job_cores=28, job_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)

Are we on the same page here?

If yes, I'd say we don't want to change all the existing classes and APIs. But maybe this could be done at a higher level, with a new class on top of others? The hard point is that not all JobQueueCluster implementations use the same signature to declare job resources... I've not thought about it for more than a few minutes, so maybe we could do something clever, but no nice idea comes to mind.

@jacobtomlinson would dask-ctl or any other tool help for this?

@keewis
Author

keewis commented Jan 17, 2023

And what you would like is something like

essentially, though to be even more specific, what I meant was

cluster = PBSCluster(job_max_cores=28, job_max_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)

But maybe this could be done at a higher level with a new class on top of others?

So something that translates the high-level spec to the API of the lower-level classes?

Without unifying the JobQueueCluster class signatures I guess that would involve maintaining a mapping of translation functions for each cluster implementation. Basically, we'd have a main translation function that would take the per-worker spec and convert it to a general job spec, and then the specific functions would adjust that to the individual implementations.

draft implementation of the proposed API
import dask_jobqueue
from dask.utils import parse_bytes, format_bytes


def convert_to_low_level(*, max_cores, max_memory, worker_threads, worker_memory):
    # how many workers of the requested size fit into a single job
    workers = max_memory // worker_memory

    if workers == 0:
        raise ValueError(
            f"can't use more than {format_bytes(max_memory)} per worker (got {format_bytes(worker_memory)})"
        )

    # distribute the available cores evenly across the workers
    max_threads_per_worker = max_cores // workers
    if worker_threads is not None:
        if worker_threads > max_threads_per_worker:
            raise ValueError(
                f"can't use more than {max_threads_per_worker} threads per worker (total available: {max_cores}, with {workers} workers)"
            )
    else:
        worker_threads = max_threads_per_worker

    # total resources actually requested per job
    memory = worker_memory * workers
    cores = worker_threads * workers

    return {"memory": format_bytes(memory), "cores": cores, "processes": workers}
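
# Example with the numbers from this thread (illustrative; the exact memory
# string depends on how format_bytes rounds):
# convert_to_low_level(
#     max_cores=28, max_memory=parse_bytes("115GiB"),
#     worker_threads=2, worker_memory=parse_bytes("15GiB"),
# )
# -> {"memory": "105.00 GiB", "cores": 14, "processes": 7}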


cluster_implementations = {
    "pbs": dask_jobqueue.PBSCluster,
    "slurm": dask_jobqueue.SLURMCluster,
}
# adjust the generic job spec to each implementation's kwargs (identity for now)
parameter_translations = {
    "pbs": lambda x: x,
    "slurm": lambda x: x,
}


class WorkerSpecCluster:
    def __init__(
        self,
        *,
        kind,
        max_cores,
        max_memory,
        worker_memory,
        worker_threads=None,
        **additional_kwargs,
    ):
        main_kwargs = convert_to_low_level(
            max_cores=max_cores,
            max_memory=parse_bytes(max_memory),
            worker_memory=parse_bytes(worker_memory),
            worker_threads=worker_threads,
        )
        translator = parameter_translations.get(kind, lambda x: x)
        kwargs = additional_kwargs | translator(main_kwargs)

        cluster_implementation = cluster_implementations.get(kind)
        if cluster_implementation is None:
            raise ValueError(
                f"unknown implementation {kind!r}, choose one of {{{', '.join(sorted(cluster_implementations))}}}"
            )

        self._cluster = cluster_implementation(**kwargs)
        
    def _repr_html_(self):
        return self._cluster._repr_html_()
    
    def __getattr__(self, name):
        # delegate everything else (e.g. scale, close) to the wrapped cluster
        return getattr(self._cluster, name)
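
For illustration, using the draft above (hypothetical; any other dask-jobqueue kwargs such as queue or walltime would be passed through additional_kwargs):

cluster = WorkerSpecCluster(
    kind="pbs",
    max_cores=28,
    max_memory="115GiB",
    worker_memory="15GiB",
    worker_threads=2,
    walltime="04:00:00",
)
cluster.scale(14)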

but when trying it on my local jobqueue, I immediately ran into the issue that min_cores == max_cores, so that is probably also going to have to be addressed somehow.

@guillaumeeb
Member

So something that translates the high-level spec to the API of the lower-level classes?

Yep, this is what I had in mind.

Without unifying the JobQueueCluster class signatures

This is also something that could/should be considered. I think we might achieve some level of unification if we wanted to (at least for how to specify job_cores, job_memory, and worker_processes/memory/threads), although there were several discussions on whether to use total cores vs. threads per worker at the beginning of dask-jobqueue.

Basically, we'd have a main translation function that would take the per-worker spec and convert it to a general job spec, and then the specific functions would adjust that to the individual implementations

Yep, I agree with this part.

draft implementation of the proposed API

That looks like a good starting point!

but when trying it on my local jobqueue, I immediately ran into the issue that min_cores == max_cores, so that is probably also going to have to be addressed somehow.

You mean your job scheduler wants you to always book a complete compute node, so dask-jobqueue must ask for the maximum number of cores?
I think that is what I had in mind with

cluster = PBSCluster(job_cores=28, job_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)

Just book this number of cores and this amount of memory with the job scheduling system, and fit as many workers with my spec into it as possible.
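
To sketch what I mean (a hypothetical helper, not existing API; it just packs workers into a fixed job size, limited by both memory and cores):

from dask.utils import parse_bytes, format_bytes


def pack_workers(job_cores, job_memory, worker_threads, worker_memory):
    """Book the full job and fit as many workers of the given spec as possible."""
    job_memory = parse_bytes(job_memory)
    worker_memory = parse_bytes(worker_memory)
    # the worker count is limited by whichever resource runs out first
    workers = min(job_memory // worker_memory, job_cores // worker_threads)
    if workers == 0:
        raise ValueError("a single worker does not fit into one job")
    return {
        "processes": workers,
        "cores": workers * worker_threads,
        "memory": format_bytes(workers * worker_memory),
    }


# pack_workers(28, "115GiB", 2, "15GiB")
# -> {"processes": 7, "cores": 14, "memory": "105.00 GiB"}
# (the full node would still be booked via the scheduler-specific options,
# e.g. resource_spec for PBS)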
