
Implement Series.quantile #517

Merged · 7 commits · Dec 11, 2023
20 changes: 20 additions & 0 deletions dask_expr/_collection.py
@@ -53,6 +53,7 @@
no_default,
)
from dask_expr._merge import JoinRecursive, Merge
from dask_expr._quantile import SeriesQuantile
from dask_expr._quantiles import RepartitionQuantiles
from dask_expr._reductions import (
DropDuplicates,
@@ -1391,6 +1392,25 @@ def groupby(self, by, **kwargs):
def rename(self, index, sorted_index=False):
return new_collection(expr.RenameSeries(self.expr, index, sorted_index))

def quantile(self, q=0.5, method="default"):
crusaderky (Collaborator), Dec 11, 2023:
Out of scope: this could easily be confused with the method parameter of pandas.DataFrame.quantile. We should rename it, both here and in dask/dask, to dask_method or something similar.

Author (Collaborator):
Yep, good point, but I'd rather not do that alongside all the other changes here.

"""Approximate quantiles of Series

Parameters
----------
q : list/array of floats, default 0.5 (50%)
Iterable of numbers ranging from 0 to 1 for the desired quantiles
method : {'default', 'tdigest', 'dask'}, optional
Which method to use. By default dask's internal custom algorithm
(``'dask'``) is used. If set to ``'tdigest'``, tdigest is used for
floats and ints, falling back to ``'dask'`` otherwise.
"""
if not pd.api.types.is_numeric_dtype(self.dtype):
raise TypeError(f"quantile() on non-numeric dtype {self.dtype}")
allowed_methods = ["default", "dask", "tdigest"]
if method not in allowed_methods:
raise ValueError("method can only be 'default', 'dask' or 'tdigest'")
return new_collection(SeriesQuantile(self.expr, q, method))

@property
def is_monotonic_increasing(self):
return new_collection(IsMonotonicIncreasing(self.expr))
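For orientation, here is a minimal usage sketch of the API added above, modeled on the docstring and on the tests further down. The from_pandas import path and the toy data are illustrative assumptions, not part of the PR.

import pandas as pd
from dask_expr import from_pandas  # assumed import path for dask-expr collections

pdf = pd.DataFrame({"x": range(100)})
df = from_pandas(pdf, npartitions=4)

# Scalar q (default 0.5) yields a single approximate quantile.
print(df.x.quantile().compute())

# A list of q values yields a Series indexed by the requested quantiles.
print(df.x.quantile(q=[0.2, 0.8]).compute())

# 'tdigest' relies on the optional crick dependency for numeric dtypes.
print(df.x.quantile(q=[0.2, 0.8], method="tdigest").compute())

Non-numeric dtypes raise a TypeError, and method only accepts 'default', 'dask', or 'tdigest', matching the validation in the diff above.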
133 changes: 133 additions & 0 deletions dask_expr/_quantile.py
@@ -0,0 +1,133 @@
import functools

import numpy as np
from dask.dataframe.dispatch import make_meta, meta_nonempty
from dask.utils import import_required, is_series_like

from dask_expr._expr import DropnaSeries, Expr


def _finalize_scalar_result(cons, *args, **kwargs):
return cons(*args, **kwargs)[0]


class SeriesQuantile(Expr):
_parameters = ["frame", "q", "method"]
_defaults = {"method": "default"}

@functools.cached_property
def q(self):
q = np.array(self.operand("q"))
if q.ndim > 0:
assert len(q) > 0, f"must provide non-empty q={q}"
q.sort(kind="mergesort")
return q
return np.asarray([self.operand("q")])

@functools.cached_property
def method(self):
if self.operand("method") == "default":
return "dask"
else:
return self.operand("method")

@functools.cached_property
def _meta(self):
meta = self.frame._meta
if not is_series_like(self.frame._meta):
meta = meta.to_series()
return make_meta(meta_nonempty(meta).quantile(self.operand("q")))

def _divisions(self):
if is_series_like(self._meta):
return (np.min(self.q), np.max(self.q))
return (None, None)

@functools.cached_property
def _constructor(self):
meta = self.frame._meta
if not is_series_like(self.frame._meta):
meta = meta.to_series()
return meta._constructor

@functools.cached_property
def _finalizer(self):
if is_series_like(self._meta):
return lambda tsk: (
self._constructor,
tsk,
self.q,
None,
self.frame._meta.name,
)
else:
return lambda tsk: (_finalize_scalar_result, self._constructor, tsk, [0])

def _lower(self):
frame = DropnaSeries(self.frame)
if self.method == "tdigest":
return SeriesQuantileTdigest(
frame, self.operand("q"), self.operand("method")
)
else:
return SeriesQuantileDask(frame, self.operand("q"), self.operand("method"))


class SeriesQuantileTdigest(SeriesQuantile):
@functools.cached_property
def _meta(self):
import_required(
"crick", "crick is a required dependency for using the tdigest method."
)
return super()._meta

def _layer(self) -> dict:
from dask.array.percentile import _percentiles_from_tdigest, _tdigest_chunk

dsk = {}
for i in range(self.frame.npartitions):
dsk[("chunk-" + self._name, i)] = (
_tdigest_chunk,
(getattr, (self.frame._name, i), "values"),
)

dsk[(self._name, 0)] = self._finalizer(
(_percentiles_from_tdigest, self.q * 100, sorted(dsk))
)
return dsk

def _lower(self):
return None


class SeriesQuantileDask(SeriesQuantile):
def _layer(self) -> dict:
from dask.array.dispatch import percentile_lookup as _percentile
from dask.array.percentile import merge_percentiles

dsk = {}
# Add 0 and 100 during calculation for more robust behavior (hopefully)
calc_qs = np.pad(self.q * 100, 1, mode="constant")
calc_qs[-1] = 100

for i in range(self.frame.npartitions):
dsk[("chunk-" + self._name, i)] = (
_percentile,
(self.frame._name, i),
calc_qs,
)
dsk[(self._name, 0)] = self._finalizer(
(
merge_percentiles,
self.q * 100,
[calc_qs] * self.frame.npartitions,
sorted(dsk),
"lower",
None,
False,
)
)
return dsk

def _lower(self):
return None
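The SeriesQuantileDask layer above computes percentiles per partition (padding the requested quantiles with 0 and 100) and then combines those per-partition summaries with dask's merge_percentiles. The following self-contained numpy sketch illustrates the same idea, with a naive size-weighted CDF average standing in for merge_percentiles; approx_quantile and the toy partitions are illustrative only, not dask code.

import numpy as np

def approx_quantile(partitions, q):
    # Pad the requested quantiles with 0 and 100, as SeriesQuantileDask does,
    # so each per-partition summary covers the partition's full value range.
    q = np.asarray(q, dtype=float)
    calc_qs = np.pad(q * 100, 1, mode="constant")
    calc_qs[-1] = 100

    sizes = np.array([len(p) for p in partitions], dtype=float)
    # One percentile summary per partition, standing in for the per-chunk
    # _percentile tasks built in _layer().
    summaries = [np.percentile(p, calc_qs) for p in partitions]

    # Estimate the global CDF as the size-weighted average of the per-partition
    # CDFs interpolated from their summaries (a naive stand-in for
    # merge_percentiles).
    def cdf(values):
        return sum(
            n * np.interp(values, s, calc_qs / 100, left=0.0, right=1.0)
            for n, s in zip(sizes, summaries)
        ) / sizes.sum()

    # Invert the estimated CDF on the pooled summary values.
    grid = np.unique(np.concatenate(summaries))
    return np.interp(q, cdf(grid), grid)

parts = [np.arange(0, 40), np.arange(40, 100)]  # two toy "partitions"
print(approx_quantile(parts, [0.2, 0.8]))       # roughly [20, 80]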
20 changes: 20 additions & 0 deletions dask_expr/tests/test_collection.py
@@ -1657,6 +1657,26 @@ def func(x):
assert_eq(result, expected, check_index=False)


def test_quantile(df):
assert_eq(df.x.quantile(), 49.0)
assert_eq(df.x.quantile(method="dask"), 49.0)
assert_eq(
df.x.quantile(q=[0.2, 0.8]),
lib.Series([19.0, 79.0], index=[0.2, 0.8], name="x"),
)
assert_eq(
df.x.index.quantile(q=[0.2, 0.8]),
lib.Series([19.0, 79.0], index=[0.2, 0.8]),
)

with pytest.raises(AssertionError):
df.x.quantile(q=[]).compute()
Comment on lines +1672 to +1673

crusaderky (Collaborator):
pandas returns an empty Series here. I doubt we care to diverge on such an edge case, though.

Author (Collaborator):
Yeah, I thought this behavior was kind of pointless, but I might put up a follow-up anyway.


ser = from_pandas(lib.Series(["a", "b", "c"]), npartitions=2)
with pytest.raises(TypeError, match="on non-numeric"):
ser.quantile()


def test_map_overlap_raises():
def func(x):
x = x + x.sum()
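As a footnote to the review thread above about the empty-q edge case: plain pandas returns an empty Series where the test above expects an AssertionError. A minimal illustration, using toy data of my own:

import pandas as pd

# pandas accepts an empty list of quantiles and returns an empty Series,
# whereas SeriesQuantile asserts that q is non-empty.
print(pd.Series([1.0, 2.0, 3.0]).quantile([]))  # -> empty Series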