Absorb column projections in other BlockwiseIO expressions #247

Merged: 10 commits, Aug 4, 2023
Changes from 6 commits
4 changes: 2 additions & 2 deletions dask_expr/_collection.py
@@ -1092,12 +1092,12 @@ def from_dask_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
return from_graph(graph, ddf._meta, ddf.divisions, ddf._name)


def read_csv(path, *args, **kwargs):
def read_csv(path, *args, usecols=None, **kwargs):
from dask_expr.io.csv import ReadCSV

if not isinstance(path, str):
path = stringify_path(path)
return new_collection(ReadCSV(path, *args, **kwargs))
return new_collection(ReadCSV(path, *args, columns=usecols, **kwargs))


def read_parquet(
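A quick usage sketch of the change above (the path is illustrative, and we assume read_csv is re-exported at the package root): the pandas-style usecols keyword is now lifted out of **kwargs and forwarded as the columns operand of ReadCSV, so the projection is visible to the optimizer rather than hidden inside opaque keyword arguments.

import dask_expr as dx

# usecols is forwarded as the ReadCSV "columns" operand (illustrative path)
df = dx.read_csv("data/*.csv", usecols=["x", "y"])
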
44 changes: 28 additions & 16 deletions dask_expr/_expr.py
@@ -28,7 +28,7 @@
from dask.utils import M, apply, funcname, import_required, is_arraylike
from tlz import merge_sorted, unique

from dask_expr._util import _tokenize_deterministic
from dask_expr._util import _tokenize_deterministic, _tokenize_partial

replacement_rules = []

@@ -768,20 +768,9 @@ def _find_similar_operations(self, root: Expr, ignore: list | None = None):
# No other operations of the same type. Early return
return []

def _tokenize(rp):
# Helper function to "tokenize" the operands
# that are not in the `ignore` list
return _tokenize_deterministic(
*[
op
for i, op in enumerate(rp.operands)
if i >= len(rp._parameters) or rp._parameters[i] not in ignore
]
)

# Return subset of `alike` with the same "token"
token = _tokenize(self)
return [item for item in alike if _tokenize(item) == token]
token = _tokenize_partial(self, ignore)
return [item for item in alike if _tokenize_partial(item, ignore) == token]

def _node_label_args(self):
"""Operands to include in the node label by `visualize`"""
@@ -1085,8 +1074,11 @@ def _combine_similar(self, root: Expr):

if push_up_op:
# Add operations back in the same order
for op in reversed(operations):
for i, op in enumerate(reversed(operations)):
common = common[op]
if i > 0:
# Combine stacked projections
common = common._simplify_down() or common
return common
return None
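
The extra _simplify_down call above is needed because re-applying the stashed operations can stack two column projections on top of each other. A minimal pandas sketch of the invariant being exploited, namely that stacked projections collapse to a single one:

import pandas as pd

pdf = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})

# Selecting a subset of an earlier selection equals selecting the inner
# subset directly, so Projection(Projection(x, outer), inner) can be
# rewritten as Projection(x, inner).
assert pdf[["a", "b"]][["a"]].equals(pdf[["a"]])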

@@ -1191,6 +1183,10 @@ class DropnaFrame(Blockwise):
_keyword_only = ["how", "subset", "thresh"]
operation = M.dropna

@property
def _projection_passthrough(self):
Collaborator: Same here.

return self.subset is None

def _simplify_up(self, parent):
if self.subset is not None:
columns = set(parent.columns).union(self.subset)
@@ -1202,6 +1198,14 @@ def _simplify_up(self, parent):
type(self)(self.frame[sorted(columns)], *self.operands[1:]),
*parent.operands[1:],
)
elif isinstance(parent, Projection):
Collaborator: We can't do this. We drop rows if any column contains a missing value; the operation will change if we remove columns that could potentially contain NA values.

Member Author: Ah, good catch! I don't know what I was thinking :)

if set(parent.columns) == set(self.frame.columns):
# Don't add unnecessary Projections
return
return type(parent)(
self.substitute_parameters({"frame": self.frame[parent.columns]}),
*parent.operands[1:],
)
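
A pandas counterexample for the reviewer's point above (the values are illustrative): pushing a column projection below dropna is unsound because dropna decides which rows to keep by inspecting all columns, including the ones the projection would remove.

import numpy as np
import pandas as pd

pdf = pd.DataFrame({"a": [1, 2], "b": [np.nan, 3.0]})

# dropna-then-project drops row 0 (NaN in "b"); project-then-dropna
# keeps it, so the two orderings are not equivalent.
assert not pdf.dropna()[["a"]].equals(pdf[["a"]].dropna())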


class CombineFirst(Blockwise):
@@ -1615,6 +1619,9 @@ def _simplify_down(self):
if (
str(self.frame.columns) == str(self.columns)
and self._meta.ndim == self.frame._meta.ndim
and not (
Collaborator: I am not sure I understand why this is necessary. Can you elaborate? Why can't we return self.frame if the columns are the same anyway?

Member Author: My original thinking here was that if self.frame is a BlockwiseIO expression that can "absorb" projections, then we want to make sure self.frame actually absorbs the projection (by applying its own _simplify_up logic). With that said, you are correct that "absorbing" the projection should not really change anything if the first two criteria of this if statement are True. Therefore, we probably can/should revert this change. Note that I submitted #267 to do this (where I included a necessary bug fix).

isinstance(self.frame, BlockwiseIO) and self.frame._absorb_projections
)
):
# TODO: we should get more precise around Expr.columns types
return self.frame
@@ -2130,7 +2137,12 @@ def are_co_aligned(*exprs):
"""Do inputs come from different parents, modulo blockwise?"""
exprs = [expr for expr in exprs if not is_broadcastable(expr)]
ancestors = [set(non_blockwise_ancestors(e)) for e in exprs]
return len(set(flatten(ancestors, container=set))) == 1
unique_ancestors = {
# Account for column projection within IO expressions
_tokenize_partial(item, ["columns", "_series"])
for item in flatten(ancestors, container=set)
}
return len(unique_ancestors) == 1


## Utilities for Expr fusion
15 changes: 14 additions & 1 deletion dask_expr/_util.py
@@ -29,7 +29,20 @@ def _normalize_lambda(func):
return str(func)


def _tokenize_deterministic(*args, **kwargs):
def _tokenize_deterministic(*args, **kwargs) -> str:
# Utility to be strict about deterministic tokens
with config.set({"tokenize.ensure-deterministic": True}):
return tokenize(*args, **kwargs)


def _tokenize_partial(expr, ignore: list | None = None) -> str:
# Helper function to "tokenize" the operands
# that are not in the `ignore` list
ignore = ignore or []
return _tokenize_deterministic(
*[
op
for i, op in enumerate(expr.operands)
if i >= len(expr._parameters) or expr._parameters[i] not in ignore
]
)
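
A minimal sketch of how _tokenize_partial is used in this PR (the stand-in class is hypothetical and only mirrors the operand layout of Expr): two expressions that differ only in ignored operands such as "columns" and "_series" produce the same partial token, which is exactly what _find_similar_operations and are_co_aligned rely on.

from dask_expr._util import _tokenize_partial

class MockExpr:
    # Hypothetical stand-in mirroring Expr's operand layout
    _parameters = ["frame", "columns", "_series"]

    def __init__(self, frame, columns=None, _series=False):
        self.operands = [frame, columns, _series]

a = MockExpr("same-source", columns=["x"])
b = MockExpr("same-source", columns=["y"], _series=True)

# Ignoring the projection-related operands, both expressions tokenize
# identically, so they are treated as the same underlying operation.
assert _tokenize_partial(a, ["columns", "_series"]) == _tokenize_partial(
    b, ["columns", "_series"]
)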
35 changes: 31 additions & 4 deletions dask_expr/io/csv.py
@@ -1,16 +1,27 @@
import functools
import operator

from dask_expr._util import _convert_to_list
from dask_expr.io.io import BlockwiseIO, PartitionsFiltered


class ReadCSV(PartitionsFiltered, BlockwiseIO):
_parameters = ["filename", "usecols", "header", "_partitions", "storage_options"]
_parameters = [
"filename",
"columns",
"header",
"_partitions",
"storage_options",
"_series",
]
_defaults = {
"usecols": None,
"columns": None,
"header": "infer",
"_partitions": None,
"storage_options": None,
"_series": False,
}
_absorb_projections = False
Collaborator: This is a TODO for the future?

Member Author: Yeah, forgot about this. Just submitted #268.


@functools.cached_property
def _ddf(self):
@@ -19,14 +30,28 @@ def _ddf(self):

return dd.read_csv(
self.filename,
usecols=self.usecols,
usecols=_convert_to_list(self.operand("columns")),
header=self.header,
storage_options=self.storage_options,
)

@property
def _meta(self):
return self._ddf._meta
meta = self._ddf._meta
if self.columns:
return meta[self.columns[0]] if self._series else meta[self.columns]
return meta

@functools.cached_property
def columns(self):
columns_operand = self.operand("columns")
if columns_operand is None:
try:
return list(self._ddf._meta.columns)
except AttributeError:
return []
else:
return _convert_to_list(columns_operand)

def _divisions(self):
return self._ddf.divisions
@@ -36,4 +61,6 @@ def _tasks(self):
return list(self._ddf.dask.to_dict().values())

def _filtered_task(self, index: int):
if self._series:
return (operator.getitem, self._tasks[index], self.columns[0])
return self._tasks[index]
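
The _meta and _filtered_task changes follow the same selection rule, shown here with plain pandas (dtypes are illustrative): a single projected column with _series=True yields a Series, while a list of columns yields a DataFrame.

import pandas as pd

meta = pd.DataFrame({"x": pd.Series(dtype="int64"),
                     "y": pd.Series(dtype="float64")})

# meta[self.columns[0]] -> Series (the _series=True case);
# meta[self.columns]    -> DataFrame (plain column projection)
assert isinstance(meta["x"], pd.Series)
assert isinstance(meta[["x"]], pd.DataFrame)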
121 changes: 115 additions & 6 deletions dask_expr/io/io.py
@@ -3,10 +3,19 @@
import functools
import math

from dask.dataframe.core import is_dataframe_like
from dask.dataframe.io.io import sorted_division_locations

from dask_expr._expr import Blockwise, Expr, Lengths, Literal, PartitionsFiltered
from dask_expr._expr import (
Blockwise,
Expr,
Lengths,
Literal,
PartitionsFiltered,
Projection,
)
from dask_expr._reductions import Len
from dask_expr._util import _convert_to_list


class IO(Expr):
@@ -39,19 +48,109 @@ def _layer(self):


class BlockwiseIO(Blockwise, IO):
pass
_absorb_projections = False
Member Author: Note that it probably makes sense to work with a BlockwiseIO subclass here (instead of adding _absorb_projections). However, I used the attribute to explore and haven't bothered to modify the approach yet. I'll be happy to revise.

Collaborator: I think that's fine for now.


def _simplify_up(self, parent):
if (
self._absorb_projections
and isinstance(parent, Projection)
and is_dataframe_like(self._meta)
):
# Column projection
parent_columns = parent.operand("columns")
proposed_columns = _convert_to_list(parent_columns)
make_series = isinstance(parent_columns, (str, int)) and not self._series
if set(proposed_columns) == set(self.columns) and not make_series:
# Already projected
return
substitutions = {"columns": _convert_to_list(parent_columns)}
if make_series:
substitutions["_series"] = True
return self.substitute_parameters(substitutions)

def _combine_similar(self, root: Expr):
if self._absorb_projections:
# For BlockwiseIO expressions with "columns"/"_series"
# attributes (`_absorb_projections == True`), we can avoid
# redundant file-system access by aggregating multiple
# operations with different column projections into the
# same operation.
alike = self._find_similar_operations(root, ignore=["columns", "_series"])
if alike:
# We have other BlockwiseIO operations (of the same
# sub-type) in the expression graph that can be combined
# with this one.

# Find the column-projection union needed to combine
# the qualified BlockwiseIO operations
columns_operand = self.operand("columns")
if columns_operand is None:
columns_operand = self.columns
columns = set(columns_operand)
ops = [self] + alike
for op in alike:
op_columns = op.operand("columns")
if op_columns is None:
op_columns = op.columns
columns |= set(op_columns)
columns = sorted(columns)

# Can bail if we are not changing columns or the "_series" operand
if columns_operand == columns and (
len(columns) > 1 or not self._series
):
return

# Check if we have the operation we want elsewhere in the graph
for op in ops:
if op.columns == columns and not op.operand("_series"):
return (
op[columns_operand[0]]
if self._series
else op[columns_operand]
)

# Create the "combined" ReadParquet operation
subs = {"columns": columns}
if self._series:
subs["_series"] = False
new = self.substitute_parameters(subs)
return new[columns_operand[0]] if self._series else new[columns_operand]

return
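
The column-union step of _combine_similar above, in isolation (column names are illustrative): several projections of the same source are served by a single operation over the union of their columns, and each consumer is then rewritten as a projection of that combined operation.

projections = [["a"], ["b", "c"], ["a", "c"]]

# One combined operation reads the sorted union of all requested columns,
union = sorted(set().union(*projections))
assert union == ["a", "b", "c"]

# and each original expression is rewritten as combined[cols], so the
# underlying file or frame is only scanned once.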


class FromPandas(PartitionsFiltered, BlockwiseIO):
"""The only way today to get a real dataframe"""

_parameters = ["frame", "npartitions", "sort", "_partitions"]
_defaults = {"npartitions": 1, "sort": True, "_partitions": None}
_parameters = ["frame", "npartitions", "sort", "columns", "_partitions", "_series"]
_defaults = {
"npartitions": 1,
"sort": True,
"columns": None,
"_partitions": None,
"_series": False,
}
_pd_length_stats = None
_absorb_projections = True

@property
def _meta(self):
return self.frame.head(0)
meta = self.frame.head(0)
if self.columns:
return meta[self.columns[0]] if self._series else meta[self.columns]
return meta

@functools.cached_property
def columns(self):
columns_operand = self.operand("columns")
if columns_operand is None:
try:
return list(self.frame.columns)
except AttributeError:
return []
else:
return _convert_to_list(columns_operand)

@functools.cached_property
def _divisions_and_locations(self):
@@ -93,6 +192,9 @@ def _simplify_up(self, parent):
if _lengths:
return Literal(sum(_lengths))

if isinstance(parent, Projection):
return super()._simplify_up(parent)

def _divisions(self):
return self._divisions_and_locations[0]

@@ -101,9 +203,16 @@ def _locations(self):

def _filtered_task(self, index: int):
start, stop = self._locations()[index : index + 2]
return self.frame.iloc[start:stop]
part = self.frame.iloc[start:stop]
if self.columns:
return part[self.columns[0]] if self._series else part[self.columns]
return part

def __str__(self):
if self._absorb_projections and self.operand("columns"):
if self._series:
return f"df[{self.columns[0]}]"
return f"df[{self.columns}]"
return "df"

__repr__ = __str__
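
An end-to-end sketch of what this PR enables for FromPandas (assuming from_pandas and the collection-level optimize method are exposed as in the dask-expr README): the projection is absorbed into the IO expression itself, and the new __str__ reflects it.

import pandas as pd
import dask_expr as dx

pdf = pd.DataFrame({"a": range(10), "b": range(10)})
df = dx.from_pandas(pdf, npartitions=2)

# After optimization, Projection(FromPandas(...), ["a"]) is absorbed into
# FromPandas(columns=["a"], ...), whose repr is now "df[['a']]".
optimized = df[["a"]].optimize()
print(optimized.expr)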