Make meta calculation for merge more efficient #284
base: main
Conversation
dask_expr/_merge.py (Outdated)
@@ -37,6 +37,7 @@ class Merge(Expr):
         "suffixes",
         "indicator",
         "shuffle_backend",
+        "_meta",
This makes me nervous. I think that it's a good principle to only have necessary operands. Any derived state should be computed.
For example, if an optimization were to change some parameter here, like suffixes or something, I wouldn't want to worry about also modifying meta at the same time. It's nice to be able to rely on this invariant across the project.
If we want to include some other state in a custom constructor I would be more OK with that (although still nervous). In that case I'd want to make sure that the constructor always satisfies type(self)(*self.operands) == self.
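As a minimal sketch of that invariant (using a toy expression class, not the real dask-expr Expr):

class ToyExpr:
    # Toy stand-in for an expression whose identity is fully defined by its operands.
    def __init__(self, *operands):
        self.operands = list(operands)

    def __eq__(self, other):
        return type(self) is type(other) and self.operands == other.operands

    def __hash__(self):
        return hash((type(self), tuple(self.operands)))


expr = ToyExpr("left", "right", "inner")
# Rebuilding from the operands alone must reproduce an equal expression.
assert type(expr)(*expr.operands) == expr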
The downside with this is that we are stuck with the repeated computation. Caching objects and so on won't help here, since meta will change, i.e. caching is not useful. We genuinely change the object when we re-create it, which means that we will always trigger a fresh computation of meta. That by itself isn't bad, but non-empty meta computations are relatively expensive (empty meta won't work here).
For example, if an optimization were to change some parameter here, like suffixes or something, I wouldn't want to worry about also modifying meta at the same time. It's nice to be able to rely on this invariant across the project
We can simply not pass meta in that case, which would trigger a fresh computation; this is only a fast path.
We genuinely change the object when we re-create it, which means that we will always trigger a fresh computation of meta
If the inputs to the pd.merge call are going to change, then it seems like we need to recompute meta anyway. If the inputs aren't changing but we're recomputing, then maybe that is a sign that we're caching on the wrong things. Maybe we should have a staticmethod or something instead.
We can simply not pass meta in this case which would trigger a fresh computation, this is only a fast path
Imagine an optimization which did something like the following:
def subs(obj, old, new):
    operands = [
        new if operand == old else operand  # substitute old for new in all operands
        for operand in obj.operands
    ]
    return type(obj)(*operands)
This fails if we store derived state in operands, because _meta is in there and really we should choose not to include it any more.
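To make that concrete, here is a toy illustration of how a substitution drags stale derived state along (the string "meta(left, right)" just stands in for a cached schema):

class ToyExpr:
    def __init__(self, *operands):
        self.operands = list(operands)


def subs(obj, old, new):
    # Same substitution pattern as above: swap old for new and rebuild.
    operands = [new if operand == old else operand for operand in obj.operands]
    return type(obj)(*operands)


# Pretend the last operand is derived meta that was computed from "left".
expr = ToyExpr("left", "right", "meta(left, right)")
rewritten = subs(expr, "left", "left[projected]")
# The real operand changed, but the stale derived operand came along untouched.
assert rewritten.operands == ["left[projected]", "right", "meta(left, right)"]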
I'm ok keeping derived state on the class. I just don't think that it should be in operands. This probably requires custom constructors.
There are optimisations where meta changes but we compute the necessary information anyway, so we can simply adjust meta if we want to; Projections are a good example of this.
Lower is an example where meta won't change in the case of merge, but you can't properly cache it either: we might introduce a shuffle, which means caching will fail in most cases.
I'll add a custom constructor here; that should work as well. Then we can see how that looks.
Lower is an example where meta won't change in the case of merge, but you can't properly cache it either: we might introduce a shuffle, which means caching will fail in most cases.
What about caching not on the object, but somewhere else? That way anyone who asks "what is the meta for these inputs?" will get the cached result, regardless of which object they're calling from. (This was my staticmethod suggestion from above.)
Sorry, my comment probably wasn't clear enough. That's how I understood what you were saying, but the problem is as follows:
- That won't work if we introduce a Projection (which is something we could accept, although that wouldn't make me happy in this particular case)
- Meta won't change when we lower the graph, but we will introduce a shuffle, so the inputs of the merge computation will change while lowering, which means that the cache wouldn't work anymore
Caching won't help in either of these two cases, which is unfortunately where most of the time is spent.
Feels like we're maybe talking past each other here. Happy to chat live if you're free later.
def __init__(self, *args, _precomputed_meta=None, **kwargs):
    super().__init__(*args, **kwargs)
    self._precomputed_meta = _precomputed_meta
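Presumably the keyword is consumed as a fast path along these lines; this is only a sketch with a toy class, and _compute_meta is a hypothetical stand-in for the expensive non-empty merge meta computation:

import functools


class ToyMerge:
    def __init__(self, left_cols, right_cols, _precomputed_meta=None):
        self.left_cols = left_cols
        self.right_cols = right_cols
        self._precomputed_meta = _precomputed_meta

    @functools.cached_property
    def _meta(self):
        # Fast path: trust the meta handed in by the caller (e.g. a lowering step
        # that knows the schema will not change); otherwise compute it from scratch.
        if self._precomputed_meta is not None:
            return self._precomputed_meta
        return self._compute_meta()

    def _compute_meta(self):
        # Placeholder for the expensive non-empty merge on sampled frames.
        return {"columns": self.left_cols + self.right_cols}


fast = ToyMerge(["a"], ["b"], _precomputed_meta={"columns": ["a", "b"]})
slow = ToyMerge(["a"], ["b"])
assert fast._meta == slow._meta  # same result, but the fast path skips the work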
I think I would strongly prefer if we use a global key-value cache in the same way we cache dataset info for parquet. In fact, we should probably formalize this caching approach to avoid repeating the same kind of logic in multiple places.
It seems like a unique meta depends on a token like...
@functools.cached_property
def _meta_cache_token(self):
    return _tokenize_deterministic(
        self.left._meta,
        self.right._meta,
        self.how,
        self.left_on,
        self.right_on,
        self.left_index,
        self.right_index,
        self.suffixes,
        self.indicator,
    )
If self.left._meta or self.right._meta were to change (due to column projection), we would need to recalculate meta anyway. However, if the Merge object was responsible for pushing down the column projection, we could always update the cache within the simplify logic (since we would already know how the meta needs to change).
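A minimal sketch of that kind of module-level key-value cache (the names _MERGE_META_CACHE and merge_meta_from_cache are illustrative, not existing dask-expr API):

_MERGE_META_CACHE = {}  # token -> meta, analogous to the parquet dataset-info cache


def merge_meta_from_cache(token, compute):
    # Return the cached meta for this token, computing and storing it at most once.
    try:
        return _MERGE_META_CACHE[token]
    except KeyError:
        meta = _MERGE_META_CACHE[token] = compute()
        return meta


# Hypothetical usage inside Merge._meta:
#   return merge_meta_from_cache(self._meta_cache_token, self._compute_meta)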
I don't see a global need for this yet. The slowdown in merge goes back to the non-empty meta objects, not the actual computation on empty objects.
Some of the operations in Lower have side effects, which makes adjusting the meta objects of left and right bothersome and complicated.
I am open to adjusting the implementation if we run into this in more places, but as long as we need it only for merge I'd prefer this solution, since it keeps the complexity in here.
FWIW, I am also not too big a fan of relying on meta in hashes; there are too many things in pandas that might mutate it unexpectedly, which would break this.
To clarify, I don't really mind if we implement stand-alone caching logic in _merge.py for now. The thing I'm unsure about in this PR is that we are overriding __init__ so that we can effectively cache _meta without adding _meta as an official operand.
It may be the case that this is exactly how we should be attacking this problem in Merge (and maybe everywhere). For example, maybe we will eventually have a special known_meta= kwarg in Expr, which all expression objects could leverage. However, since it is not a proper operand, this mechanism feels a bit confusing and fragile to me.
The slowdown in merge goes back to the nonempty meta objects, not the actual computation on empty objects.
I don't think I understand your point here. Either way we are effectively caching the output of _meta, no?
some of the operations in Lower have side effects, which makes adjusting the meta objects of left and right bothersome and complicated.
I don't see how this is any different for _precomputed_meta? In any case where you are confident defining _precomputed_meta, you could also just add the "lowered" object to the global cache before returning it.
Fwiw I am also not too big of a fan of relying on meta in hashes, there are too many things in pandas that might mutate this unexpectedly, which would break this
Interesting. I'd say that should be a fundamental concern for dask-expr then. What would then be the most reliable way to hash the schema?
I don't think I understand your point here. Either way we are effectively caching the output of _meta, no?
Yes, but the nature of the slowdown makes me think that we need it only in merge and not in other places as of now. I am open to rewriting this here as well if that turns out differently.
I don't see how this is any different for _precomputed_meta? In any case where you are confident defining _precomputed_meta, you could also just add the "lowered" object to the global cache before returning it.
pandas has some caveats that might change your dtype in meta but not on the actual df. Relying on the initial meta seems safer to me. But this might also be totally wrong.
To clarify, I don't really mind if we implement stand-alone caching logic in _merge.py for now. The thing I'm unsure about in this PR is that we are overriding __init__ so that we can effectively cache _meta without adding _meta as an official operand.
@mrocklin and I chatted offline and landed on this solution. One motivating factor was the last part here: #284 (comment)
Interesting. I'd say that should be a fundamental concern for dask-expr then. What would then be the most reliable way to hash the schema?
I don't have a good answer for that. Meta is still the best bet, but it has some limitations. This will get more stable in the future since we are deprecating all of these caveats at the moment.
Yes but the nature of the slowdown makes me think that we need it only in merge and not in other places as of now. I am open to rewriting this here as well if this turns out differently.
Okay, I think you are talking about the decision not to write general Expr-wide caching code here, which is all good with me. I was only thinking about the decision to use _precomputed_meta instead of a simple k/v cache.
Possible problems with the k/v cache approach:
- The meta-hashing issue you mentioned
- We would be keeping the cached meta in memory even after we no longer need it (also a problem for parquet)
Possible problems with _precomputed_meta:
- I suppose we are breaking with convention a bit (seems okay)
- Any substitute_parameters call will drop the information, even if you aren't changing information that is relevant to meta
One motivating factor was the last part here ...
Right, I agree that it would be a mistake to make _precomputed_meta a proper operand.
We would also have to patch the meta of left and right in the HashJoin layer, because that adds a column.
substitute_parameters is annoying; we could override it, but that's not great either.
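For illustration only, such an override might look roughly like this (toy class and parameter names; the real substitute_parameters signature in dask-expr may differ):

class ToyMergeExpr:
    # Parameters that can change the result schema; anything else is meta-neutral.
    _meta_relevant = {"left", "right", "how", "suffixes", "indicator"}

    def __init__(self, parameters, _precomputed_meta=None):
        self.parameters = dict(parameters)
        self._precomputed_meta = _precomputed_meta

    def substitute_parameters(self, substitutions):
        new_parameters = {**self.parameters, **substitutions}
        # Only carry the precomputed meta forward when no schema-relevant
        # parameter is being touched; otherwise force a fresh computation.
        keep = self._meta_relevant.isdisjoint(substitutions)
        return type(self)(
            new_parameters,
            _precomputed_meta=self._precomputed_meta if keep else None,
        )


m = ToyMergeExpr({"how": "inner", "shuffle_backend": None}, _precomputed_meta="cached")
assert m.substitute_parameters({"shuffle_backend": "p2p"})._precomputed_meta == "cached"
assert m.substitute_parameters({"how": "outer"})._precomputed_meta is None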
Sorry for adding all this discussion. I do get that you are focusing on the Merge-specific meta issue at hand.
I'm just doing my best to keep the big picture in mind: it seems like we are going to keep running into cases where we would benefit from caching information outside the Expr object itself. Therefore, it would be nice if we could design a formal system where a collection of different caches can be managed in one place.
That said, I definitely don't think we need to do something like that right now.
We would also have to patch meta of left and right in the HashJoin layer because that adds a column
Right, HashJoinP2P._meta_cache_token does need to drop the hash columns.
# Conflicts: # dask_expr/_merge.py
The non-empty nature of the meta creation is a bottleneck. Avoiding these repeated calculations cuts the optimization time of some of the TPC-H queries by 60 percent (excluding the read from remote storage part).
Sits on top of #283.