Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Absorb column projections in other BlockwiseIO expressions #247

Merged
merged 10 commits into from
Aug 4, 2023

Conversation

rjzamora
Copy link
Member

@rjzamora rjzamora commented Jul 28, 2023

Follow up to #245, where I noticed that we were not quite capturing optimal column projection behavior for expressions originating from FromPandas. More specifically, since FromPandas does not "absorb" a Projection into it's own "columns" operand, it becomes difficult to produce a "combined" column projection at the optimal position in the expression graph.

For example:

import dask_expr as dx
import pandas as pd

pdf = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [4, 5, 8, 6, 1, 4], "z": 1})
df = dx.from_pandas(pdf, npartitions=3)

df = df.dropna().replace(1, 5)
df = df[df.x > 3][["x", "y"]]

df.optimize(fuse=False).pprint()

Before this PR:

In main, we currently perform DropnaFrame and Replace on all columns of pdf, even though we know that we can drop column "z" immediately.

Filter:
  Projection: columns=['x', 'y']
    Replace: to_replace=1 value=5
      DropnaFrame:
        FromPandas: frame='<pandas>' npartitions=3
  GT: right=3
    Projection: columns='x'
      Replace: to_replace=1 value=5
        DropnaFrame:
          FromPandas: frame='<pandas>' npartitions=3

After this PR:

By allowing FromPandas (and ReadCSV) to absorb column projections, we can produce much more optimal behavior.

Filter:
  Replace: to_replace=1 value=5
    DropnaFrame:
      FromPandas: frame='<pandas>' npartitions=3 columns=['x', 'y']
  GT: right=3
    Projection: columns='x'
      Replace: to_replace=1 value=5
        DropnaFrame:
          FromPandas: frame='<pandas>' npartitions=3 columns=['x', 'y']

@@ -39,19 +48,109 @@ def _layer(self):


class BlockwiseIO(Blockwise, IO):
pass
_absorb_projections = False
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that it probably makes sense to work with a BlockwiseIO subclass here (instead of adding _absorb_projections). However, I used the attribute to explore and haven't bothered to modify the approach yet. I'll be happy to revise.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine for now

@@ -1202,6 +1198,14 @@ def _simplify_up(self, parent):
type(self)(self.frame[sorted(columns)], *self.operands[1:]),
*parent.operands[1:],
)
elif isinstance(parent, Projection):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't do this. We drop rows if any column contains a missing value. The operation will change if we remove columns that could potentially contain NA values

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, Good catch! I don't know what I was thinking :)

@@ -1191,6 +1183,10 @@ class DropnaFrame(Blockwise):
_keyword_only = ["how", "subset", "thresh"]
operation = M.dropna

@property
def _projection_passthrough(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Copy link
Collaborator

@phofl phofl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments

}
_absorb_projections = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a TODO for the future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, forgot about this. Just submitted #268

@@ -39,19 +48,109 @@ def _layer(self):


class BlockwiseIO(Blockwise, IO):
pass
_absorb_projections = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine for now

@@ -1615,6 +1619,9 @@ def _simplify_down(self):
if (
str(self.frame.columns) == str(self.columns)
and self._meta.ndim == self.frame._meta.ndim
and not (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand why this is necessary. Can you elaborate?

Why can't we return self.frame if the columns are the same anyway?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original thinking here was that if self.frame is a BlockwiseIO expression that can "absorb" projections, then we want to make sure self.frame actually absorbs the projection (by applying its own simplify_up logic).

With that said, you are correct that "absorbing" the projection should not really change anything if the first two criteria of this if statement are True. Therefore, we probably can/should revert this change. Note that I submitted #267 to do this (where I included a necessary bug fix).

@phofl
Copy link
Collaborator

phofl commented Aug 4, 2023

@rjzamora I've removed the dropna changes, but the rest is good as is for now. We can address my remaining comments after you are back

@phofl phofl merged commit 1988177 into dask:main Aug 4, 2023
@rjzamora rjzamora deleted the absorb-projections branch August 15, 2023 17:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants