
[FEA] Add compute_with_dask utility for downstream Merlin libraries #70

Open

rjzamora opened this issue Apr 11, 2022 · 7 comments
Labels: enhancement (New feature or request)

@rjzamora (Contributor)

Proposal

I propose that we add a single compute_dask_object (or compute_with_dask) utility to merlin.core.utils, and use that utility for all Dask computation within Merlin. This utility would check global_dask_client() and use the appropriate default client/scheduler for Merlin (rather than dask/distributed's own default).

More Background

While starting to look into merlin-models#339, I noticed that there is at least one place where Merlin-models uses a bare compute() statement to compute a Dask collection. I suspect that this is also done in several other places across the Merlin ecosystem.

When there is no global Dask client in the current Python context, calling compute() will typically execute with Dask's "multi-threaded" scheduler. This may be fine for CPU-backed data, but it will result in many Python threads thrashing the same GPU (device 0) when the data is GPU-backed.

For compute operations in NVTabular (which only operate on Delayed Dask objects), the merlin.core.utils.global_dask_client utility is used to query the current Dask client. If this function returns None, the convention is to fall back to the "synchronous" scheduler (compute(scheduler="synchronous")); otherwise the distributed client is used. I propose that this same convention be used everywhere in Merlin (besides special cases where scheduler="synchronous" can be hard-coded).

Note that these changes are also required for Merlin's Serial and Distributed context managers to work correctly.
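To illustrate why the context managers depend on a shared compute helper, here is a minimal pure-Python sketch of the pattern. All names below (the module-level registry, the distributed context manager, the compute helper) are hypothetical stand-ins, not Merlin's actual implementation:

```python
# Sketch: a context manager can only influence computation if every
# compute path consults a shared client registry. A bare obj.compute()
# never sees this state, so it cannot honor Serial/Distributed.
import contextlib

_global_client = None  # stand-in for Merlin's client state

def global_dask_client():
    """Return the currently active client, or None."""
    return _global_client

@contextlib.contextmanager
def distributed(client):
    """Activate `client` for the duration of the block (like Distributed)."""
    global _global_client
    previous, _global_client = _global_client, client
    try:
        yield client
    finally:
        _global_client = previous

def compute_dask_object(obj):
    """Route computation based on the registered client."""
    client = global_dask_client()
    if client is not None:
        return f"computed {obj!r} with {client!r}"
    return f"computed {obj!r} with scheduler='synchronous'"

# Outside the context manager: synchronous fallback.
assert compute_dask_object("x") == "computed 'x' with scheduler='synchronous'"
# Inside: the registered client is used.
with distributed("my-client"):
    assert compute_dask_object("x") == "computed 'x' with 'my-client'"
```

The point of the sketch is only the routing: because the helper reads the registry at call time, entering or exiting the context manager changes where every downstream computation runs.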

Proposed Implementation

from merlin.core.utils import global_dask_client
from dask.base import is_dask_collection
from dask.delayed import Delayed
import dask

def compute_dask_object(dask_obj):
    """Compute a Dask collection using Merlin's dask-client settings"""

    # Check the global client
    dask_client = global_dask_client()

    if is_dask_collection(dask_obj) or isinstance(dask_obj, Delayed):
        # Compute a single Dask collection
        # (use the distributed client, or fall back to the "synchronous" scheduler)
        scheduler = dask_client.get if dask_client else "synchronous"
        return dask_obj.compute(scheduler=scheduler)
    elif isinstance(dask_obj, list):
        # Maybe check that all elements of the list are collections or Delayed?
        # Compute the entire list at once:
        if dask_client:
            return [r.result() for r in dask_client.compute(dask_obj)]
        else:
            return dask.compute(dask_obj, scheduler="synchronous")[0]
    else:
        raise ValueError(f"Expected a Dask collection or list, got {type(dask_obj)}")
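To sanity-check the three dispatch branches without a Dask cluster, here is a stripped-down pure-Python mock of the same control flow. The stub classes (FakeCollection, FakeClient, FakeFuture) are hypothetical stand-ins for real Dask collections and a distributed client, and the client is passed explicitly instead of read from global_dask_client():

```python
# Pure-Python mock of the proposal's dispatch logic, so the routing can
# be checked without dask installed. All classes here are hypothetical
# stand-ins, not real Dask/distributed APIs.
class FakeCollection:
    def __init__(self, value):
        self.value = value
    def compute(self, scheduler=None):
        # Record which scheduler was chosen, for inspection.
        return (self.value, scheduler)

class FakeFuture:
    def __init__(self, value):
        self.value = value
    def result(self):
        return self.value

class FakeClient:
    """Mimics the shape of distributed.Client: compute() returns futures."""
    get = "client-get"  # stands in for Client.get
    def compute(self, objs):
        return [FakeFuture(o.value) for o in objs]

def compute_dask_object(dask_obj, dask_client=None):
    # Same three branches as the proposal above.
    if isinstance(dask_obj, FakeCollection):
        scheduler = dask_client.get if dask_client else "synchronous"
        return dask_obj.compute(scheduler=scheduler)
    elif isinstance(dask_obj, list):
        if dask_client:
            return [r.result() for r in dask_client.compute(dask_obj)]
        return [o.compute(scheduler="synchronous")[0] for o in dask_obj]
    else:
        raise ValueError(f"unsupported type: {type(dask_obj)}")

# No client: single collections fall back to the synchronous scheduler.
assert compute_dask_object(FakeCollection(1)) == (1, "synchronous")
# With a client: the client's scheduler is used.
assert compute_dask_object(FakeCollection(1), FakeClient()) == (1, "client-get")
# Lists are computed in one batch through the client.
assert compute_dask_object([FakeCollection(1), FakeCollection(2)], FakeClient()) == [1, 2]
```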
@karlhigley (Contributor)

This looks good! My only comment here is that I wonder if this function might belong in the io package, which is where most of the Dask-related functionality currently lives.

karlhigley added the enhancement (New feature or request) label Apr 12, 2022
@rjzamora (Contributor, Author)

> This looks good! My only comment here is that I wonder if this function might belong in the io package, which is where most of the Dask-related functionality currently lives.

Computing a Dask object/graph definitely happens extensively outside of IO, so my gut feeling is "no". Also, all other utilities related to distributed/serial Dask execution are already defined in merlin.core.utils. That said, I'm open to re-organizing these utilities in some other way if you have something cleaner in mind.

@karlhigley (Contributor)

Probably fine to add it there for now then. I would like to organize the utils into other more clearly named sub-packages, but that's out of scope for this issue.

@karlhigley (Contributor) commented Apr 19, 2022

Thinking about it, I wonder if having a dask sub-package outside of utils would make sense? Maybe the thing to do is migrate parts of io that are for Dask but not specific to IO into a new package along with this util.

@rjzamora (Contributor, Author)

> Thinking about it, I wonder if having a dask sub-package outside of utils would make sense? Maybe the thing to do is migrate parts of io that are for Dask but not specific to IO into a new package along with this util.

Sure - Do you have an intuition for what would qualify a piece of code as something that belongs under dask/? Would it be code that explicitly imports a dask-related package? Most of the code in io uses dask directly, or is called by dask-specific code, so finding an intuitive division may be a bit tricky (but is probably doable).

@karlhigley (Contributor)

I don't have a clear line in mind, but having looked at what's in io, it seemed like DaskSubgraph, DataFrameIter, and shuffle.py are pure Dask code that might be applicable outside of io. (I could be totally wrong about this too though, thinking out loud here.)

@rjzamora (Contributor, Author)

> I don't have a clear line in mind, but having looked at what's in io, it seemed like DaskSubgraph, DataFrameIter, and shuffle.py are pure Dask code that might be applicable outside of io. (I could be totally wrong about this too though, thinking out loud here.)

Okay - I definitely agree that it makes sense to add a new merlin module for dask-specific code (as long as we all understand that a lot of dask-related code will still need to live in many other places). Perhaps I'll try to throw up a PR with a prospective re-org today or tomorrow.
