Restart cluster job on task completion #597

Open
EvanKomp opened this issue Nov 10, 2022 · 3 comments

@EvanKomp

EvanKomp commented Nov 10, 2022

Use case conditions:

  • tasks have large and variable run times
  • the task executes 3rd-party software, so dask cannot migrate the execution state
  • workers have a walltime, e.g. on an HPC cluster

Current behavior:

Compute is wasted on tasks that cannot finish in the remaining walltime; after the worker dies, the task is restarted from scratch on a new worker.

I originally worked through this issue on the forums here

More context:

The task executes 3rd-party software (mine requires multiple threads; see the issues linked in the above post for examples). The code looks something like the following:

import dask
from dask_jobqueue import SLURMCluster
from distributed import Client
import distributed
import logging
import time
            
def do_one(x):
    worker = distributed.get_worker()
    logger = logging.getLogger('worker')
    logger.setLevel(logging.INFO)
    fh = logging.FileHandler(f'worker_{worker.id}.log', mode='w')
    fh.setLevel(logging.INFO)
    logger.addHandler(fh)

    logger.info(f"I am working on {x}")
    # run the third-party software here; this takes a while but is not
    # very consistent in total time (placeholder sleep for illustration)
    time.sleep(600)
    logger.info(f"I finished {x}")
    return f"Input {x} done"

if __name__ == '__main__':
    cluster = SLURMCluster(
        memory="1g",
        walltime='00:30:00',
        job_extra_directives=['--nodes=1', '--ntasks-per-node=1'],
        cores=1,
        processes=1,
        worker_extra_args=["--lifetime", "28m", "--lifetime-stagger", "50s"],
        job_cpu=6
    )
    cluster.adapt(minimum=2, maximum=10)
    client = Client(cluster)
    
    results = []
    for future in distributed.as_completed(client.map(
        do_one, list(range(100,132))
    )):
        result = future.result()
        results.append(result)

Resulting worker_XXX.log:

2022-11-07-12:00:00 INFO I am working on 1
2022-11-07-12:22:00 INFO I finished 1
2022-11-07-12:22:03 INFO I am working on 10

Worker XXX is killed at 12:29 due to the walltime. 7 minutes of compute are wasted because the execution state cannot be migrated; task 10 starts from scratch on a new worker.

Attempts to fix:

Short of figuring out a way to migrate the execution state, I figure the best strategy is to give each task a brand-new SLURM job, so that no compute is wasted and any task that can finish within the walltime succeeds.

  1. I tried a worker plugin like so:
import random

import distributed

class KillerNannyPlugin(distributed.diagnostics.plugin.WorkerPlugin):
    """Would be better as a nanny plugin, but those are not running transitions properly."""
    def __init__(self, max_stagger_seconds: float = 5):
        self.max_stagger_seconds = max_stagger_seconds

    def setup(self, worker):
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        # once a task result is released from memory, schedule a graceful shutdown
        if start == 'memory' and finish == 'released':
            delay = 3 + random.random() * self.max_stagger_seconds
            self.worker.io_loop.call_later(delay, self.worker.close_gracefully, restart=True)
  • This was successful in ensuring that each task got its own job, but the task repeat rate was on the order of 100%, defeating the point of saving compute.
  2. Have the client retire the worker that just completed a task once its result is collected, like so:
results = []
for future in distributed.as_completed(client.map(
    do_one, list(range(100, 132))
)):
    results.append(future.result())
    # retire the worker(s) holding the finished task so its SLURM job ends
    who_has = client.who_has(future)
    closing = list(list(who_has.values())[0])
    client.retire_workers(closing)
  • I also added a small time delay to the worker function so that the next task did not start (and begin wasting compute) while the client retired the worker.
  • This seems to have the desired effect: tasks are not repeated, and tasks do not start on a job that is about to time out. Any other consequences are not clear to me.

I think this should be codified somehow, as the "solution" above is quite hacky. My intuition says it would fit best as a scheduler plugin (a rough sketch is below), since using the worker plugin above clearly had adverse effects on task balancing. Happy to help contribute, with some input on where this would fit best, if it would be a useful addition.
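For illustration only, a minimal, untested sketch of what such a scheduler plugin might look like. The class name RetireAfterTaskPlugin is hypothetical, and the exact SchedulerPlugin hooks and Scheduler.retire_workers signature should be checked against the distributed version in use:

from distributed.diagnostics.plugin import SchedulerPlugin

class RetireAfterTaskPlugin(SchedulerPlugin):
    """Hypothetical sketch: retire a worker once one of its tasks finishes."""

    def start(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # when a task result lands in memory, retire the worker(s) that hold it
        if finish == "memory":
            ts = self.scheduler.tasks.get(key)
            if ts is None:
                return
            addresses = [ws.address for ws in ts.who_has]
            # Scheduler.retire_workers is a coroutine, so schedule it on the event loop
            self.scheduler.loop.add_callback(
                self.scheduler.retire_workers, workers=addresses
            )

# registration from the client side, e.g.:
# client.register_scheduler_plugin(RetireAfterTaskPlugin())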

@guillaumeeb
Member

Hi @EvanKomp, thanks for raising this precise issue.

The need you talk about has been discussed (I think) in #416.

There is an open issue in distributed: dask/distributed#3141.

I think the best approach would be to implement an option that makes the --lifetime worker option more graceful, waiting for the currently running task to finish before closing. This way, you could, say, use a job scheduler walltime of 1h and a lifetime of 30 minutes if your tasks run up to 20 minutes each.
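For reference, the configuration described above might look like the following with the existing flags; the graceful "wait for the running task to finish" behaviour is the part that would still need to be implemented:

from dask_jobqueue import SLURMCluster

# job scheduler walltime of 1h, worker lifetime of 30 minutes; with the
# proposed option the worker would additionally wait for its currently
# running task to finish before closing gracefully
cluster = SLURMCluster(
    cores=1,
    processes=1,
    memory="1g",
    walltime="01:00:00",
    worker_extra_args=["--lifetime", "30m", "--lifetime-stagger", "2m"],
)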

It would be very welcome if you could contribute this, as it is clearly a nice feature for HPC systems.

@EvanKomp
Author

@guillaumeeb I think that is a reasonable strategy, and pretty flexible. If you end up with an extreme case like mine, where task run times differ by orders of magnitude, your proposed solution could be pushed to the extreme: set the "lifetime" (with the additional wait for task completion) to just a few minutes, which all but guarantees that any task not at the very low end of run times will cause the worker to end after the task completes.

I can draft up a solution at some point, but I am new to dask, so I could use help identifying the safest mechanism to actually send the signal to kill a worker gracefully from within the worker. I initially tried the same one that the lifetime argument uses, e.g. worker.io_loop.call_later(time, worker.close_gracefully), but that ended up causing tasks to be duplicated an unacceptable number of times, I think because it somehow prevented the worker from faithfully communicating that the task was done, wasting its work.

@guillaumeeb
Member

I think you should post these questions directly in the corresponding distributed issue. I've never taken a look at the lifetime option mechanism, so I won't be able to help you much.
