
When adding columns from 2 dataframes will not compute in some instances, fix for one instance seems to break the other #11395

Open
JimHBeam opened this issue Sep 18, 2024 · 1 comment · May be fixed by dask/dask-expr#1143


JimHBeam commented Sep 18, 2024

Describe the issue:
We are upgrading existing code to pandas 2. As part of our process we add columns from two DataFrames together. For some data this only works if we compute the right-hand series mid-process, but doing that makes the same operation fail on other data. The code below produces two errors: the fix for one dataset breaks the process for the other.

Minimal Complete Verifiable Example:

import pandas as pd
import dask.dataframe as dd

preds1=pd.DataFrame({
    "prediction_probability":[1.0] * 2,
    "prediction":  [1,1],
    "num_runs": [1,1],
    "Idx":[1,4],
})

preds1=preds1.set_index("Idx")


ads1=pd.DataFrame({
    "prediction_probability":[1.0] * 2,
    "prediction":  [1,1],
    "num_runs": [1,1],
    "Idx":[1,4],
})

ads1=ads1.set_index("Idx")

preds2=pd.DataFrame({
    "prediction_probability":[1.0] * 4,
    "prediction":  [1,1,1,1],
    "num_runs": [1,1,1,1],
    "Idx":[1,2,3,4],
})

preds2=preds2.set_index("Idx")


ads2=pd.DataFrame({
    "prediction_probability":[1.0] * 2,
    "prediction":  [1,1],
    "num_runs": [1,1],
    "Idx":[1,2],
})

ads2=ads2.set_index("Idx")

# computing at end
# this works
preds_dd = dd.from_pandas(preds1)
ads_dd = dd.from_pandas(ads1)
preds_dd["prediction"] = preds_dd.prediction.add(
                    ads_dd.prediction, fill_value=0
)
print(preds_dd.compute())

# this fails
preds_dd = dd.from_pandas(preds2)
ads_dd = dd.from_pandas(ads2)
preds_dd["prediction"] = preds_dd.prediction.add(
                    ads_dd.prediction, fill_value=0
)
print(preds_dd.compute())


# with an extra compute() on the ads series in the middle

# this fails
preds_dd = dd.from_pandas(preds1)
ads_dd = dd.from_pandas(ads1)
preds_dd["prediction"] = preds_dd.prediction.add(
                    ads_dd.prediction.compute(), fill_value=0
)

print(preds_dd.compute())

# this works
preds_dd = dd.from_pandas(preds2)
ads_dd = dd.from_pandas(ads2)
preds_dd["prediction"] = preds_dd.prediction.add(
                    ads_dd.prediction.compute(), fill_value=0
)

print(preds_dd.compute())
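For comparison, the results both cases are expected to produce can be computed eagerly with plain pandas. This sketch rebuilds only the `prediction` columns of the example frames and is meant as a reference, not as a replacement for the Dask code:

```python
import pandas as pd

# Rebuild the indexed example frames (only the "prediction" column matters here)
preds1 = pd.DataFrame({"prediction": [1, 1], "Idx": [1, 4]}).set_index("Idx")
ads1 = pd.DataFrame({"prediction": [1, 1], "Idx": [1, 4]}).set_index("Idx")
preds2 = pd.DataFrame({"prediction": [1, 1, 1, 1], "Idx": [1, 2, 3, 4]}).set_index("Idx")
ads2 = pd.DataFrame({"prediction": [1, 1], "Idx": [1, 2]}).set_index("Idx")

# Same index labels on both sides: a plain element-wise sum
expected1 = preds1["prediction"].add(ads1["prediction"], fill_value=0)

# Labels 3 and 4 are missing from ads2; fill_value=0 treats them as 0
expected2 = preds2["prediction"].add(ads2["prediction"], fill_value=0)

print(expected1.tolist())  # [2, 2]
print(expected2.tolist())  # [2.0, 2.0, 1.0, 1.0]
```

Because `ads2` has no rows for labels 3 and 4, pandas aligns on the index, fills the missing values with 0, and returns a float64 result for the second case.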

Anything else we need to know?:
Stack trace (from the failing case with the extra compute() in the middle):

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
Cell In[6], line 6
      2 ads_dd = dd.from_pandas(ads1)
      3 preds_dd["prediction"] = preds_dd.prediction.add(
      4                     ads_dd.prediction.compute(), fill_value=0
      5 )
----> 6 print(preds_dd.compute())

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_collection.py:476, in FrameBase.compute(self, fuse, **kwargs)
    474 if not isinstance(out, Scalar):
    475     out = out.repartition(npartitions=1)
--> 476 out = out.optimize(fuse=fuse)
    477 return DaskMethodsMixin.compute(out, **kwargs)

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_collection.py:591, in FrameBase.optimize(self, fuse)
    573 def optimize(self, fuse: bool = True):
    574     """Optimizes the DataFrame.
    575 
    576     Runs the optimizer with all steps over the DataFrame and wraps the result in a
   (...)
    589         The optimized Dask Dataframe
    590     """
--> 591     return new_collection(self.expr.optimize(fuse=fuse))

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:94, in Expr.optimize(self, **kwargs)
     93 def optimize(self, **kwargs):
---> 94     return optimize(self, **kwargs)

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3070, in optimize(expr, fuse)
   3049 """High level query optimization
   3050 
   3051 This leverages three optimization passes:
   (...)
   3066 optimize_blockwise_fusion
   3067 """
   3068 stage: core.OptimizerStage = "fused" if fuse else "simplified-physical"
-> 3070 return optimize_until(expr, stage)

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3031, in optimize_until(expr, stage)
   3028     return expr
   3030 # Lower
-> 3031 expr = expr.lower_completely()
   3032 if stage == "physical":
   3033     return expr

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_core.py:447, in Expr.lower_completely(self)
    445 lowered = {}
    446 while True:
--> 447     new = expr.lower_once(lowered)
    448     if new._name == expr._name:
    449         break

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_core.py:402, in Expr.lower_once(self, lowered)
    399 expr = self
    401 # Lower this node
--> 402 out = expr._lower()
    403 if out is None:
    404     out = expr

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3435, in MaybeAlignPartitions._lower(self)
   3432 def _lower(self):
   3433     # This can be expensive when something that has expensive division
   3434     # calculation is in the Expression
-> 3435     dfs = self.args
   3436     if (
   3437         len(dfs) == 1
   3438         or all(
   (...)
   3441         or len(self.divisions) == 2
   3442     ):
   3443         return self._expr_cls(*self.operands)

File ~/.pyenv/versions/3.10.14/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
    979 val = cache.get(self.attrname, _NOT_FOUND)
    980 if val is _NOT_FOUND:
--> 981     val = self.func(instance)
    982     try:
    983         cache[self.attrname] = val

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3430, in MaybeAlignPartitions.args(self)
   3427 @functools.cached_property
   3428 def args(self):
   3429     dfs = [op for op in self.operands if isinstance(op, Expr)]
-> 3430     return [op for op in dfs if not is_broadcastable(dfs, op)]

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3430, in <listcomp>(.0)
   3427 @functools.cached_property
   3428 def args(self):
   3429     dfs = [op for op in self.operands if isinstance(op, Expr)]
-> 3430     return [op for op in dfs if not is_broadcastable(dfs, op)]

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:3086, in is_broadcastable(dfs, s)
   3081     except (TypeError, ValueError):
   3082         return False
   3084 return (
   3085     s.ndim == 1
-> 3086     and s.npartitions == 1
   3087     and s.known_divisions
   3088     and any(compare(s, df) for df in dfs if df.ndim == 2)
   3089     or s.ndim == 0
   3090 )

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:398, in Expr.npartitions(self)
    396     return self.operands[idx]
    397 else:
--> 398     return len(self.divisions) - 1

File ~/.pyenv/versions/3.10.14/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
    979 val = cache.get(self.attrname, _NOT_FOUND)
    980 if val is _NOT_FOUND:
--> 981     val = self.func(instance)
    982     try:
    983         cache[self.attrname] = val

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:382, in Expr.divisions(self)
    380 @functools.cached_property
    381 def divisions(self):
--> 382     return tuple(self._divisions())

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:2633, in Binop._divisions(self)
   2631     return tuple(self.operation(left_divisions, right_divisions))
   2632 else:
-> 2633     return super()._divisions()

File ~/git/ukho-bathymetry-ml/ukho-bathymetry-ml/.venv/lib/python3.10/site-packages/dask_expr/_expr.py:530, in Blockwise._divisions(self)
    528 for arg in dependencies:
    529     if not self._broadcast_dep(arg):
--> 530         assert arg.divisions == dependencies[0].divisions
    531 return dependencies[0].divisions

AssertionError:

Environment:

  • Dask version: 2024.8.2
  • Python version: 3.10.14
  • Operating System: Ubuntu
  • Install method (conda, pip, source): Poetry
github-actions bot added the "needs triage" label (Needs a response from a contributor) on Sep 18, 2024
JimHBeam changed the title from "When adding columns from 2 dataframes will not compute in some instances, inconsistent with different data" to "When adding columns from 2 dataframes will not compute in some instances, fix for one instance seems to break the other" on Sep 18, 2024
JimHBeam (Author) commented:

I've worked around this by checking whether the divisions of the two series are equal. If they differ, I have to compute the right-hand series before adding it; if they are the same, computing it breaks the addition, so I must leave it as a Dask series.

phofl linked a pull request on Oct 4, 2024 that will close this issue
phofl added the "dask-expr" label and removed the "needs triage" label (Needs a response from a contributor) on Nov 7, 2024