Skip to content

Commit

Permalink
SNOW-1176072: Add partial SeriesGroupBy.apply support (snowflakedb#1508)
Browse files Browse the repository at this point in the history
Please answer these questions before submitting your pull requests.
Thanks!

1. What GitHub issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.

   Fixes SNOW-1176072

2. Fill out the following pre-review checklist:

- [x] I am adding a new automated test(s) to verify correctness of my
new code
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency

3. Please describe how your code solves the related issue.

   Add partial SeriesGroupBy.apply support.
  • Loading branch information
sfc-gh-helmeleegy authored May 7, 2024
1 parent 6e0e1da commit d3b3bcc
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 69 deletions.
7 changes: 4 additions & 3 deletions docs/source/modin/supported/groupby_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ Function application
| ``apply`` | P | ``axis`` other than 0 is not | ``Y`` if the following are true, otherwise ``N``: |
| | | implemented. | - ``func`` is a callable that always returns |
| | | | either a pandas DataFrame, a pandas Series, or |
| | | ``SeriesGroupBy.apply`` is not | objects that are neither DataFrame nor Series. |
| | | implemented. | - ``apply`` called on DataFrameGroupBy, not |
| | | | SeriesGroupBy |
| | | | objects that are neither DataFrame nor Series. |
| | | | - grouping on axis=0 |
| | | | - Not applying transform to a dataframe with a |
| | | | non-unique index |
Expand All @@ -58,6 +56,9 @@ Function application
| | | | row at a given position |
| | | | - Not applying ``func`` that returns two |
| | | | Series that have different names |
| | | | - Not grouping by an "external" by, i.e. an |
| | | | object that is not a label for a column or |
| | | | level of the dataframe |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``filter`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
24 changes: 22 additions & 2 deletions src/snowflake/snowpark/modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ def apply(self, func, *args, **kwargs):
groupby_kwargs=self._kwargs,
agg_args=args,
agg_kwargs=kwargs,
series_groupby=False,
)
)
if dataframe_result.columns.equals(pandas.Index([MODIN_UNNAMED_SERIES_LABEL])):
Expand Down Expand Up @@ -1171,9 +1172,28 @@ def unique(self):
ErrorMessage.method_not_implemented_error(name="unique", class_="GroupBy")

def apply(self, func, *args, **kwargs):
"""Not implemented yet"""
# TODO: SNOW-1063349: Modin upgrade - modin.pandas.groupby.SeriesGroupBy functions
ErrorMessage.not_implemented("apply is not implemented for SeriesGroupBy")
if not callable(func):
raise NotImplementedError("No support for non-callable `func`")
dataframe_result = pd.DataFrame(
query_compiler=self._query_compiler.groupby_apply(
self._by,
agg_func=func,
axis=self._axis,
groupby_kwargs=self._kwargs,
agg_args=args,
agg_kwargs=kwargs,
# TODO(https://github.com/modin-project/modin/issues/7096):
# upstream the series_groupby param to Modin
series_groupby=True,
)
)
if dataframe_result.columns.equals(pandas.Index([MODIN_UNNAMED_SERIES_LABEL])):
# rename to the last column of self._df
# note that upstream modin does not do this yet due to
# https://github.com/modin-project/modin/issues/7097
return dataframe_result.squeeze(axis=1).rename(self._df.columns[-1])
return dataframe_result


def validate_groupby_args(
Expand Down
3 changes: 3 additions & 0 deletions src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
### Behavior Changes
- As a part of the transition to pandas 2.2.1, pandas `df.loc` and `__setitem__` have buggy behavior when a column key is used to assign a DataFrame item to a DataFrame (a scalar column key and DataFrame item are used for assignment (https://github.com/pandas-dev/pandas/issues/58482)). Snowpark pandas deviates from this behavior and will maintain the same behavior as pandas from versions 1.5.x.

### New Features
- Added partial support for `SeriesGroupBy.apply` (where the `SeriesGrouBy` is obtained through `DataFrameGroupBy.__getitem__`).

## 1.14.0a2 (2024-04-18)

### Behavior Changes
Expand Down
130 changes: 89 additions & 41 deletions src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,14 +385,19 @@ def create_udtf_for_groupby_apply(
input_data_column_types: list[DataType],
input_index_column_types: list[DataType],
session: Session,
series_groupby: bool,
by_types: list[DataType],
) -> UserDefinedTableFunction:
"""
Create a UDTF from the Python function for groupby.apply.
The UDTF takes as input the following columns in the listed order:
1. The original row position within the dataframe (not just within the group)
2. All the index columns
3. All the data columns
2. All the by columns (these are constant across the group, but in the case
# of SeriesGroupBy, we need these so we can name each input series by the
# group label)
3. All the index columns
4. All the data columns
The UDF returns as output the following columns in the listed order. There is
one row per result row and per result column.
Expand Down Expand Up @@ -468,13 +473,19 @@ def create_udtf_for_groupby_apply(
index_column_names: Names of the input dataframe's index
input_data_column_types: Types of the input dataframe's data columns
input_index_column_types: Types of the input dataframe's index columns
series_groupby: Whether we are performing a SeriesGroupBy.apply() instead of DataFrameGroupBy.apply()
by_types: The snowflake types of the by columns.
Returns
-------
A UDTF that will apply the provided function to a group and return a
dataframe representing all of the data and metadata of the result.
"""

# Get the length of this list outside of the vUDTF function because the vUDTF
# doesn't have access to the Snowpark module, which defines these types.
num_by = len(by_types)

class ApplyFunc:
def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def] # pragma: no cover: adding type hint causes an error when creating udtf. also, skip coverage for this function because coverage tools can't tell that we're executing this function because we execute it in a UDTF.
"""
Expand All @@ -489,57 +500,92 @@ def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def
A dataframe representing the result of applying the user-provided
function to this group.
"""
# First column is row position, save it for later.
row_positions = df.iloc[:, 0]
df = df.iloc[:, 1:]
current_column_position = 0

# First column is row position. Save it for later.
row_position_column_number = 0
row_positions = df.iloc[:, row_position_column_number]
current_column_position = row_position_column_number + 1

# The next columns are the by columns. Since we are only looking at
# one group, every row in the by columns is the same, so get the
# group label from the first row.
group_label = tuple(
df.iloc[0, current_column_position : current_column_position + num_by]
)
current_column_position = current_column_position + num_by
if len(group_label) == 1:
group_label = group_label[0]

df = df.iloc[:, current_column_position:]
# Snowflake names the original columns "ARG1", "ARG2", ... "ARGN".
# the columns after the by columns are the index columns.
df.set_index(
[f"ARG{i}" for i in range(2, len(index_column_names) + 2)], inplace=True
[
f"ARG{i}"
for i in range(
1 + current_column_position,
1 + current_column_position + len(index_column_names),
)
],
inplace=True,
)
df.index.names = index_column_names
df.columns = data_column_index
if series_groupby:
# For SeriesGroupBy, there should be only one data column.
num_columns = len(df.columns)
assert (
num_columns == 1
), f"Internal error: SeriesGroupBy func should apply to series, but input data had {num_columns} columns."
input_object = df.iloc[:, 0].rename(group_label)
else:
input_object = df.set_axis(data_column_index, axis="columns")
# Use infer_objects() because integer columns come as floats
# TODO: file snowpark bug about that. Asked about this here:
# https://github.com/snowflakedb/snowpandas/pull/823/files#r1507286892
input_df = df.infer_objects()
func_result = func(input_df, *args, **kwargs)
input_object = input_object.infer_objects()
func_result = func(input_object, *args, **kwargs)
if isinstance(func_result, native_pd.Series):
# If function returns series, we have to transpose the series
# and change its metadata a little bit, but after that we can
# continue largely as if the function has returned a dataframe.
#
# If the series has a 1-dimensional index, the series name
# becomes the name of the column index. For example, if
# `func` returned the series native_pd.Series([1], name='a'):
#
# 0 1
# Name: a, dtype: int64
#
# The result needs to use the dataframe
# pd.DataFrame([1], columns=pd.Index([0], name='a'):
#
# a 0
# 0 1
#
name = func_result.name
func_result.name = None
func_result_as_frame = func_result.to_frame().T
if func_result_as_frame.columns.nlevels == 1:
func_result_as_frame.columns.name = name
if series_groupby:
func_result_as_frame = func_result.to_frame()
func_result_as_frame.columns = [MODIN_UNNAMED_SERIES_LABEL]
else:
# If function returns series, we have to transpose the series
# and change its metadata a little bit, but after that we can
# continue largely as if the function has returned a dataframe.
#
# If the series has a 1-dimensional index, the series name
# becomes the name of the column index. For example, if
# `func` returned the series native_pd.Series([1], name='a'):
#
# 0 1
# Name: a, dtype: int64
#
# The result needs to use the dataframe
# pd.DataFrame([1], columns=pd.Index([0], name='a'):
#
# a 0
# 0 1
#
name = func_result.name
func_result.name = None
func_result_as_frame = func_result.to_frame().T
if func_result_as_frame.columns.nlevels == 1:
func_result_as_frame.columns.name = name
return convert_groupby_apply_dataframe_result_to_standard_schema(
input_df,
input_object,
func_result_as_frame,
row_positions,
# We don't need to include any information
# about the index of `func_result_as_frame`.
# The series only has one index, and that
# index becomes the columns of
# `func_result_as_frame`.
include_index_columns=False,
# For DataFrameGroupBy, we don't need to include any
# information about the index of `func_result_as_frame`.
# The series only has one index, and that index becomes the
# columns of `func_result_as_frame`. For SeriesGroupBy, we
# do include the result's index in the result.
include_index_columns=series_groupby,
)
if isinstance(func_result, native_pd.DataFrame):
return convert_groupby_apply_dataframe_result_to_standard_schema(
input_df, func_result, row_positions, include_index_columns=True
input_object, func_result, row_positions, include_index_columns=True
)
# At this point, we know the function result was not a DataFrame
# or Series
Expand All @@ -561,9 +607,11 @@ def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def
# first input column is the integer row number. the row number integer
# becomes a float inside the UDTF due to SNOW-1184587
LongType(),
# the next columns are the index columns...
# the next columns are the by columns...
*by_types,
# then the index columns for the input dataframe or series...
*input_index_column_types,
# ...then come the data columns.
# ...then the data columns for the input dataframe or series.
*input_data_column_types,
]
return udtf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2752,6 +2752,7 @@ def groupby_apply(
groupby_kwargs: dict[str, Any],
agg_args: Any,
agg_kwargs: dict[str, Any],
series_groupby: bool,
) -> "SnowflakeQueryCompiler":
"""
Group according to `by` and `level`, apply a function to each group, and combine the results.
Expand All @@ -2770,6 +2771,8 @@ def groupby_apply(
Positional arguments to pass to agg_func when applying it to each group.
agg_kwargs:
Keyword arguments to pass to agg_func when applying it to each group.
series_groupby:
Whether we are performing a SeriesGroupBy.apply() instead of a DataFrameGroupBy.apply()

Returns
-------
Expand Down Expand Up @@ -2799,6 +2802,17 @@ def groupby_apply(

snowflake_type_map = self._modin_frame.quoted_identifier_to_snowflake_type()

# For DataFrameGroupBy, `func` operates on this frame in its entirety.
# For SeriesGroupBy, this frame may also include some grouping columns
# that `func` should not take as input. In that case, the only column
# that `func` takes as input is the last data column, so grab just that
# column with a slice starting at index -1 and ending at None.
input_data_column_identifiers = (
self._modin_frame.data_column_snowflake_quoted_identifiers[
slice(-1, None) if series_groupby else slice(None)
]
)

# TODO(SNOW-1210489): When type hints show that `agg_func` returns a
# scalar, we can use a vUDF instead of a vUDTF and we can skip the
# pivot.
Expand All @@ -2810,13 +2824,18 @@ def groupby_apply(
index_column_names=self._modin_frame.index_column_pandas_labels,
input_data_column_types=[
snowflake_type_map[quoted_identifier]
for quoted_identifier in self._modin_frame.data_column_snowflake_quoted_identifiers
for quoted_identifier in input_data_column_identifiers
],
input_index_column_types=[
snowflake_type_map[quoted_identifier]
for quoted_identifier in self._modin_frame.index_column_snowflake_quoted_identifiers
],
session=self._modin_frame.ordered_dataframe.session,
series_groupby=series_groupby,
by_types=[
snowflake_type_map[quoted_identifier]
for quoted_identifier in by_snowflake_quoted_identifiers_list
],
)

new_internal_df = self._modin_frame.ensure_row_position_column()
Expand Down Expand Up @@ -2884,8 +2903,9 @@ def groupby_apply(
*by_snowflake_quoted_identifiers_list,
udtf(
row_position_snowflake_quoted_identifier,
*by_snowflake_quoted_identifiers_list,
*new_internal_df.index_column_snowflake_quoted_identifiers,
*new_internal_df.data_column_snowflake_quoted_identifiers,
*input_data_column_identifiers,
).over(
partition_by=[
*by_snowflake_quoted_identifiers_list,
Expand Down
4 changes: 2 additions & 2 deletions src/snowflake/snowpark/modin/plugin/docstrings/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ def apply():
"""
Apply function ``func`` group-wise and combine the results together.

The function passed to ``apply`` must take a dataframe as its first
The function passed to ``apply`` must take a dataframe or series as its first
argument and return a DataFrame, Series or scalar. ``apply`` will
then take care of combining the results back together into a single
dataframe or series. ``apply`` is therefore a highly flexible
Expand All @@ -810,7 +810,7 @@ def apply():
Parameters
----------
func : callable
A callable that takes a dataframe as its first argument, and
A callable that takes a dataframe or series as its first argument, and
returns a dataframe, a series or a scalar. In addition the
callable may take positional and keyword arguments.
args, kwargs : tuple and dict
Expand Down
Loading

0 comments on commit d3b3bcc

Please sign in to comment.