-
-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Series.quantile #517
Changes from all commits
a4def92
9cd7e5c
60f67cc
6fdcaaa
fb0b229
2d76a98
73de720
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import functools | ||
|
||
import numpy as np | ||
from dask.dataframe.dispatch import make_meta, meta_nonempty | ||
from dask.utils import import_required, is_series_like | ||
|
||
from dask_expr._expr import DropnaSeries, Expr | ||
|
||
|
||
def _finalize_scalar_result(cons, *args, **kwargs): | ||
return cons(*args, **kwargs)[0] | ||
|
||
|
||
class SeriesQuantile(Expr): | ||
_parameters = ["frame", "q", "method"] | ||
_defaults = {"method": "default"} | ||
|
||
@functools.cached_property | ||
def q(self): | ||
q = np.array(self.operand("q")) | ||
if q.ndim > 0: | ||
assert len(q) > 0, f"must provide non-empty q={q}" | ||
q.sort(kind="mergesort") | ||
return q | ||
return np.asarray([self.operand("q")]) | ||
|
||
@functools.cached_property | ||
def method(self): | ||
if self.operand("method") == "default": | ||
return "dask" | ||
else: | ||
return self.operand("method") | ||
|
||
@functools.cached_property | ||
def _meta(self): | ||
meta = self.frame._meta | ||
if not is_series_like(self.frame._meta): | ||
meta = meta.to_series() | ||
return make_meta(meta_nonempty(meta).quantile(self.operand("q"))) | ||
|
||
def _divisions(self): | ||
if is_series_like(self._meta): | ||
return (np.min(self.q), np.max(self.q)) | ||
return (None, None) | ||
|
||
@functools.cached_property | ||
def _constructor(self): | ||
meta = self.frame._meta | ||
if not is_series_like(self.frame._meta): | ||
meta = meta.to_series() | ||
return meta._constructor | ||
|
||
@functools.cached_property | ||
def _finalizer(self): | ||
if is_series_like(self._meta): | ||
return lambda tsk: ( | ||
self._constructor, | ||
tsk, | ||
self.q, | ||
None, | ||
self.frame._meta.name, | ||
) | ||
else: | ||
return lambda tsk: (_finalize_scalar_result, self._constructor, tsk, [0]) | ||
|
||
def _lower(self): | ||
frame = DropnaSeries(self.frame) | ||
if self.method == "tdigest": | ||
return SeriesQuantileTdigest( | ||
frame, self.operand("q"), self.operand("method") | ||
) | ||
else: | ||
return SeriesQuantileDask(frame, self.operand("q"), self.operand("method")) | ||
|
||
|
||
class SeriesQuantileTdigest(SeriesQuantile): | ||
@functools.cached_property | ||
def _meta(self): | ||
import_required( | ||
"crick", "crick is a required dependency for using the tdigest method." | ||
) | ||
return super()._meta | ||
|
||
def _layer(self) -> dict: | ||
from dask.array.percentile import _percentiles_from_tdigest, _tdigest_chunk | ||
|
||
dsk = {} | ||
for i in range(self.frame.npartitions): | ||
dsk[("chunk-" + self._name, i)] = ( | ||
_tdigest_chunk, | ||
(getattr, (self.frame._name, i), "values"), | ||
) | ||
|
||
dsk[(self._name, 0)] = self._finalizer( | ||
(_percentiles_from_tdigest, self.q * 100, sorted(dsk)) | ||
) | ||
return dsk | ||
|
||
def _lower(self): | ||
return None | ||
|
||
|
||
class SeriesQuantileDask(SeriesQuantile): | ||
def _layer(self) -> dict: | ||
from dask.array.dispatch import percentile_lookup as _percentile | ||
from dask.array.percentile import merge_percentiles | ||
|
||
dsk = {} | ||
# Add 0 and 100 during calculation for more robust behavior (hopefully) | ||
calc_qs = np.pad(self.q * 100, 1, mode="constant") | ||
calc_qs[-1] = 100 | ||
|
||
for i in range(self.frame.npartitions): | ||
dsk[("chunk-" + self._name, i)] = ( | ||
_percentile, | ||
(self.frame._name, i), | ||
calc_qs, | ||
) | ||
dsk[(self._name, 0)] = self._finalizer( | ||
( | ||
merge_percentiles, | ||
self.q * 100, | ||
[calc_qs] * self.frame.npartitions, | ||
sorted(dsk), | ||
"lower", | ||
None, | ||
False, | ||
) | ||
) | ||
return dsk | ||
|
||
def _lower(self): | ||
return None |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1657,6 +1657,26 @@ def func(x): | |
assert_eq(result, expected, check_index=False) | ||
|
||
|
||
def test_quantile(df): | ||
assert_eq(df.x.quantile(), 49.0) | ||
assert_eq(df.x.quantile(method="dask"), 49.0) | ||
assert_eq( | ||
df.x.quantile(q=[0.2, 0.8]), | ||
lib.Series([19.0, 79.0], index=[0.2, 0.8], name="x"), | ||
) | ||
assert_eq( | ||
df.x.index.quantile(q=[0.2, 0.8]), | ||
lib.Series([19.0, 79.0], index=[0.2, 0.8]), | ||
) | ||
|
||
with pytest.raises(AssertionError): | ||
df.x.quantile(q=[]).compute() | ||
Comment on lines
+1672
to
+1673
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pandas returns an empty series here. Doubt we care to diverge on such an edge case though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I thought this was kind of pointless, but I might put up a follow up anyway |
||
|
||
ser = from_pandas(lib.Series(["a", "b", "c"]), npartitions=2) | ||
with pytest.raises(TypeError, match="on non-numeric"): | ||
ser.quantile() | ||
|
||
|
||
def test_map_overlap_raises(): | ||
def func(x): | ||
x = x + x.sum() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of scope: this could get easily confused with the
method
parameter ofpandas.DataFrame.quantile
.We should rename it both here and in dask/dask to
dask_method
or something.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep good point, but I’d rather do this not with all the other changes