fix: adapt to new Task spec in dask, now used in blockwise #556

Open · wants to merge 29 commits into base: main

Conversation

@lgray (Collaborator) commented Dec 4, 2024

Adapts the one bit of dask_awkward code that builds a graph to use a Task object instead.

cf.:
dask/dask#11568
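
For reference, a minimal sketch of the shape of that change (hypothetical keys and functions, not code from this PR): the legacy style writes a bare tuple into the graph, while the new spec writes an explicit Task whose dependencies are spelled out with TaskRef.

import operator

from dask._task_spec import Task, TaskRef

# old-style (legacy) graph entry: a bare tuple of (callable, *args)
legacy_entry = {("mul", 0): (operator.mul, ("x", 0), 2)}

# new-style entry: an explicit Task object; TaskRef marks ("x", 0) as a dependency
new_entry = {("mul", 0): Task(("mul", 0), operator.mul, TaskRef(("x", 0)), 2)}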

@martindurant (Collaborator):

I asked on the dask PR whether there are specific migration instructions associated with their change.

@lgray (Collaborator, Author) commented Dec 4, 2024

@martindurant rewrite_layer_chains needs to be adjusted, along with various checks that happen during the tests. The latter are mostly localized in one function, so hopefully it's a reasonable fix.

There must be some way to just pop out a good old dictionary from the Task class.
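
Incidentally, a task-spec graph itself is still a plain dict keyed by task keys; only the values become Task objects. A hedged sketch, assuming dask's convert_legacy_graph helper behaves as its name suggests (it converts in the legacy-to-Task direction; a one-call way back is not obvious):

from dask._task_spec import convert_legacy_graph

# a "good old dictionary": key -> (callable, *args) tuples
legacy = {"a": (sum, [1, 2, 3]), "b": (len, "a")}

# the converted graph is still a plain dict; only the values change representation
new = convert_legacy_graph(legacy)
print({k: type(v).__name__ for k, v in new.items()})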

@lgray (Collaborator, Author) commented Dec 4, 2024

We will likely need to put a dask >= 2024.12.0 requirement on the next release, and a dask < 2024.12.0 requirement for Python < 3.10.

@pfackeldey (Collaborator) commented Dec 4, 2024

If I'm understanding this correctly, a Task allows you to .substitute dependencies? This could be useful for graph cloning and then replacing the IO layer - or am I misunderstanding this?

@martindurant (Collaborator):

Task allows you to .substitute itself.

Some docs on that method would be very useful.
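
A hedged sketch of the usage pfackeldey describes, assuming .substitute takes a mapping of old dependency keys to replacements and returns a new node (names here are made up):

import operator

from dask._task_spec import Task, TaskRef

t = Task("result", operator.add, TaskRef("io-layer"), 1)
t2 = t.substitute({"io-layer": "mocked-io-layer"})

print(t.dependencies)   # expected: frozenset({'io-layer'})
print(t2.dependencies)  # expected: frozenset({'mocked-io-layer'})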

@martindurant (Collaborator):

appease mypy's insatiable lust for perfect correctness

:)

@martindurant (Collaborator):

rewrite_layer_chains needs to be adjusted, along with various checking that's happening during the tests

Is this something I'll need to get on?

@lgray (Collaborator, Author) commented Dec 4, 2024

@martindurant seems like it - but it also looks like rewrite_layer_chains gets significantly easier to read using the new fuse/substitute interfaces in Task.

@lgray (Collaborator, Author) commented Dec 6, 2024

delayed <-> dask-awkward interactions really only happen in HEP when someone is passing a big correction into the task graph, where we use delayed to wrap the function.

So I'd argue it should be materialized because in such a case you'd need to pass the typetracer to the function in any case.

I think this is just an inherent limitation of "going across the fence" between dak and delayed specifically, since you don't know what you're getting when you materialize it. Otherwise it significantly complicates the delayed interface of other packages.

Though maybe it is a wrong choice to do so?

@lgray (Collaborator, Author) commented Dec 6, 2024

Let's try to first get the status quo re-established since that was at least reasonable behavior. If we can do that we'll have more understanding of what's going wrong, as well.

I am mostly arguing this from the fact that it is still broken when running the unoptimized graph, where it should materialize for sure.

@douglasdavis (Collaborator) commented Dec 6, 2024 via email

@douglasdavis (Collaborator):

Started to poke around the new Task stuff and found

https://github.com/dask/dask/blob/02185bedf2e6aba70ac2d3987c4f739998da835e/dask/blockwise.py#L388-L404

Looks like

elif isinstance(arg, Delayed):
    pairs.extend([arg.key, None])

needs to be wrapped in dask._task_spec.Alias. I blindly tried it:

diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py
index d7c1a4e..4c8cd45 100644
--- a/src/dask_awkward/lib/core.py
+++ b/src/dask_awkward/lib/core.py
@@ -1910,6 +1910,7 @@ def partitionwise_layer(
         The Dask HighLevelGraph Blockwise layer.
 
     """
+    from dask._task_spec import Alias
     pairs: list[Any] = []
     numblocks: dict[str, tuple[int, ...]] = {}
     for arg in args:
@@ -1928,7 +1929,7 @@ def partitionwise_layer(
             pairs.extend([arg.name, "i"])
             numblocks[arg.name] = (1,)
         elif isinstance(arg, Delayed):
-            pairs.extend([arg.key, None])
+            pairs.extend([Alias(arg.key), None])
         elif is_dask_collection(arg):
             raise DaskAwkwardNotImplemented(
                 "Use of Array with other Dask collections is currently unsupported."

and got a different error with default compute (e.g. a.compute()), but now the no-optimize case works (a.compute(optimize_graph=False)), so that's some positive progress.

Need to look more into how the Alias impacts the typetracer graph pass.

In [1]: import dask_awkward as dak
   ...: from dask.delayed import delayed
   ...: import awkward as ak
   ...:
   ...:

In [2]: x = ak.Array([[1,2,3], [4]])

In [3]: a = dak.from_awkward(x, npartitions=2)

In [4]: @delayed
   ...: def one():
   ...:     return 1
   ...:

In [5]: import operator

In [6]: b = a.map_partitions(operator.mul, one(), meta=a._meta)

In [7]: b.compute(optimize_graph=False)
Out[7]: <Array [[1, 2, 3], [4]] type='2 * var * int64'>

In [8]: b.compute()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[8], line 1
----> 1 b.compute()

File ~/software/repos/dask-awkward/.venv/lib/python3.13/site-packages/dask/base.py:372, in DaskMethodsMixin.compute(self, **kwargs)
    348 def compute(self, **kwargs):
    349     """Compute this dask collection
    350
    351     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    370     dask.compute
    371     """
--> 372     (result,) = compute(self, traverse=False, **kwargs)
    373     return result

File ~/software/repos/dask-awkward/.venv/lib/python3.13/site-packages/dask/base.py:660, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    657     postcomputes.append(x.__dask_postcompute__())
    659 with shorten_traceback():
--> 660     results = schedule(dsk, keys, **kwargs)
    662 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/software/repos/dask-awkward/.venv/lib/python3.13/site-packages/dask/local.py:455, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    452     rerun_exceptions_locally = config.get("rerun_exceptions_locally", False)
    454 if state["waiting"] and not state["ready"]:
--> 455     raise ValueError("Found no accessible jobs in dask")
    457 def fire_tasks(chunksize):
    458     """Fire off a task to the thread pool"""

ValueError: Found no accessible jobs in dask

@lgray (Collaborator, Author) commented Dec 7, 2024

@pfackeldey @douglasdavis @fjetter
After some further poking around, it is specifically the optimize_blockwise pass that causes the graph to throw ValueError: Found no accessible jobs in dask.

If I comment out https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/optimize.py#L52 we get some interesting outcomes.

HLG with optimize.py#52 commented out:

un-optimized: HighLevelGraph with 3 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x115839310>
 0. a_delayed_array-07e0822d-9a8c-4164-a4b5-26570955e35e
 1. from-awkward-85ae75dd547aa970cf45ae978772e6e4
 2. <dask-awkward.lib.core.ArgsKwargsPackedFunction ob-92230ff6d385deae6aca0cffcabdad9f
 
optimized:    HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x117b7d430>
 0. <dask-awkward.lib.core.ArgsKwargsPackedFunction ob-92230ff6d385deae6aca0cffcabdad9f

with L52 restored:

un-optimized: HighLevelGraph with 3 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1089cc980>
 0. a_delayed_array-0f23670a-dae0-49de-b285-3732450abc70
 1. from-awkward-db6b93afdbe5c3bf69e753378b7fd590
 2. <dask-awkward.lib.core.ArgsKwargsPackedFunction ob-25fddb97bdfc2ebcd68404195ebb0836
 
optimized:    HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x108fe31d0>
 0. <dask-awkward.lib.core.ArgsKwargsPackedFunction ob-25fddb97bdfc2ebcd68404195ebb0836

The test passes with optimize_blockwise commented out and, at the highest level, appears to produce the same HLG structure. I will dig into it a bit more to try to understand what's packed up differently.

@lgray (Collaborator, Author) commented Dec 7, 2024

With optimize_blockwise commented out the resulting materialized layer contains this:

[(('<dask-awkward.lib.core.ArgsKwargsPackedFunction ob-6c779eab894c25ed7b3f751615b94fe3',
   0),
  <Task ('<dask-awkward.lib.core.ArgsKwargsPackedFunction ob-6c779eab894c25ed7b3f751615b94fe3', 0) <dask_awkward.lib.core.ArgsKwargsPackedFunction ob(...)>),
 (('<dask-awkward.lib.core.ArgsKwargsPackedFunction ob-6c779eab894c25ed7b3f751615b94fe3',
   1),
  <Task ('<dask-awkward.lib.core.ArgsKwargsPackedFunction ob-6c779eab894c25ed7b3f751615b94fe3', 1) <dask_awkward.lib.core.ArgsKwargsPackedFunction ob(...)>),
 (('from-awkward-d54829cedb4c32f715e92986dfca221f', 0),
  <Task ('from-awkward-d54829cedb4c32f715e92986dfca221f', 0) <dask_awkward.lib.io.io.FromAwkwardFn object at 0x(...)>),
 ('a_delayed_array-1abece2e-5d98-44eb-abcb-8ea21563a7ac',
  (<function __main__.a_delayed_array()>,)),
 (('from-awkward-d54829cedb4c32f715e92986dfca221f', 1),
  <Task ('from-awkward-d54829cedb4c32f715e92986dfca221f', 1) <dask_awkward.lib.io.io.FromAwkwardFn object at 0x(...)>)]

whereas with optimize_blockwise enabled the resulting layer contains:

[(('<dask-awkward.lib.core.ArgsKwargsPackedFunction ob-1beb66991b79fa2f49330ef901027c85',
   0),
  <Task ('<dask-awkward.lib.core.ArgsKwargsPackedFunction ob-1beb66991b79fa2f49330ef901027c85', 0) _execute_subgraph(...)>),
 (('<dask-awkward.lib.core.ArgsKwargsPackedFunction ob-1beb66991b79fa2f49330ef901027c85',
   1),
  <Task ('<dask-awkward.lib.core.ArgsKwargsPackedFunction ob-1beb66991b79fa2f49330ef901027c85', 1) _execute_subgraph(...)>)]

If I then dig into the Task and look at its dependencies, I get the set of keys:

frozenset({'a_delayed_array-ffab6114-4add-4b7c-9004-fd3770ba9771'})

and a_delayed_array is definitely not in the optimized graph!

So, how do we keep the delayed node in the graph?
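
For reference, a self-contained way to watch for this, exercising only the blockwise pass in isolation (adapted from the repro earlier in the thread; dask_awkward's full optimizer does more than this single step):

import operator

import awkward as ak
import dask_awkward as dak
from dask.blockwise import optimize_blockwise
from dask.core import flatten
from dask.delayed import delayed

# same toy setup as the earlier repro
a = dak.from_awkward(ak.Array([[1, 2, 3], [4]]), npartitions=2)
b = a.map_partitions(operator.mul, delayed(lambda: 1)(), meta=a._meta)

hlg = b.__dask_graph__()                  # HighLevelGraph before any optimization
keys = list(flatten(b.__dask_keys__()))
opt = optimize_blockwise(hlg, keys=keys)  # just the blockwise pass

print(set(hlg.layers))  # the Delayed layer should show up here
print(set(opt.layers))  # check whether that layer / its key survives the pass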

@lgray (Collaborator, Author) commented Dec 7, 2024

With dask 2024.11.x the results are as follows:

no optimize_blockwise:

HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x10af66ba0>
 0. repacked-instrumented-mul at 0x1093cc540-535da6829bbcce92a42b85b9be259a05

and that one layer contains:

[(('repacked-instrumented-mul at 0x1093cc540-535da6829bbcce92a42b85b9be259a05',
   1),
  ('from-awkward-repacked-instrumented-mul at 0x1093cc540-535da6829bbcce92a42b85b9be259a05',
   1)),
 (('repacked-instrumented-mul at 0x1093cc540-535da6829bbcce92a42b85b9be259a05',
   0),
  ('from-awkward-repacked-instrumented-mul at 0x1093cc540-535da6829bbcce92a42b85b9be259a05',
   0)),
 (('from-awkward-repacked-instrumented-mul at 0x1093cc540-535da6829bbcce92a42b85b9be259a05',
   1),
  (subgraph_callable-fc246caf28f6aec6243873f1810cbd77,
   (subgraph_callable-5f18606dd7d6041a38bfe013d4e97716, (2, 4)),
   'a_delayed_array-cc51865c-c9b1-4111-8e9f-98f655e1ced9')),
 (('from-awkward-repacked-instrumented-mul at 0x1093cc540-535da6829bbcce92a42b85b9be259a05',
   0),
  (subgraph_callable-fc246caf28f6aec6243873f1810cbd77,
   (subgraph_callable-5f18606dd7d6041a38bfe013d4e97716, (0, 2)),
   'a_delayed_array-cc51865c-c9b1-4111-8e9f-98f655e1ced9')),
 ('a_delayed_array-cc51865c-c9b1-4111-8e9f-98f655e1ced9',
  (<function __main__.a_delayed_array()>,))]

It executes as expected.

with optimize_blockwise turned on the resulting graph is:

HighLevelGraph with 2 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1053fc4d0>
 0. a_delayed_array-aa8cfacb-8634-4dc5-a65d-4a19fd4bd6e2
 1. repacked-instrumented-mul at 0x10412e520-7ed96731e732a0de63e1eff62ca2e023

and the last layer contains:

[(subgraph_callable-19a20e3065e8a2b3047767df7a8e80f3,
  'a_delayed_array-aa8cfacb-8634-4dc5-a65d-4a19fd4bd6e2',
  (0, 2)),
 (subgraph_callable-19a20e3065e8a2b3047767df7a8e80f3,
  'a_delayed_array-aa8cfacb-8634-4dc5-a65d-4a19fd4bd6e2',
  (2, 4))]

It also executes as expected, since here the delayed object remains in the graph!

So the old optimize_blockwise de-inlines the delayed array into its own node, while the new one removes it completely? Though I think the errant behavior may be in rewrite_layer_chains, which shouldn't be touching the Delayed layer in the first place?

@martindurant thoughts?

or

@fjetter is this an expected new behavior or bug in optimize_blockwise?

@lgray (Collaborator, Author) commented Dec 7, 2024

Ah, my bad - it's after the cull step:

after fuse_roots HighLevelGraph with 2 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x10ee260f0>
 0. a_delayed_array-099cec65-ea25-4b10-8183-48aa676a54b3
 1. repacked-instrumented-mul at 0x10cb6d3a0-070fec221c3aceaa77bd0b44557dd84c

after cull HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x10cb098e0>
 0. repacked-instrumented-mul at 0x10cb6d3a0-070fec221c3aceaa77bd0b44557dd84c

So somehow we're not retaining the information in the graph that marks the delayed layer as necessary.
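
A hedged, self-contained way to isolate the cull step itself (same toy setup as before; HighLevelGraph.cull keeps only what is reachable from the requested output keys, so this shows exactly which layers it drops):

import operator

import awkward as ak
import dask_awkward as dak
from dask.blockwise import optimize_blockwise
from dask.core import flatten
from dask.delayed import delayed

a = dak.from_awkward(ak.Array([[1, 2, 3], [4]]), npartitions=2)
b = a.map_partitions(operator.mul, delayed(lambda: 1)(), meta=a._meta)

keys = set(flatten(b.__dask_keys__()))
opt = optimize_blockwise(b.__dask_graph__(), keys=list(keys))
culled = opt.cull(keys)                      # the cull step on the HighLevelGraph

print(set(opt.layers) - set(culled.layers))  # layers dropped by cull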

@lgray (Collaborator, Author) commented Dec 7, 2024

Indeed, if I skip the cull step everything computes as expected.

@martindurant (Collaborator):

if I skip the cull step everything computes as expected

So the graph is right, the dependency must be there, else it wouldn't compute - but cull is making some other assumption that we don't meet.

@lgray (Collaborator, Author) commented Dec 7, 2024

Indeed, something has definitely changed:
[screenshot from the original comment omitted]

Even though the blockwise-optimized layers claim the Delayed as a dependency (!!), cull drops the Delayed.

@lgray (Collaborator, Author) commented Dec 7, 2024

I have found the source of the difference.

After dask/dask#11568 the delayed array no longer shows up as a constant dependency in the task graph coming from this loop:
https://github.com/dask/dask/blob/main/dask/blockwise.py#L674-L684

I checked that it's not another kind of dep; even in old dask the delayed-wrapped array is only a constant.

If I add code to correctly deal with Aliases to https://github.com/dask/dask/blob/main/dask/blockwise.py#L674-L684, i.e.:

...
            if isinstance(arg, Alias):
                arg = arg.target.key
...

Then everything works as expected again.

@lgray (Collaborator, Author) commented Dec 7, 2024

@fjetter is this skipping of Aliases as constant dependencies of blockwise graphs desired or expected?

@lgray (Collaborator, Author) commented Dec 7, 2024

I also find that if I use TaskRef instead of Alias for our Delayed object everything optimizes and executes correctly without issue.

An understanding of which is correct would be appreciated. For the time being I will change the dask_awkward code to use a TaskRef as opposed to an Alias and see what else breaks in our tests.
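
A hedged illustration of the distinction being probed here (my reading of the spec, not an authoritative statement): a TaskRef passed as an argument marks the referenced key as a dependency of that task, while an Alias is itself a graph node that forwards another key's result.

import operator

from dask._task_spec import Alias, Task, TaskRef

# a TaskRef argument registers the referenced key as a dependency of "out"
t = Task("out", operator.mul, TaskRef("a-delayed-thing"), 2)
print(t.dependencies)      # expected: frozenset({'a-delayed-thing'})

# an Alias is a node of its own whose result is its target's result
alias = Alias("out-alias", "a-delayed-thing")
print(alias.dependencies)  # the alias depends on its target key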

@lgray (Collaborator, Author) commented Dec 7, 2024

OK - the only failing tests are over in uproot, where we'll have to patch things up to deal with Tasks there as well!

cc @jpivarski

@lgray (Collaborator, Author) commented Dec 7, 2024

However, we should settle these correct usage issues and get bugfixes in the right places if they are necessary.

We'll then wait on further input and guidance from @fjetter as to correct/expected usage.

@lgray (Collaborator, Author) commented Dec 7, 2024

It's also a bit weird that the GraphNode base class's .ref() function returns an Alias. Hmm.

Judging from that, I guess we'd want _cull_dependencies to deal with Aliases in some way, but perhaps the most correct fix is elsewhere.
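
A quick, hedged check of that observation (assuming .ref() takes no arguments, as the wording above suggests):

from dask._task_spec import Task

t = Task("x", sum, [1, 2])
print(type(t.ref()))  # per the comment above, this should be dask._task_spec.Alias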

@lgray (Collaborator, Author) commented Dec 9, 2024

@fjetter when you have time, we would appreciate your commentary so that we can resolve this.

@lgray (Collaborator, Author) commented Dec 11, 2024

@fjetter just a ping

@martindurant (Collaborator):

Thanks for all your effort here, @lgray. I hope @fjetter can OK the code now.

@@ -37,7 +37,8 @@ classifiers = [
 ]
 dependencies = [
     "awkward >=2.5.1",
-    "dask >=2023.04.0",
+    "dask >=2024.12.0;python_version>'3.9'",
+    "dask >=2023.04.0;python_version<'3.10'",
Review comment (Collaborator):

In live discussions, we were tending towards dropping backward compatibility here, which means dropping py3.9 support (which dask and numpy already have). Users of py3.9 will not have hit the original problem, since the new dask was not released for them.

This would also save about half the LoC in this PR.
