-
-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DNM: Full branch_id implementation #896
base: main
Are you sure you want to change the base?
Conversation
…ation_2 # Conflicts: # dask_expr/_core.py # dask_expr/_expr.py
…ranch_id_implementation_shuffle
…ation_shuffle # Conflicts: # dask_expr/tests/test_shuffle.py
…_shuffle # Conflicts: # dask_expr/tests/test_reuse.py
…ation # Conflicts: # dask_expr/tests/test_shuffle.py
…_shuffle # Conflicts: # dask_expr/_reductions.py
@@ -29,6 +29,10 @@ | |||
] | |||
|
|||
|
|||
class BranchId(NamedTuple): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason you prefer a NamedTuple
over a NewType('BranchId', int)
here?
@@ -39,3 +41,12 @@ def assert_eq(a, b, *args, serialize_graph=True, **kwargs): | |||
|
|||
# Use `dask.dataframe.assert_eq` | |||
return dd_assert_eq(a, b, *args, **kwargs) | |||
|
|||
|
|||
def _check_consumer_node(expr, expected, consumer_node=IO, branch_id_counter=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What exactly is a "consumer" node in this context?
_name = inst._name | ||
if _name in Expr._instances: | ||
return Expr._instances[_name] | ||
|
||
Expr._instances[_name] = inst | ||
return inst | ||
|
||
@classmethod | ||
def _check_branch_id_given(cls, args, _branch_id): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
def _check_branch_id_given(cls, args, _branch_id): | |
def _maybe_check_branch_id_given(cls, args, _branch_id): |
return | ||
return self._bubble_branch_id_down() | ||
|
||
def _bubble_branch_id_down(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
def _bubble_branch_id_down(self): | |
def _propagate_branch_id_down(self): |
# is used during optimization to capture the dependents of any given | ||
# expression. A reuse consumer will have the same dependents independently | ||
# of the branch_id parameter, since we want to reuse everything that comes | ||
# before us and split branches up everything that is processed after |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's a word missing, maybe for
?
# before us and split branches up everything that is processed after | |
# before us and split branches up for everything that is processed after |
if not common_subplan_elimination: | ||
out = result.rewrite("reuse", cache={}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While we have cleaned up the meaning of common_subplan_elimination
, it's still weird to me that we execute the reuse
rule to avoid CSE. Maybe we should rename it to something like avoid_common_subplan_elimination
?
@@ -2805,6 +2824,10 @@ def optimize(expr: Expr, fuse: bool = True) -> Expr: | |||
Input expression to optimize | |||
fuse: | |||
whether or not to turn on blockwise fusion | |||
common_subplan_elimination : bool, default False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that common_subplan_elimination
does not mean no CSE but rather less CSE. We may want to reflect that in the docstring.
return ( | ||
funcname(type(self)).lower() | ||
+ "-" | ||
+ _tokenize_deterministic(*self.operands, self._branch_id) | ||
) | ||
|
||
@functools.cached_property | ||
def _dep_name(self): | ||
# The name identifies every expression uniquely. The dependents name | ||
# is used during optimization to capture the dependents of any given | ||
# expression. A reuse consumer will have the same dependents independently | ||
# of the branch_id parameter, since we want to reuse everything that comes | ||
# before us and split branches up everything that is processed after | ||
# us. So we have to ignore the branch_id from tokenization for those | ||
# nodes. | ||
if not self._reuse_consumer: | ||
return self._name | ||
return ( | ||
funcname(type(self)).lower() + "-" + _tokenize_deterministic(*self.operands) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels prone to errors/inconsistencies when subclassing. Would it make sense to define a property _dep_name_tokens
that could be overriden and a property _name_tokens
that just always adds branch_id
to the _dep_name_tokens
? This could then feed into a common function used to generate the name using the tokens as input.
For example, FromGraph
already implements a new _name
but not a new _dep_name
.
@@ -43,9 +47,17 @@ class Expr: | |||
_parameters = [] | |||
_defaults = {} | |||
_instances = weakref.WeakValueDictionary() | |||
_branch_id_required = False | |||
_reuse_consumer = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name is ambiguous. Can we come up with something more descriptive?
] | ||
return type(self)(*ops) | ||
|
||
def _substitute_branch_id(self, branch_id): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
def _substitute_branch_id(self, branch_id): | |
def _maybe_substitute_branch_id(self, branch_id): |
or something else that highlights the conditionality.
expected = expected.a + expected.a.sum() | ||
pd.testing.assert_series_equal(x.sort_index(), expected) | ||
|
||
# Check that we have 1 shuffle barrier but 20 p2pshuffle tasks for the output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, this PR introduces functionality that relies on being able to read shuffle outputs several times. The fact that this seems to work with disk-based P2P is a lucky implementation detail but not guaranteed to work, let alone tested. Before releasing, we should at the very least test this. (It will also not work with in-memory P2P, but that's currently not supported in dask-expr
anyway.)
One general comment: This PR introduces many different names for seemingly similar things, e.g., branch vs. subplan, reuse vs. elimination. We may want to clean this up before merging to make it easier to grasp concepts. |
# Ensure that shuffles with different branch_ids have the same barrier | ||
token = self._dep_name.split("-")[-1] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty strongly -1 for this. The fact that this works is purely coincidental. The barrier should be treated as an internal implementation detail since way too much logic depends on this. If we want/need this functionality, it should be supported as a proper API of the extension
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the benefit of reusing results that are written to disk, but I agree that the fact that this works is purely coincidental and very brittle wrt to changes.
From what I see, this might be useful if it were
- well-tested (also on the P2P side)
- not hidden as what looks like an implementation detail within the
_layer
I'm not sure if this is something we can implement within the P2P extension. Maybe we can do this after we have the scheduler integration? I could also see this become an optimization pass that makes this very explicit.
No description provided.