fix: adapt to new Task spec in dask, now used in blockwise #556
Conversation
I asked on the dask PR whether there are specific migration instructions associated with their change.
@martindurant There must be some way to just pop out a good old dictionary from the Task class.
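For context, a minimal sketch (my own, assuming dask >= 2024.12, where `dask._task_spec` lives) of how the new spec relates to the old dict-of-tuples graphs; `convert_legacy_graph` goes from the old form to the new one:

```python
# A minimal sketch, assuming dask >= 2024.12 (where dask._task_spec exists).
# The graph container is still a plain dict; after dask/dask#11568 its values
# are GraphNode subclasses (Task / DataNode / Alias) rather than raw tuples.
from operator import add

from dask._task_spec import convert_legacy_graph

# An old-style "good old dictionary" graph with tuple tasks:
legacy = {"x": 1, "y": (add, "x", 1)}

# convert_legacy_graph upgrades the values to the new node objects:
new_graph = convert_legacy_graph(legacy)
for key, node in new_graph.items():
    print(key, type(node).__name__, node.dependencies)
```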
We will likely need to put a …

If I'm understanding this correctly, a …
Some docs on that method would be very useful.
:)
Is this something I'll need to get on?
@martindurant seems like it - but it also looks like …
delayed <-> dask-awkward interactions really only happen in HEP when someone is passing a big correction into the task graph, where we use delayed to wrap the function. So I'd argue it should be materialized, because in such a case you'd need to pass the typetracer to the function in any case. I think this is just an essential limitation of "going across the fence" between dak and specifically delayed, since you don't know what you're getting when you materialize it. Otherwise it significantly complicates the delayed interface of other packages. Though maybe it is a wrong choice to do so?
Let's try to first get the status quo re-established, since that was at least reasonable behavior. If we can do that we'll have more understanding of what's going wrong, as well. I am mostly arguing this from the fact that it is still broken when running the unoptimized graph, where it should materialize for sure.
Lindsey Gray writes:

> delayed <-> dask-awkward interactions really only happen in HEP when someone is passing a big correction into the task graph where we use delayed to wrap the function. So I'd argue it *should* be materialized because in such a case you'd need to pass the typetracer to the function in any case.

Yeah, the more I think about it the more it makes sense to have whatever is represented by the Delayed go ahead and "exist" when combined with the typetracer during dak's optimization pass. The (potentially terrible) side effect here is that the Delayed thing gets computed twice (once for the optimization pass, and again for the real compute).

> I think this is just an essential limitation of "going across the fence" between dak and specifically delayed since you don't know what you're getting when you materialize it. Otherwise it significantly complicates the delayed interface of other packages. Though maybe it is a wrong choice to do so?

Maybe "questionable" instead of "wrong" :P. It's certainly a useful feature, but complex to get right for all cases. When I first started working on Delayed <-> dak stuff, I took inspiration from dask.array and dask.dataframe; for going between the different collections one needed to go ahead and at least run the optimizations for the "original" collection before shoving the (now optimized) graph into the "new" collection (all the to_delayed functions have an optimize_graph argument that defaults to true). So optimizations have always been something to explicitly worry about in this area. Conversions are one thing (I'd say simpler), but combined operations are another. I wonder if perhaps the new expression stuff solves that complexity. All that said, I agree getting the status quo working again is absolutely highest priority.
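For reference, the `optimize_graph` convention mentioned above, shown as a minimal sketch with `dask.array` (my example, not code from this PR):

```python
# Minimal sketch of the optimize_graph convention described above, using
# dask.array, whose to_delayed defaults to optimize_graph=True.
import dask.array as da

x = da.ones((4,), chunks=2) + 1

# Run the array-level optimizations before handing the chunks to Delayed:
parts = x.to_delayed(optimize_graph=True)
print(parts.shape)         # one Delayed object per chunk
print(parts[0].compute())  # [2. 2.]
```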
Started to poke around the new Task stuff. Looks like dask-awkward/src/dask_awkward/lib/core.py lines 1930 to 1931 (at 1d4d4e9) need the Delayed key wrapped in an `Alias`:

```diff
diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py
index d7c1a4e..4c8cd45 100644
--- a/src/dask_awkward/lib/core.py
+++ b/src/dask_awkward/lib/core.py
@@ -1910,6 +1910,7 @@ def partitionwise_layer(
         The Dask HighLevelGraph Blockwise layer.
     """
+    from dask._task_spec import Alias
     pairs: list[Any] = []
     numblocks: dict[str, tuple[int, ...]] = {}
     for arg in args:
@@ -1928,7 +1929,7 @@ def partitionwise_layer(
             pairs.extend([arg.name, "i"])
             numblocks[arg.name] = (1,)
         elif isinstance(arg, Delayed):
-            pairs.extend([arg.key, None])
+            pairs.extend([Alias(arg.key), None])
         elif is_dask_collection(arg):
             raise DaskAwkwardNotImplemented(
                 "Use of Array with other Dask collections is currently unsupported."
```

With that change I got a different error with default compute (e.g. `b.compute()` in the session below). Need to look more into how the Alias impacts the typetracer graph pass:

```python
In [1]: import dask_awkward as dak
...: from dask.delayed import delayed
...: import awkward as ak
...:
...:
In [2]: x = ak.Array([[1,2,3], [4]])
In [3]: a = dak.from_awkward(x, npartitions=2)
In [4]: @delayed
...: def one():
...: return 1
...:
In [5]: import operator
In [6]: b = a.map_partitions(operator.mul, one(), meta=a._meta)
In [7]: b.compute(optimize_graph=False)
Out[7]: <Array [[1, 2, 3], [4]] type='2 * var * int64'>
In [8]: b.compute()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Cell In[8], line 1
----> 1 b.compute()
File ~/software/repos/dask-awkward/.venv/lib/python3.13/site-packages/dask/base.py:372, in DaskMethodsMixin.compute(self, **kwargs)
348 def compute(self, **kwargs):
349 """Compute this dask collection
350
351 This turns a lazy Dask collection into its in-memory equivalent.
(...)
370 dask.compute
371 """
--> 372 (result,) = compute(self, traverse=False, **kwargs)
373 return result
File ~/software/repos/dask-awkward/.venv/lib/python3.13/site-packages/dask/base.py:660, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
657 postcomputes.append(x.__dask_postcompute__())
659 with shorten_traceback():
--> 660 results = schedule(dsk, keys, **kwargs)
662 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/software/repos/dask-awkward/.venv/lib/python3.13/site-packages/dask/local.py:455, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
452 rerun_exceptions_locally = config.get("rerun_exceptions_locally", False)
454 if state["waiting"] and not state["ready"]:
--> 455 raise ValueError("Found no accessible jobs in dask")
457 def fire_tasks(chunksize):
458 """Fire off a task to the thread pool"""
ValueError: Found no accessible jobs in dask
```
@pfackeldey @douglasdavis @fjetter If I comment out https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/optimize.py#L52 we get some interesting outcomes. HLG with optimize.py#L52 commented out: …

With L52 restored: …
The test passes with …
With …

where with …

if I then dig into the …

and … So, how do we keep the delayed node in the graph?
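For reference, a sketch of my own (plain dask API, not dask-awkward code) of the usual way a layer declares a Delayed object as a collection-level dependency, which is what normally keeps its node in the graph:

```python
# Sketch of declaring a Delayed as a dependency of a new layer so that
# optimization and cull keep its node (standard dask API; the layer and
# key names here are made up for illustration).
from operator import mul

import dask
from dask.highlevelgraph import HighLevelGraph

@dask.delayed
def one():
    return 1

d = one()

# A hypothetical one-task layer that multiplies a partition by d's result:
layer = {("mul-thing", 0): (mul, ("some-array", 0), d.key)}
hlg = HighLevelGraph.from_collections("mul-thing", layer, dependencies=[d])

print(list(hlg.layers))               # includes d.key's layer
print(hlg.dependencies["mul-thing"])  # names d's layer as a dependency
```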
With dask 2024.11.x the results are as follows: no …

and that one layer contains: …

It executes as expected. With …

and the last layer contains: …

It also executes as expected, since here the delayed object remains in the graph! So the old … @martindurant, thoughts? Or @fjetter: is this expected new behavior, or a bug in optimize_blockwise?
Ah, my bad - it's after the cull step: …

So somehow we're not retaining the information in the graph that marks the delayed layer as necessary.
Indeed, if I skip the cull step everything computes as expected.
So the graph is right, the dependency must be there, else it wouldn't compute - but cull is making some other assumption that we don't meet.
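For reference, a toy sketch (my own, not from this PR) of the cull mechanism in question; the failure above looks like this mechanism misfiring, with a layer that is actually needed being treated as unreachable:

```python
# Toy illustration of HighLevelGraph.cull: layers whose keys are not
# reachable from the requested outputs are dropped. The failure discussed
# above is the inverse case going wrong: the delayed layer was needed, but
# its key was not recognized as a dependency, so cull removed it.
from operator import add

from dask.highlevelgraph import HighLevelGraph

layers = {
    "a": {"x": 1},
    "b": {"y": (add, "x", 1)},
    "unused": {"z": 42},
}
deps = {"a": set(), "b": {"a"}, "unused": set()}
hlg = HighLevelGraph(layers, deps)

culled = hlg.cull({"y"})
print(sorted(culled.layers))  # ['a', 'b'] -- the 'unused' layer is gone
```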
I have found the source of the difference. After dask/dask#11568 the delayed array no longer shows up as a constant dependency in the task graph coming from this loop: … I checked that it's not another kind of dep; even in old dask the delayed-wrapped array is only a constant. If I add code to correctly deal with `Alias` arguments:

```python
...
if isinstance(arg, Alias):
    arg = arg.target.key
...
```

then everything works as expected again.
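For reference, a hedged sketch of the node types involved here; `Alias`, `Task`, and `TaskRef` are real classes in `dask._task_spec` (dask >= 2024.12), but the comments are my reading of their semantics, not official docs:

```python
# Hedged sketch of the argument/node types discussed above (my reading of
# dask._task_spec semantics, dask >= 2024.12).
from operator import mul

from dask._task_spec import Alias, Task, TaskRef

# TaskRef("x") inside a Task's arguments marks "x" as a dependency whose
# computed value is substituted at execution time:
t = Task("y", mul, TaskRef("x"), 2)
print(t.dependencies)  # contains "x"

# Alias is itself a graph node: key "y2" simply forwards to key "y",
# and its target carries the key that the fix above reads back out.
a = Alias("y2", "y")
print(a.key, a.target, a.dependencies)
```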
@fjetter is this skipping of …
I also find that if I use … Understanding of what is correct would be appreciated. For the time being I will change the dask_awkward code to use a …
OK - only failing tests are over in uproot, where we'll have to patch things up to deal with Tasks there as well!
However, we should settle these correct usage issues and get bugfixes in the right places if they are necessary. Then we wait on further input and guidance from @fjetter as to correct/expected usage.
It's also a bit weird that the … Judging from that, I guess we'd want …
@fjetter when you have time, we would appreciate your commentary so that we can resolve this.
@fjetter just a ping
```diff
@@ -37,7 +37,8 @@ classifiers = [
 ]
 dependencies = [
     "awkward >=2.5.1",
-    "dask >=2023.04.0",
+    "dask >=2024.12.0;python_version>'3.9'",
+    "dask >=2023.04.0;python_version<'3.10'",
```
In live discussions, we were tending towards dropping backward compatibility here, which means dropping py3.9 support (which dask and numpy have already done). Users of py3.9 will not have hit the original problem, since the new dask was never released for them.

This would also save about half the LOC in this PR.
Adapts one bit of dask_awkward code that makes a graph to use a task object instead.

cf. dask/dask#11568