-
-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Absorb column projections in other BlockwiseIO
expressions
#247
Changes from 6 commits
58315e9
3ba1724
c26224f
7926400
ded1648
49e4771
ef61645
668a6b1
f640bb5
e66c637
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ | |
from dask.utils import M, apply, funcname, import_required, is_arraylike | ||
from tlz import merge_sorted, unique | ||
|
||
from dask_expr._util import _tokenize_deterministic | ||
from dask_expr._util import _tokenize_deterministic, _tokenize_partial | ||
|
||
replacement_rules = [] | ||
|
||
|
@@ -768,20 +768,9 @@ def _find_similar_operations(self, root: Expr, ignore: list | None = None): | |
# No other operations of the same type. Early return | ||
return [] | ||
|
||
def _tokenize(rp): | ||
# Helper function to "tokenize" the operands | ||
# that are not in the `ignore` list | ||
return _tokenize_deterministic( | ||
*[ | ||
op | ||
for i, op in enumerate(rp.operands) | ||
if i >= len(rp._parameters) or rp._parameters[i] not in ignore | ||
] | ||
) | ||
|
||
# Return subset of `alike` with the same "token" | ||
token = _tokenize(self) | ||
return [item for item in alike if _tokenize(item) == token] | ||
token = _tokenize_partial(self, ignore) | ||
return [item for item in alike if _tokenize_partial(item, ignore) == token] | ||
|
||
def _node_label_args(self): | ||
"""Operands to include in the node label by `visualize`""" | ||
|
@@ -1085,8 +1074,11 @@ def _combine_similar(self, root: Expr): | |
|
||
if push_up_op: | ||
# Add operations back in the same order | ||
for op in reversed(operations): | ||
for i, op in enumerate(reversed(operations)): | ||
common = common[op] | ||
if i > 0: | ||
# Combine stacked projections | ||
common = common._simplify_down() or common | ||
return common | ||
return None | ||
|
||
|
@@ -1191,6 +1183,10 @@ class DropnaFrame(Blockwise): | |
_keyword_only = ["how", "subset", "thresh"] | ||
operation = M.dropna | ||
|
||
@property | ||
def _projection_passthrough(self): | ||
return self.subset is None | ||
|
||
def _simplify_up(self, parent): | ||
if self.subset is not None: | ||
columns = set(parent.columns).union(self.subset) | ||
|
@@ -1202,6 +1198,14 @@ def _simplify_up(self, parent): | |
type(self)(self.frame[sorted(columns)], *self.operands[1:]), | ||
*parent.operands[1:], | ||
) | ||
elif isinstance(parent, Projection): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't do this. We drop rows if any column contains a missing value. The operation will change if we remove columns that could potentially contain NA values There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, Good catch! I don't know what I was thinking :) |
||
if set(parent.columns) == set(self.frame.columns): | ||
# Don't add unnecessary Projections | ||
return | ||
return type(parent)( | ||
self.substitute_parameters({"frame": self.frame[parent.columns]}), | ||
*parent.operands[1:], | ||
) | ||
|
||
|
||
class CombineFirst(Blockwise): | ||
|
@@ -1615,6 +1619,9 @@ def _simplify_down(self): | |
if ( | ||
str(self.frame.columns) == str(self.columns) | ||
and self._meta.ndim == self.frame._meta.ndim | ||
and not ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure I understand why this is necessary. Can you elaborate? Why can't we return self.frame if the columns are the same anyway? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My original thinking here was that if With that said, you are correct that "absorbing" the projection should not really change anything if the first two criteria of this |
||
isinstance(self.frame, BlockwiseIO) and self.frame._absorb_projections | ||
) | ||
): | ||
# TODO: we should get more precise around Expr.columns types | ||
return self.frame | ||
|
@@ -2130,7 +2137,12 @@ def are_co_aligned(*exprs): | |
"""Do inputs come from different parents, modulo blockwise?""" | ||
exprs = [expr for expr in exprs if not is_broadcastable(expr)] | ||
ancestors = [set(non_blockwise_ancestors(e)) for e in exprs] | ||
return len(set(flatten(ancestors, container=set))) == 1 | ||
unique_ancestors = { | ||
# Account for column projection within IO expressions | ||
_tokenize_partial(item, ["columns", "_series"]) | ||
for item in flatten(ancestors, container=set) | ||
} | ||
return len(unique_ancestors) == 1 | ||
|
||
|
||
## Utilities for Expr fusion | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,27 @@ | ||
import functools | ||
import operator | ||
|
||
from dask_expr._util import _convert_to_list | ||
from dask_expr.io.io import BlockwiseIO, PartitionsFiltered | ||
|
||
|
||
class ReadCSV(PartitionsFiltered, BlockwiseIO): | ||
_parameters = ["filename", "usecols", "header", "_partitions", "storage_options"] | ||
_parameters = [ | ||
"filename", | ||
"columns", | ||
"header", | ||
"_partitions", | ||
"storage_options", | ||
"_series", | ||
] | ||
_defaults = { | ||
"usecols": None, | ||
"columns": None, | ||
"header": "infer", | ||
"_partitions": None, | ||
"storage_options": None, | ||
"_series": False, | ||
} | ||
_absorb_projections = False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a TODO for the future? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, forgot about this. Just submitted #268 |
||
|
||
@functools.cached_property | ||
def _ddf(self): | ||
|
@@ -19,14 +30,28 @@ def _ddf(self): | |
|
||
return dd.read_csv( | ||
self.filename, | ||
usecols=self.usecols, | ||
usecols=_convert_to_list(self.operand("columns")), | ||
header=self.header, | ||
storage_options=self.storage_options, | ||
) | ||
|
||
@property | ||
def _meta(self): | ||
return self._ddf._meta | ||
meta = self._ddf._meta | ||
if self.columns: | ||
return meta[self.columns[0]] if self._series else meta[self.columns] | ||
return meta | ||
|
||
@functools.cached_property | ||
def columns(self): | ||
columns_operand = self.operand("columns") | ||
if columns_operand is None: | ||
try: | ||
return list(self._ddf._meta.columns) | ||
except AttributeError: | ||
return [] | ||
else: | ||
return _convert_to_list(columns_operand) | ||
|
||
def _divisions(self): | ||
return self._ddf.divisions | ||
|
@@ -36,4 +61,6 @@ def _tasks(self): | |
return list(self._ddf.dask.to_dict().values()) | ||
|
||
def _filtered_task(self, index: int): | ||
if self._series: | ||
return (operator.getitem, self._tasks[index], self.columns[0]) | ||
return self._tasks[index] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,19 @@ | |
import functools | ||
import math | ||
|
||
from dask.dataframe.core import is_dataframe_like | ||
from dask.dataframe.io.io import sorted_division_locations | ||
|
||
from dask_expr._expr import Blockwise, Expr, Lengths, Literal, PartitionsFiltered | ||
from dask_expr._expr import ( | ||
Blockwise, | ||
Expr, | ||
Lengths, | ||
Literal, | ||
PartitionsFiltered, | ||
Projection, | ||
) | ||
from dask_expr._reductions import Len | ||
from dask_expr._util import _convert_to_list | ||
|
||
|
||
class IO(Expr): | ||
|
@@ -39,19 +48,109 @@ def _layer(self): | |
|
||
|
||
class BlockwiseIO(Blockwise, IO): | ||
pass | ||
_absorb_projections = False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that it probably makes sense to work with a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that's fine for now |
||
|
||
def _simplify_up(self, parent): | ||
if ( | ||
self._absorb_projections | ||
and isinstance(parent, Projection) | ||
and is_dataframe_like(self._meta) | ||
): | ||
# Column projection | ||
parent_columns = parent.operand("columns") | ||
proposed_columns = _convert_to_list(parent_columns) | ||
make_series = isinstance(parent_columns, (str, int)) and not self._series | ||
if set(proposed_columns) == set(self.columns) and not make_series: | ||
# Already projected | ||
return | ||
substitutions = {"columns": _convert_to_list(parent_columns)} | ||
if make_series: | ||
substitutions["_series"] = True | ||
return self.substitute_parameters(substitutions) | ||
|
||
def _combine_similar(self, root: Expr): | ||
if self._absorb_projections: | ||
# For BlockwiseIO expressions with "columns"/"_series" | ||
# attributes (`_absorb_projections == True`), we can avoid | ||
# redundant file-system access by aggregating multiple | ||
# operations with different column projections into the | ||
# same operation. | ||
alike = self._find_similar_operations(root, ignore=["columns", "_series"]) | ||
if alike: | ||
# We have other BlockwiseIO operations (of the same | ||
# sub-type) in the expression graph that can be combined | ||
# with this one. | ||
|
||
# Find the column-projection union needed to combine | ||
# the qualified BlockwiseIO operations | ||
columns_operand = self.operand("columns") | ||
if columns_operand is None: | ||
columns_operand = self.columns | ||
columns = set(columns_operand) | ||
ops = [self] + alike | ||
for op in alike: | ||
op_columns = op.operand("columns") | ||
if op_columns is None: | ||
op_columns = op.columns | ||
columns |= set(op_columns) | ||
columns = sorted(columns) | ||
|
||
# Can bail if we are not changing columns or the "_series" operand | ||
if columns_operand == columns and ( | ||
len(columns) > 1 or not self._series | ||
): | ||
return | ||
|
||
# Check if we have the operation we want elsewhere in the graph | ||
for op in ops: | ||
if op.columns == columns and not op.operand("_series"): | ||
return ( | ||
op[columns_operand[0]] | ||
if self._series | ||
else op[columns_operand] | ||
) | ||
|
||
# Create the "combined" ReadParquet operation | ||
subs = {"columns": columns} | ||
if self._series: | ||
subs["_series"] = False | ||
new = self.substitute_parameters(subs) | ||
return new[columns_operand[0]] if self._series else new[columns_operand] | ||
|
||
return | ||
|
||
|
||
class FromPandas(PartitionsFiltered, BlockwiseIO): | ||
"""The only way today to get a real dataframe""" | ||
|
||
_parameters = ["frame", "npartitions", "sort", "_partitions"] | ||
_defaults = {"npartitions": 1, "sort": True, "_partitions": None} | ||
_parameters = ["frame", "npartitions", "sort", "columns", "_partitions", "_series"] | ||
_defaults = { | ||
"npartitions": 1, | ||
"sort": True, | ||
"columns": None, | ||
"_partitions": None, | ||
"_series": False, | ||
} | ||
_pd_length_stats = None | ||
_absorb_projections = True | ||
|
||
@property | ||
def _meta(self): | ||
return self.frame.head(0) | ||
meta = self.frame.head(0) | ||
if self.columns: | ||
return meta[self.columns[0]] if self._series else meta[self.columns] | ||
return meta | ||
|
||
@functools.cached_property | ||
def columns(self): | ||
columns_operand = self.operand("columns") | ||
if columns_operand is None: | ||
try: | ||
return list(self.frame.columns) | ||
except AttributeError: | ||
return [] | ||
else: | ||
return _convert_to_list(columns_operand) | ||
|
||
@functools.cached_property | ||
def _divisions_and_locations(self): | ||
|
@@ -93,6 +192,9 @@ def _simplify_up(self, parent): | |
if _lengths: | ||
return Literal(sum(_lengths)) | ||
|
||
if isinstance(parent, Projection): | ||
return super()._simplify_up(parent) | ||
|
||
def _divisions(self): | ||
return self._divisions_and_locations[0] | ||
|
||
|
@@ -101,9 +203,16 @@ def _locations(self): | |
|
||
def _filtered_task(self, index: int): | ||
start, stop = self._locations()[index : index + 2] | ||
return self.frame.iloc[start:stop] | ||
part = self.frame.iloc[start:stop] | ||
if self.columns: | ||
return part[self.columns[0]] if self._series else part[self.columns] | ||
return part | ||
|
||
def __str__(self): | ||
if self._absorb_projections and self.operand("columns"): | ||
if self._series: | ||
return f"df[{self.columns[0]}]" | ||
return f"df[{self.columns}]" | ||
return "df" | ||
|
||
__repr__ = __str__ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.