
Add SLURMRunner from jacobtomlinson/dask-hpc-runners #659

Merged (17 commits) on Aug 21, 2024

Conversation

@jacobtomlinson (Member) commented Aug 7, 2024

Towards #638

As we decided that the code from jacobtomlinson/dask-hpc-runners would be best to live in this repo this PR adds the SLURMRunner along with the base class and other core code.

Design

The core assumption in the design of the runner model is that the same script will be executed many times by a job scheduler.

(mpirun|srun|qsub|etc) -n4 myscript.py
├── [0] myscript.py
├── [1] myscript.py
├── [2] myscript.py
└── [3] myscript.py

Within the script the runner class is created.

from dask_jobqueue import SomeRunner
from dask.distributed import Client

with SomeRunner(**kwargs) as runner:
    with Client(runner) as client:
        client.wait_for_workers(2)
        # Do some Dask work

This will result in multiple processes running on the HPC system, all instantiating the runner class.

The processes need to coordinate to decide which process should run the Dask Scheduler, which should be Dask Workers, and which should continue running the rest of the client code within the script. This coordination happens during the __init__() of the runner class.

The Scheduler and Worker processes exit after they complete to avoid running the client code multiple times. This means that only one of the processes will continue past the __init__() of the runner class, the rest will exit at that point after the work is done.
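The coordination described above can be sketched in pure Python. This is an illustrative model only, not the actual BaseRunner API: the class, method names and role strings here are assumptions made for the sake of the example.

```python
import os
import sys

# Hypothetical sketch of the __init__()-time coordination described above.
# Class and method names are illustrative, not the real BaseRunner API.
class RunnerSketch:
    def __init__(self):
        self.role = self.get_role()
        if self.role == "scheduler":
            self.run_scheduler()  # blocks until the cluster shuts down...
            sys.exit(0)           # ...then exits so the client code never runs here
        elif self.role == "worker":
            self.run_worker()
            sys.exit(0)
        # Only the "client" process falls through past __init__() and
        # continues running the rest of the script.

    def get_role(self) -> str:
        # Illustrative convention: proc 0 is the scheduler, proc 1 the client.
        procid = int(os.environ.get("SLURM_PROCID", "1"))
        return {0: "scheduler", 1: "client"}.get(procid, "worker")

    def run_scheduler(self):
        ...  # stand-in for starting a Dask Scheduler

    def run_worker(self):
        ...  # stand-in for starting a Dask Worker
```

Only the process whose role is "client" ever returns from the constructor, which is what keeps the script body from executing once per process.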

Base class

The new file runner.py contains the BaseRunner class, which can be used to implement other runners. It also includes an AsyncRunner class that is used for testing.

At minimum, a new runner must implement the following methods.

from dask_jobqueue.runner import BaseRunner

class MyRunner(BaseRunner):

    async def get_role(self) -> str:
        """Figure out whether I am a scheduler, worker or client.

        A common way to do this is by using a process ID. Many job queues give each process
        a monotonic index that starts from zero. So we can assume proc 0 is the scheduler, proc 1
        is the client and any other procs are workers.
        """
        ...

    async def get_scheduler_address(self) -> str:
        """If I am not the scheduler discover the scheduler address.

        A common way to do this is to read a scheduler file from a shared filesystem.

        Alternatively if the scheduler process can broadcast it's address via something like MPI
        we can define ``BaseRunner.set_scheduler_address()`` which will be called on the scheduler 
        and then recieve the broadcast in this method.
        """
        ...

The BaseRunner class handles starting up Dask once these methods have been implemented. It also provides many stubbed-out hooks so you can write code that runs before/after each component is created, e.g. BaseRunner.before_scheduler_start(), BaseRunner.before_worker_start() and BaseRunner.before_client_start().
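The hook pattern can be sketched as follows. Only the hook name before_worker_start comes from the text; the surrounding class and the start_worker method are illustrative stand-ins for the real startup logic.

```python
import asyncio

# Illustrative sketch of the stubbed-hook pattern described above.
class BaseRunnerSketch:
    async def before_worker_start(self) -> None:
        ...  # stubbed out by default; subclasses override only what they need

    async def start_worker(self) -> str:
        await self.before_worker_start()  # hook fires before the worker exists
        return "worker-started"           # stand-in for real worker startup

class LoggingRunner(BaseRunnerSketch):
    async def before_worker_start(self) -> None:
        print("about to start a worker")

result = asyncio.run(LoggingRunner().start_worker())
```

Because the base class awaits the hook itself, a subclass only needs to override the coroutine; it never has to reimplement the startup sequence.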

The runner must know the address of the scheduler so that it can coordinate the clean shutdown of all processes when we reach the end of the code (either via __exit__() or a finalizer). This communication happens independently of any clients that may be created.

Slurm implementation

This PR also adds a Slurm implementation to slurm.py.

In the get_role() method I use the SLURM_PROCID environment variable to infer the role.
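A minimal sketch of that role logic, assuming the proc-0-scheduler/proc-1-client convention mentioned in the base-class docstring (the role strings here are illustrative):

```python
import os

# Sketch of role inference from SLURM_PROCID: Slurm gives each task a
# zero-based index, so proc 0 acts as scheduler, proc 1 as client, and
# all remaining procs become workers.
def get_role() -> str:
    procid = int(os.environ["SLURM_PROCID"])
    if procid == 0:
        return "scheduler"
    elif procid == 1:
        return "client"
    return "worker"
```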

I also add a default scheduler option setting scheduler_file="scheduler-{job_id}.json", looking up the job ID from the SLURM_JOB_ID environment variable to ensure uniqueness. This effectively allows us to broadcast the scheduler address via the shared filesystem.
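The templating step might look like this (the function name and default template here are assumptions based on the description above):

```python
import os

# Substitute the Slurm job ID into the scheduler file template so the
# file name is unique per job on the shared filesystem.
def default_scheduler_file(template: str = "scheduler-{job_id}.json") -> str:
    return template.format(job_id=os.environ["SLURM_JOB_ID"])
```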

Then in the get_scheduler_address() method I wait for the scheduler file to exist and then open and read the address from the scheduler file in the same way the dask.distributed.Client does.

Example

# myscript.py
from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner(scheduler_file="/path/to/shared/filesystem/dask-slurm-{job_id}.json") as runner:
    with Client(runner) as client:
        client.wait_for_workers(2)
        # Do some Dask work

Then I can submit this script via srun or an sbatch script.

srun -n4 myscript.py

TODO

  • Add base runner class
  • Add base tests
  • Add Slurm runner
  • Add Slurm tests
  • Update documentation

@jacobtomlinson jacobtomlinson marked this pull request as draft August 7, 2024 14:14
@jacobtomlinson jacobtomlinson marked this pull request as ready for review August 8, 2024 16:32
@jacobtomlinson (Member Author)

I'm going to mark this as ready for review as all the technical work is done and the tests are passing (ignore the failed SGE test #653).

It still needs docs but I'd love to get a review, even just a high level design review, before I dive into writing up the documentation. @guillaumeeb @lesteve @kmpaul if any of you have time to take a look I'd really appreciate it!

@jacobtomlinson (Member Author)

I've gone over the documentation and given it an overhaul to make space for the new features.

  • I've consolidated the existing documentation under the Dynamic Clusters section.
  • I've added a new Batch Runners section to document the new classes.
  • I've updated the index to describe the difference between the two paradigms.
  • I've also fixed up a few Sphinx warnings, but the existing documentation is largely untouched.
  • Also added redirects for some of the pages I renamed under the new structure.

@jacobtomlinson (Member Author)

Other maintainers of this repo expressed to me via email that they have limited capacity for reviews at the moment. To avoid being blocked on review any longer I'm going to self-merge this. But if anyone wants to revisit this work with a review down the line then I would encourage you to open an issue where we can discuss things.

@jacobtomlinson jacobtomlinson merged commit 3a00196 into dask:main Aug 21, 2024
10 of 11 checks passed
@jacobtomlinson jacobtomlinson deleted the slurm-runner branch August 21, 2024 14:54