Multiple ThreadPoolExecutors #4655

Closed
mrocklin opened this issue Mar 31, 2021 · 14 comments · Fixed by #4869

@mrocklin
Member

mrocklin commented Mar 31, 2021

(I think that I've raised this before, but I couldn't find it. I suspect that it was part of commentary on an issue rather than a standalone issue itself)

Today we run all tasks in a ThreadPoolExecutor living at Worker.executor. We default the size of this executor to the number of logical CPU cores on a machine. This works great most of the time, but there are some cases where we would like something different.

  1. I/O-related tasks we could consider running on the event loop itself, or with a separate Tornado-based AsyncExecutor
  2. For GPU-related tasks we would prefer a separate executor with a single thread (or, in the near future, a few threads)
  3. For noxious tasks that leak memory, folks have asked for a separate ProcessPoolExecutor
  4. Some folks have asked for a special executor for tasks with resource restrictions
  5. Actors already run on their own executor today

In practice, the GPU pool is probably the most common case today.

So perhaps we should encode multiple executors into the Worker, and have tasks split between them based on annotations/resources/gpu flags.

executor = self.executors[task.executor or "cpu"]
self.submit_on_executor(executor, task, *args, **kwargs)
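A rough sketch of how the worker might hold those executors (everything here is hypothetical, not an existing API):

import os
from concurrent.futures import ThreadPoolExecutor

# Named executors on the worker; each task picks one via its annotation.
executors = {
    "cpu": ThreadPoolExecutor(max_workers=os.cpu_count()),
    "gpu": ThreadPoolExecutor(max_workers=1),  # single thread for GPU work
}

def submit_on_executor(executor, task, *args, **kwargs):
    # Run the task's function on the chosen pool.
    return executor.submit(task.function, *args, **kwargs)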

cc @dask/gpu

@jakirkham
Member

Would this alleviate the need for secede/rejoin as well given another thread could always pick up new work? Or would that not work for some reason?

@mrocklin
Member Author

No. This is unrelated to secede/rejoin, which is useful for tasks that submit other tasks.

@gjoseph92
Collaborator

I/O related tasks we could consider running on the event loop itself, or with a separate Tornado based AsyncExecutor

I'm personally interested in this. It would be nice if you could use async/coroutines for IO alongside CPU-bound tasks for processing, and not have to manually juggle the event loop. For example, I'd like to use aiocogeo to read GeoTIFFs into ndarrays, then process the arrays in a normal threadpool.

What would the semantics be for controlling which tasks run in which executors? For async in particular, would we want logic that automatically awaits coroutine tasks before passing their results into non-coroutine tasks, or would we require users/collections to control that manually?

@mrocklin
Member Author

mrocklin commented Apr 2, 2021

For things like GPUs there might be a few different mechanisms; there is a related GPU issue at #4656. For async tasks I think we can identify these pretty easily by checking whether the function is an async function or not. For other executors we would probably rely on annotations?
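A minimal sketch of that detection (the executor names are made up):

import asyncio

def pick_executor(func, annotation=None):
    # An explicit annotation wins; otherwise coroutine functions go to
    # the event loop and everything else to the default thread pool.
    if annotation is not None:
        return annotation
    if asyncio.iscoroutinefunction(func):
        return "async"
    return "cpu"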

@jakirkham
Member

FWIW there is a coroutine executor. Not sure if that is helpful for the IO use case

@jakirkham
Member

One thing to be aware of is that we have been exploring per-thread default streams (PTDS) (rapidsai/dask-cuda#96, rapidsai/dask-cuda#517). This maps closely onto what we have today, since using multiple threads translates into using multiple streams on the GPU.
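As a rough illustration of that thread-to-stream mapping, a sketch assuming cupy (the helper itself is made up):

import threading
import cupy

_local = threading.local()

def run_on_gpu(func, *args):
    # Each worker thread lazily creates its own CUDA stream, so using
    # multiple threads translates into multiple streams on the GPU.
    if not hasattr(_local, "stream"):
        _local.stream = cupy.cuda.Stream(non_blocking=True)
    with _local.stream:
        return func(*args)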

@mrocklin
Member Author

mrocklin commented Jun 4, 2021

FWIW there is a coroutine executor. Not sure if that is helpful for the IO use case

That's actually a really interesting point. cc'ing @martindurant who has thought about this in the past.

I suspect that if we had layers that were strictly IO, and then marked those layers with the appropriate annotation, then everything here might just work? (I think that high level fusion doesn't cross differences in annotation)
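For illustration, a strictly-IO layer might carry its annotation like this (read_blocks and parse_blocks are made-up helpers, and the "io" executor name is an assumption; dask.annotate is the existing annotation mechanism):

import dask

with dask.annotate(executor="io"):
    blocks = read_blocks("s3://bucket/*.csv")  # hypothetical IO-only layer

with dask.annotate(executor="cpu"):
    dfs = parse_blocks(blocks)  # hypothetical CPU-bound parsing layer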

@jakirkham
Member

Was also thinking about that in the context of spilling

@martindurant
Member

layers that were strictly IO

This is almost never the case. Loading bytes often needs some CPU (e.g., gzip decompression of HTTP responses, which could perhaps be offloaded to a thread), and for dask the byte loading usually forms only part of a given task. For the simplest example, a zarr load may fetch several chunks concurrently on the event loop and then decode them synchronously on the worker thread (this can be quite a speed-up). Other loaders like CSV and parquet do not even use fsspec's async layer directly or fetch bytes concurrently.

cf. dask/dask#7557: loads multiple pieces of parquet in each task, but the backend is calling open/read synchronously. The improvement is in skipping pd.concat.

cf. dask/fastparquet#619, which can explicitly fetch multiple file-metadata chunks concurrently by calling fsspec (for a backend that supports it, currently HTTP/S3).
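The zarr pattern above might look roughly like this (the paths and decode() are placeholders; fs.cat does fetch a batch of keys concurrently on async-capable backends):

import fsspec

fs = fsspec.filesystem("s3")
# Concurrent fetch of several chunks on the event loop...
raw = fs.cat(["bucket/array/0.0", "bucket/array/0.1"])
# ...then synchronous decompression/decoding on the worker thread.
chunks = [decode(blob) for blob in raw.values()]  # decode() is a placeholder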

@jakirkham
Member

This might be a harebrained idea, but I haven't quite shaken it. We might want to explore this new functionality Mads has added with a custom Executor using CUDA streams. I wrote it up as an issue (rapidsai/dask-cuda#641) if others have thoughts.

@mrocklin
Member Author

mrocklin commented Jun 4, 2021

I suspect that if we had layers that were strictly IO

This is almost never the case

Yeah, to be clear, I'm saying that if we were to change how dask collections handle IO, moving read_bytes calls into fully separable tasks, then we could take advantage of this. You had mentioned this in the past, I think.

It wouldn't work for Zarr, you're right, because that abstraction hides I/O from us, but it could work for Parquet, CSV, and others if we wanted to make that explicit split. I'm not suggesting that we do this today, or any time in the moderate future.

@jakirkham
Member

It wouldn't work for Zarr, you're right, because that abstraction hides I/O from us, but it could work for Parquet, CSV, and others if we wanted to make that explicit split. I'm not suggesting that we do this today, or any time in the moderate future.

It might work if we supplied that in a MutableMapping or fsspec-based object that Zarr could consume.
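Presumably something like the following (the path is made up; fsspec mappers are an existing mechanism zarr can consume):

import fsspec
import zarr

# Hand zarr an fsspec-backed mapping whose IO behaviour we control;
# zarr still runs its decoding synchronously on top of it.
store = fsspec.get_mapper("s3://bucket/dataset.zarr")
arr = zarr.open(store, mode="r")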

@martindurant
Member

@jakirkham: that's already the case, and you could set the fsspec backend's loop to be the one it needs to be; but zarr will still do its decoding synchronously. You'd have to pass the filters down to the storage layer and replicate the work there, but then it would no longer be pure IO.

I think a rewrite in which we can fetch multiple blocks of bytes in a single task and pass them to a separate dataframe-making task (without concat!) would work well for CSV. Parquet, and just about anything else where we don't pass bytes around, is more complicated. Fastparquet, for example, isn't interested in running in multiple threads the way arrow can, because "dask can solve that case" (not that it does a good job of releasing the GIL).
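A sketch of that split for CSV (the helpers are hypothetical, and header handling is ignored for brevity): one task fetches several byte blocks at once, the next parses them into a single DataFrame, skipping the per-block parse plus pd.concat:

import io
import pandas as pd

def fetch_blocks(fs, paths):
    # One task: fetch several byte blocks in one go
    # (concurrent on async-capable fsspec backends).
    return list(fs.cat(paths).values())

def make_dataframe(blocks):
    # Next task: a single parse over the joined bytes instead of
    # parsing each block separately and calling pd.concat.
    return pd.read_csv(io.BytesIO(b"".join(blocks)))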

Note that the PR I linked above for fastparquet improved dataset open time by 10x on s3 without a _metadata file (one of the test datasets with many files).

@martindurant
Member

Recent example of increasing the chunk size (of the dask task - same on disk) in zarr: https://nbviewer.jupyter.org/gist/rsignell-usgs/9ccb9c18d4c1bf2205561387837d6868

Time went from 30s to 20s; can't readily tell on the surface how much of that was IO/latency.
