[SPARK-43562][SPARK-43870][PS] Remove APIs from DataFrame and Series
### What changes were proposed in this pull request?

This PR proposes to remove the DataFrame/Series APIs that were removed in [pandas 2.0.0](https://pandas.pydata.org/docs/dev/whatsnew/v2.0.0.html) and above.

### Why are the changes needed?

To match the behavior of pandas API on Spark with pandas.

### Does this PR introduce _any_ user-facing change?

Yes. `(DataFrame|Series).(iteritems|mad|append)` will be removed.
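
For illustration, a minimal migration sketch (assuming `pyspark.pandas` is imported as `ps`; not taken verbatim from this PR):

```python
import pyspark.pandas as ps

df = ps.DataFrame({"A": [1, 2], "B": [3, 4]})

# DataFrame.iteritems() / Series.iteritems() -> items()
for label, content in df.items():
    print(label)

# DataFrame.append() / Series.append() -> ps.concat()
combined = ps.concat([df, df], ignore_index=True)

# DataFrame.mad() / Series.mad() -> compute the mean absolute deviation explicitly
mad = (df - df.mean()).abs().mean()
```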

### How was this patch tested?

Enabling the existing tests.

Closes apache#42268 from itholic/pandas_remove_df_api.

Authored-by: itholic <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
itholic authored and HyukjinKwon committed Aug 4, 2023
1 parent 26ed4fb commit 678f472
Showing 15 changed files with 41 additions and 556 deletions.
11 changes: 11 additions & 0 deletions python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -19,6 +19,17 @@
Upgrading PySpark
==================

Upgrading from PySpark 3.5 to 4.0
---------------------------------

* In Spark 4.0, ``DataFrame.iteritems`` has been removed from pandas API on Spark, use ``DataFrame.items`` instead.
* In Spark 4.0, ``Series.iteritems`` has been removed from pandas API on Spark, use ``Series.items`` instead.
* In Spark 4.0, ``DataFrame.append`` has been removed from pandas API on Spark, use ``ps.concat`` instead.
* In Spark 4.0, ``Series.append`` has been removed from pandas API on Spark, use ``ps.concat`` instead.
* In Spark 4.0, ``DataFrame.mad`` has been removed from pandas API on Spark.
* In Spark 4.0, ``Series.mad`` has been removed from pandas API on Spark.


Upgrading from PySpark 3.3 to 3.4
---------------------------------

3 changes: 0 additions & 3 deletions python/docs/source/reference/pyspark.pandas/frame.rst
@@ -79,7 +79,6 @@ Indexing, iteration
DataFrame.iloc
DataFrame.insert
DataFrame.items
DataFrame.iteritems
DataFrame.iterrows
DataFrame.itertuples
DataFrame.keys
@@ -155,7 +154,6 @@ Computations / Descriptive Stats
DataFrame.ewm
DataFrame.kurt
DataFrame.kurtosis
DataFrame.mad
DataFrame.max
DataFrame.mean
DataFrame.min
@@ -252,7 +250,6 @@ Combining / joining / merging
.. autosummary::
:toctree: api/

DataFrame.append
DataFrame.assign
DataFrame.merge
DataFrame.join
1 change: 0 additions & 1 deletion python/docs/source/reference/pyspark.pandas/groupby.rst
@@ -68,7 +68,6 @@ Computations / Descriptive Stats
GroupBy.filter
GroupBy.first
GroupBy.last
GroupBy.mad
GroupBy.max
GroupBy.mean
GroupBy.median
3 changes: 0 additions & 3 deletions python/docs/source/reference/pyspark.pandas/series.rst
@@ -70,7 +70,6 @@ Indexing, iteration
Series.keys
Series.pop
Series.items
Series.iteritems
Series.item
Series.xs
Series.get
@@ -148,7 +147,6 @@ Computations / Descriptive Stats
Series.ewm
Series.filter
Series.kurt
Series.mad
Series.max
Series.mean
Series.min
@@ -247,7 +245,6 @@ Combining / joining / merging
.. autosummary::
:toctree: api/

Series.append
Series.compare
Series.replace
Series.update
204 changes: 1 addition & 203 deletions python/pyspark/pandas/frame.py
@@ -1880,11 +1880,9 @@ def items(self) -> Iterator[Tuple[Name, "Series"]]:
polar bear 22000
koala marsupial 80000
>>> for label, content in df.iteritems():
>>> for label, content in df.items():
... print('label:', label)
... print('content:', content.to_string())
...
... # doctest: +SKIP
label: species
content: panda bear
polar bear
@@ -2057,20 +2055,6 @@ def extract_kv_from_spark_row(row: Row) -> Tuple[Name, Any]:
):
yield tuple(([k] if index else []) + list(v))

def iteritems(self) -> Iterator[Tuple[Name, "Series"]]:
"""
This is an alias of ``items``.
.. deprecated:: 3.4.0
iteritems is deprecated and will be removed in a future version.
Use .items instead.
"""
warnings.warn(
"Deprecated in 3.4.0, and will be removed in 4.0.0. Use DataFrame.items instead.",
FutureWarning,
)
return self.items()
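
For reference, a minimal sketch of the `DataFrame.items` replacement named in the deprecation warning above; it yields the same `(label, Series)` pairs the removed alias did:

```python
import pyspark.pandas as ps

df = ps.DataFrame({"species": ["bear", "bear", "marsupial"],
                   "population": [1864, 22000, 80000]})

# items() iterates over (column label, Series) pairs, as iteritems() did.
for label, content in df.items():
    print("label:", label)
    print("content:", content.to_string())
```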

def to_clipboard(self, excel: bool = True, sep: Optional[str] = None, **kwargs: Any) -> None:
"""
Copy object to the system clipboard.
@@ -8837,91 +8821,6 @@ def combine_first(self, other: "DataFrame") -> "DataFrame":
)
return DataFrame(internal)

def append(
self,
other: "DataFrame",
ignore_index: bool = False,
verify_integrity: bool = False,
sort: bool = False,
) -> "DataFrame":
"""
Append rows of other to the end of caller, returning a new object.
Columns in other that are not in the caller are added as new columns.
.. deprecated:: 3.4.0
Parameters
----------
other : DataFrame or Series/dict-like object, or list of these
The data to append.
ignore_index : boolean, default False
If True, do not use the index labels.
verify_integrity : boolean, default False
If True, raise ValueError on creating index with duplicates.
sort : boolean, default False
Currently not supported.
Returns
-------
appended : DataFrame
Examples
--------
>>> df = ps.DataFrame([[1, 2], [3, 4]], columns=list('AB'))
>>> df.append(df)
A B
0 1 2
1 3 4
0 1 2
1 3 4
>>> df.append(df, ignore_index=True)
A B
0 1 2
1 3 4
2 1 2
3 3 4
"""
warnings.warn(
"The DataFrame.append method is deprecated "
"and will be removed in 4.0.0. "
"Use pyspark.pandas.concat instead.",
FutureWarning,
)
if isinstance(other, ps.Series):
raise TypeError("DataFrames.append() does not support appending Series to DataFrames")
if sort:
raise NotImplementedError("The 'sort' parameter is currently not supported")

if not ignore_index:
index_scols = self._internal.index_spark_columns
if len(index_scols) != other._internal.index_level:
raise ValueError("Both DataFrames have to have the same number of index levels")

if (
verify_integrity
and len(index_scols) > 0
and (
self._internal.spark_frame.select(index_scols)
.intersect(
other._internal.spark_frame.select(other._internal.index_spark_columns)
)
.count()
)
> 0
):
raise ValueError("Indices have overlapping values")

# Lazy import to avoid circular dependency issues
from pyspark.pandas.namespace import concat

return cast(DataFrame, concat([self, other], ignore_index=ignore_index))
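
For reference, a minimal sketch of the `ps.concat` replacement recommended by the warning above, reproducing the removed `append` examples:

```python
import pyspark.pandas as ps

df = ps.DataFrame([[1, 2], [3, 4]], columns=list("AB"))

# Equivalent of the removed df.append(df): rows of the second frame follow the first.
ps.concat([df, df])

# Equivalent of df.append(df, ignore_index=True): the result index is renumbered 0..n-1.
ps.concat([df, df], ignore_index=True)
```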

# TODO: add 'filter_func' and 'errors' parameter
def update(self, other: "DataFrame", join: str = "left", overwrite: bool = True) -> None:
"""
@@ -12719,107 +12618,6 @@ def explode(self, column: Name, ignore_index: bool = False) -> "DataFrame":
result_df: DataFrame = DataFrame(internal)
return result_df.reset_index(drop=True) if ignore_index else result_df

def mad(self, axis: Axis = 0) -> "Series":
"""
Return the mean absolute deviation of values.
.. deprecated:: 3.4.0
Parameters
----------
axis : {index (0), columns (1)}
Axis for the function to be applied on.
Examples
--------
>>> df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]},
... columns=['a', 'b'])
>>> df.mad()
a 0.666667
b 0.066667
dtype: float64
>>> df.mad(axis=1) # doctest: +SKIP
0 0.45
1 0.90
2 1.35
3 NaN
dtype: float64
"""
warnings.warn(
"The 'mad' method is deprecated and will be removed in 4.0.0. "
"To compute the same result, you may do `(df - df.mean()).abs().mean()`.",
FutureWarning,
)
from pyspark.pandas.series import first_series

axis = validate_axis(axis)

if axis == 0:

def get_spark_column(psdf: DataFrame, label: Label) -> PySparkColumn:
scol = psdf._internal.spark_column_for(label)
col_type = psdf._internal.spark_type_for(label)

if isinstance(col_type, BooleanType):
scol = scol.cast("integer")

return scol

new_column_labels: List[Label] = []
for label in self._internal.column_labels:
# Filtering out only columns of numeric and boolean type column.
dtype = self._psser_for(label).spark.data_type
if isinstance(dtype, (NumericType, BooleanType)):
new_column_labels.append(label)

new_columns = [
F.avg(get_spark_column(self, label)).alias(name_like_string(label))
for label in new_column_labels
]

mean_data = self._internal.spark_frame.select(*new_columns).first()

new_columns = [
F.avg(
F.abs(get_spark_column(self, label) - mean_data[name_like_string(label)])
).alias(name_like_string(label))
for label in new_column_labels
]

sdf = self._internal.spark_frame.select(
*[F.lit(None).cast(StringType()).alias(SPARK_DEFAULT_INDEX_NAME)], *new_columns
)

# The data is expected to be small so it's fine to transpose/use the default index.
with ps.option_context("compute.max_rows", 1):
internal = InternalFrame(
spark_frame=sdf,
index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)],
column_labels=new_column_labels,
column_label_names=self._internal.column_label_names,
)
return first_series(DataFrame(internal).transpose())

else:

@pandas_udf(returnType=DoubleType()) # type: ignore[call-overload]
def calculate_columns_axis(*cols: pd.Series) -> pd.Series:
return pd.concat(cols, axis=1).mad(axis=1)

internal = self._internal.copy(
column_labels=[None],
data_spark_columns=[
calculate_columns_axis(*self._internal.data_spark_columns).alias(
SPARK_DEFAULT_SERIES_NAME
)
],
data_fields=[None],
column_label_names=None,
)
return first_series(DataFrame(internal))
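
For reference, a minimal sketch of the replacement suggested in the warning above: a column-wise mean absolute deviation matching the removed `DataFrame.mad` example:

```python
import numpy as np
import pyspark.pandas as ps

df = ps.DataFrame({"a": [1, 2, 3, np.nan], "b": [0.1, 0.2, 0.3, np.nan]})

# (df - df.mean()).abs().mean() reproduces the removed df.mad():
# a    0.666667
# b    0.066667
(df - df.mean()).abs().mean()
```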

def mode(self, axis: Axis = 0, numeric_only: bool = False, dropna: bool = True) -> "DataFrame":
"""
Get the mode(s) of each element along the selected axis.
81 changes: 0 additions & 81 deletions python/pyspark/pandas/groupby.py
@@ -991,87 +991,6 @@ def skew(self) -> FrameLike:
bool_to_numeric=True,
)

# TODO: 'axis', 'skipna', 'level' parameter should be implemented.
def mad(self) -> FrameLike:
"""
Compute mean absolute deviation of groups, excluding missing values.
.. versionadded:: 3.4.0
.. deprecated:: 3.4.0
Examples
--------
>>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True],
... "C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]})
>>> df.groupby("A").mad()
B C
A
1 0.444444 0.444444
2 0.000000 0.000000
>>> df.B.groupby(df.A).mad()
A
1 0.444444
2 0.000000
Name: B, dtype: float64
See Also
--------
pyspark.pandas.Series.groupby
pyspark.pandas.DataFrame.groupby
"""
warnings.warn(
"The 'mad' method is deprecated and will be removed in a future version. "
"To compute the same result, you may do `(group_df - group_df.mean()).abs().mean()`.",
FutureWarning,
)
groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
internal, agg_columns, sdf = self._prepare_reduce(
groupkey_names=groupkey_names,
accepted_spark_types=(NumericType, BooleanType),
bool_to_numeric=False,
)
psdf: DataFrame = DataFrame(internal)

if len(psdf._internal.column_labels) > 0:
window = Window.partitionBy(groupkey_names).rowsBetween(
Window.unboundedPreceding, Window.unboundedFollowing
)
new_agg_scols = {}
new_stat_scols = []
for agg_column in agg_columns:
# it is not able to directly use 'self._reduce_for_stat_function', due to
# 'it is not allowed to use a window function inside an aggregate function'.
# so we need to create temporary columns to compute the 'abs(x - avg(x))' here.
agg_column_name = agg_column._internal.data_spark_column_names[0]
new_agg_column_name = verify_temp_column_name(
psdf._internal.spark_frame, "__tmp_agg_col_{}__".format(agg_column_name)
)
casted_agg_scol = F.col(agg_column_name).cast("double")
new_agg_scols[new_agg_column_name] = F.abs(
casted_agg_scol - F.avg(casted_agg_scol).over(window)
)
new_stat_scols.append(F.avg(F.col(new_agg_column_name)).alias(agg_column_name))

sdf = (
psdf._internal.spark_frame.withColumns(new_agg_scols)
.groupby(groupkey_names)
.agg(*new_stat_scols)
)
else:
sdf = sdf.select(*groupkey_names).distinct()

internal = internal.copy(
spark_frame=sdf,
index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
data_spark_columns=[scol_for(sdf, col) for col in internal.data_spark_column_names],
data_fields=None,
)

return self._prepare_return(DataFrame(internal))
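
For reference, a minimal sketch of the per-group workaround named in the warning above; the use of `GroupBy.apply` here is an illustrative assumption, not part of this PR:

```python
import pyspark.pandas as ps

df = ps.DataFrame({"A": [1, 2, 1, 1], "C": [3, 4, 3, 4]})

# Mean absolute deviation of C within each group of A, following the warning's formula:
# A
# 1    0.444444
# 2    0.000000
df.groupby("A")["C"].apply(lambda group_s: (group_s - group_s.mean()).abs().mean())
```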

def sem(self, ddof: int = 1) -> FrameLike:
"""
Compute standard error of the mean of groups, excluding missing values.
