
wrong token generation with dak.map_partitions #553

Open
pfackeldey opened this issue Dec 2, 2024 · 4 comments · May be fixed by #555
Labels
bug Something isn't working

Comments

pfackeldey (Collaborator) commented Dec 2, 2024

Currently, token generation for the HLG layer produced by dak.map_partitions appears to be broken: the token changes on every call, e.g.:

import awkward as ak
import dask_awkward as dak

dak_array = dak.from_awkward(ak.Array([1, 2, 3]), 1)

def fun(x):
    return x + 1

dak.map_partitions(fun, dak_array).dask
# HighLevelGraph with 2 layers.
# <dask.highlevelgraph.HighLevelGraph object at 0x10577cd30>
# 0. from-awkward-8fdcc693b11857efa42a556ced92dfbc
# 1. <dask-awkward.lib.core.ArgsKwargsPackedFunction ob-7ae414b04e6de108142215f755b2eb3e

dak.map_partitions(fun, dak_array).dask
# HighLevelGraph with 2 layers.
# <dask.highlevelgraph.HighLevelGraph object at 0x10595ae90>
# 0. from-awkward-8fdcc693b11857efa42a556ced92dfbc
# 1. <dask-awkward.lib.core.ArgsKwargsPackedFunction ob-51ec44d7d3f220ecde45b63a6fd614d0
 
dak.map_partitions(fun, dak_array).dask
# HighLevelGraph with 2 layers.
# <dask.highlevelgraph.HighLevelGraph object at 0x112fde680>
# 0. from-awkward-8fdcc693b11857efa42a556ced92dfbc
# 1. <dask-awkward.lib.core.ArgsKwargsPackedFunction ob-eb6d01930cd4e0e1e190e3dda45e2f17

Here one can see that the token of the second layer changes every time: 7ae414b04e6de108142215f755b2eb3e -> 51ec44d7d3f220ecde45b63a6fd614d0 -> eb6d01930cd4e0e1e190e3dda45e2f17. The token does not change if one applies fun directly, i.e. fun(dak_array).dask.

The tokens differ because fun is wrapped in a new instance of ArgsKwargsPackedFunction on every call, and that instance is what gets passed to the tokenizer.

This also means the dak_cache is never hit, which is likely unintended: https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/core.py#L2011-L2012.
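To make the consequence concrete, here is a hypothetical minimal model of a token-keyed layer cache like dak_cache (all names are illustrative, not the actual dask-awkward implementation): a layer is only rebuilt when its token is new, so an unstable token means the cache is never hit.

```python
# Illustrative stand-in for a token-keyed layer cache such as `dak_cache`.
cache: dict[str, str] = {}
build_count = 0

def get_or_build_layer(token: str) -> str:
    global build_count
    if token not in cache:
        build_count += 1  # expensive layer construction happens here
        cache[token] = f"layer-{token}"
    return cache[token]

# Stable token: the second call is a cache hit.
get_or_build_layer("abc")
get_or_build_layer("abc")
assert build_count == 1

# Unstable token (fresh on every call, as in this bug): no hits, two builds.
get_or_build_layer("token-1")
get_or_build_layer("token-2")
assert build_count == 3
```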

Passing the original fun (instead of the wrapped one) to dask.base.tokenize fixes this issue, but leads to a lot of failing tests (probably because the caches are now being hit and that doesn't work as expected?).

How should this token generation be handled correctly, @martindurant?

I'm happy to provide a fix (assuming I understand the issue correctly), because I think this mechanism could be used to speed up graph building in general.


My end goal is to make it possible to pass token= directly to uproot.dask so that we can reduce the amount of layer building:

# `b.dask` can reuse the layers of `a.dask` from the `dak_cache` if the tokens match:
a = uproot.dask("DY.root", token="MC")
b = uproot.dask("ttbar.root", token="MC")

Here, the graph would still be built per .root file, but the internal layer (dak.from_map) would be constructed only once for a and could then be reused from the cache for b. All subsequent steps (other dak.map_partitions or dak.mapfilter calls) could then be cached in a similar way as well.
In combination with dak.mapfilter, which allows skipping typetracing entirely when the correct needs and meta are provided, one could speed up graph building significantly.

(cc @lgray and @jpivarski because this goes a bit in the direction of duplicating graphs for datasets that should be treated the same way, but via a caching mechanism that already exists in dask-awkward.)

@pfackeldey pfackeldey added the bug Something isn't working label Dec 2, 2024
lgray (Collaborator) commented Dec 2, 2024

@pfackeldey As you noted, it's this part of map_partitions:
https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/core.py#L2180-L2185

It might be worth overriding __dask_tokenize__ on ArgsKwargsPackedFunction and giving it a try.
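A minimal sketch of that suggestion, assuming dask is installed (PackedFunction here is an illustrative stand-in for ArgsKwargsPackedFunction, not the actual dask-awkward class):

```python
from dask.base import tokenize

class PackedFunction:
    """Illustrative stand-in for ArgsKwargsPackedFunction."""

    def __init__(self, fn):
        self.fn = fn

    def __dask_tokenize__(self):
        # Delegate normalization to the wrapped function, so two distinct
        # wrapper instances around the same `fn` tokenize identically.
        return ("PackedFunction", tokenize(self.fn))

def fun(x):
    return x + 1

# Fresh wrapper instances now produce the same token:
assert tokenize(PackedFunction(fun)) == tokenize(PackedFunction(fun))
```

Without the __dask_tokenize__ hook, dask falls back to its generic object normalization, which is what makes the wrapper's token instance-dependent here.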

However, since the inputs to everything going through b (your ttbar dataset) would generate new hashes, you wouldn't hit the cache in any case.

The only time you'd hit the cache is if you have multiple uproot.open calls to the same file, which is a pattern we don't want to encourage anyway.

pfackeldey (Collaborator, Author) commented:
I did a workaround (in def _map_partitions):

-    token = token or tokenize(fn, *args, output_divisions, **kwargs)
+    token_fn = fn
+    # unpack because otherwise the token will be different every time
+    if isinstance(fn, ArgsKwargsPackedFunction):
+        token_fn = fn.fn
+    token = token or tokenize(token_fn, *args, output_divisions, **kwargs)

but as mentioned in the issue, this led to many failures in the current test suite, which made me wonder whether I'm misunderstanding something here.
You're right, though: one would rather override __dask_tokenize__ than apply my diff.

However, since the inputs to everything going through b, your ttbar dataset, would generate new hashes, you wouldn't hit the cache in any case.

It wouldn't create new ones (I think), because of token = token or tokenize(...), which won't generate new tokens if a token is provided in the first place, as far as I currently understand it.
Oh wait, am I misunderstanding how uproot.dask is used in coffea in the first place...?

martindurant (Collaborator) commented:

token = token or ...

but it shouldn't be identical, right? Maybe a deterministic derivative.
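One hypothetical way to build such a "deterministic derivative": mix the user-supplied base token with the concrete layer inputs, so different files get different but reproducible tokens (derive_token is an illustrative helper, not an existing dask-awkward or uproot API):

```python
import hashlib

def derive_token(base_token: str, *inputs) -> str:
    # Same (base_token, inputs) -> same hash; a different input file
    # yields a different, but still deterministic, token.
    digest = hashlib.sha256(repr((base_token, inputs)).encode()).hexdigest()
    return f"{base_token}-{digest[:16]}"

assert derive_token("MC", "DY.root") == derive_token("MC", "DY.root")
assert derive_token("MC", "DY.root") != derive_token("MC", "ttbar.root")
```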

pfackeldey (Collaborator, Author) commented Dec 2, 2024

token = token or ...

but it shouldn't be identical, right? Maybe a deterministic derivative.

You're right, it shouldn't.
But I think something like this (a parametrization of a graph based on its inputs) is needed to duplicate full graphs for different input datasets. I agree, though, that my idea of using the dak_cache for this does not solve it: for some reason I thought the token and layer-name building were independent of the arguments/inputs to the layer, which is not the case.

Coming back to the issue: the current behavior of dak.map_partitions still does not look correct to me, and changing it seems to break a lot of test cases. (I'll try to investigate whether this is due to the dak_cache or some other reason I don't see yet.)

@pfackeldey pfackeldey linked a pull request Dec 3, 2024 that will close this issue