Multiple ThreadPoolExecutors #4655
Comments
Would this alleviate the need for secede/rejoin as well, given another thread could always pick up new work? Or would that not work for some reason?
No. This is unrelated to secede/rejoin, which is useful for tasks submitting other tasks.
I'm personally interested in this. It would be nice if you could use async/coroutines for IO alongside CPU-bound tasks for processing, and not have to manually juggle the event loop. For example, I'd like to use …

What would the semantics be for controlling which tasks run in which executors? For async in particular, would we want to have logic that automatically …
For things like GPUs there might be a few different mechanisms; there is a related GPU issue on this at #4656. For async tasks I think we can probably identify these pretty easily by checking whether the function is an async function or not. For other executors we probably rely on annotations?
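To make the "is it an async function?" idea concrete, here is a minimal sketch of how a worker could route tasks; `run_task` and `cpu_pool` are illustrative names, not part of distributed's API:

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor

cpu_pool = ThreadPoolExecutor(max_workers=4)  # stand-in for the default executor

async def run_task(func, *args):
    loop = asyncio.get_running_loop()
    if inspect.iscoroutinefunction(func):
        # IO-bound coroutine: await it directly on the event loop
        return await func(*args)
    # Regular (possibly CPU-bound) function: hand it off to a thread pool
    return await loop.run_in_executor(cpu_pool, func, *args)
```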
FWIW there is a coroutine executor. Not sure if that is helpful for the IO use case.
One thing to be aware of: we have been exploring per-thread default stream (PTDS) (rapidsai/dask-cuda#96, rapidsai/dask-cuda#517). This maps closer to what we have today, as using multiple threads translates into using multiple streams on the GPU.
That's actually a really interesting point. cc'ing @martindurant, who has thought about this in the past. I suspect that if we had layers that were strictly IO, and then marked those layers with the appropriate annotation, then everything here might just work? (I think that high-level fusion doesn't cross differences in annotation.)
Was also thinking about that in the context of spilling.
This is almost never the case. Loading bytes often needs some CPU (e.g., gzip decompression of HTTP responses, which could perhaps be offloaded to a thread), and in dask it usually only forms part of a given task. For the simplest example, a zarr load may fetch several chunks concurrently on the event loop and then decode them synchronously on the worker thread (this can be quite a speed-up). Other loaders like CSV and parquet do not even use fsspec's async layer directly or fetch bytes concurrently.

cf dask/dask#7557, which loads multiple pieces of parquet in each task, but the backend is calling open/read synchronously; the improvement is in skipping pd.concat.

cf dask/fastparquet#619, which can explicitly fetch multiple file metadata chunks concurrently by calling fsspec (for a backend that supports it, currently HTTP/S3).
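For illustration, the zarr-style pattern described above (concurrent fetch on the event loop, synchronous decode on the worker thread) might look roughly like this; `fetch_chunk` is a placeholder, not zarr or fsspec code:

```python
import asyncio
import gzip

async def fetch_chunk(url: str) -> bytes:
    # Placeholder for a real async range request (e.g. via aiohttp/fsspec)
    await asyncio.sleep(0.01)
    return gzip.compress(b"chunk bytes for " + url.encode())

async def fetch_all(urls):
    # Concurrent IO on the event loop
    return await asyncio.gather(*(fetch_chunk(u) for u in urls))

def load_chunks(urls):
    raw = asyncio.run(fetch_all(urls))
    # Synchronous, CPU-bound decoding stays on the worker thread
    return [gzip.decompress(r) for r in raw]
```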
This might be a harebrained idea, but I haven't quite shaken it. We might want to explore this new functionality Mads has added with a custom …
Yeah, to be clear, I'm saying that if we were to change how dask collections handle IO, by moving …

It wouldn't work for Zarr, you're right, because that abstraction hides I/O from us, but it could work for Parquet, CSV, and others if we wanted to make that explicit split. I'm not suggesting that we do this today, or any time in the moderate future.
It might work if we supplied that in a …
@jakirkham: that's already the case, and you could set the fsspec backend's loop to be the one it needs to be; but zarr will still do its decoding synchronously. You'd have to pass the filters down to the storage layer and replicate the work there, but then it would no longer be pure IO. I think a rewrite in which we can fetch multiple blocks of bytes in a single task and pass them to a separate dataframe-making task (without concat!) would work well for CSV. Parquet, and just about anything else where we don't pass bytes around, is more complicated. Fastparquet, for example, isn't interested in running in multiple threads like arrow can, because "dask can solve that case" (not that it does a good job of releasing the GIL). Note that the PR I linked above for fastparquet improved dataset open time by 10x on s3 without _metadata (one of the test datasets with many files).
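A rough sketch of the "many byte blocks in one task, one dataframe-making task, no concat" idea for CSV; `fetch_blocks` and `blocks_to_frame` are hypothetical helpers and `data.csv` a made-up path, not existing dask functions:

```python
import io

import dask
import pandas as pd

@dask.delayed
def fetch_blocks(path, offsets, length):
    # One task reads several byte ranges (this is where concurrent IO could live)
    blocks = []
    with open(path, "rb") as f:
        for off in offsets:
            f.seek(off)
            blocks.append(f.read(length))
    return blocks

@dask.delayed
def blocks_to_frame(blocks, columns):
    # One task parses all of its blocks into a single DataFrame; no pd.concat
    return pd.read_csv(io.BytesIO(b"".join(blocks)), names=columns)

# frame.compute() would run both tasks
frame = blocks_to_frame(fetch_blocks("data.csv", [0, 1_000_000], 1_000_000),
                        columns=["a", "b", "c"])
```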
Recent example of increasing the chunk size (of the dask task; same on disk) in zarr: https://nbviewer.jupyter.org/gist/rsignell-usgs/9ccb9c18d4c1bf2205561387837d6868

Time went from 30s to 20s; can't readily tell on the surface how much of that was IO/latency.
(I think that I've raised this before, but I couldn't find it. I suspect that it was part of commentary on an issue rather than a standalone issue itself)
Today we run all tasks in a ThreadPoolExecutor living at `Worker.executor`. We default the size of this executor to the number of logical CPU cores on a machine. This works great most of the time, but there are some cases where we would like something different. In practice, the GPU pool is probably the most common case today.
So perhaps we should encode multiple executors into the Worker, and have tasks split between them based on annotations/resources/gpu flags.
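As a rough illustration of that split (the `executors` mapping and the `executor` annotation key here are hypothetical names, not an existing API):

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-worker pools, keyed by name
executors = {
    "default": ThreadPoolExecutor(max_workers=8),  # CPU-bound tasks
    "gpu": ThreadPoolExecutor(max_workers=1),      # serialize access to the GPU
    "io": ThreadPoolExecutor(max_workers=32),      # many blocking IO calls
}

def submit(func, *args, annotations=None):
    # Route the task to the pool named by its annotation, defaulting otherwise
    name = (annotations or {}).get("executor", "default")
    return executors[name].submit(func, *args)
```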
cc @dask/gpu