Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implemented dataframe.cov #2142

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -4325,6 +4325,117 @@ def op(kser):

return self._apply_series_op(op)

def cov(self, min_periods: Optional[int] = None, ddof: Optional[int] = 1) -> "DataFrame":
"""
Compute pairwise covariance of columns, excluding NA/null values.
Compute the pairwise covariance among the series of a DataFrame.
The returned data frame is the `covariance matrix
<https://en.wikipedia.org/wiki/Covariance_matrix>`__ of the columns
of the DataFrame.
Both NA and null values are automatically excluded from the
calculation. (See the note below about bias from missing values.)
A threshold can be set for the minimum number of
observations for each value created. Comparisons with observations
below this threshold will be returned as ``NaN``.
This method is generally used for the analysis of time series data to
understand the relationship between different measures
across time.

.. note:: This method should only be used if the resulting pandas DataFrame is expected
to be small, as all the data is loaded into the driver's memory.

Parameters
----------
min_periods : int, optional
Minimum number of observations required per pair of columns
to have a valid result.
ddof : int, default 1
Delta degrees of freedom. The divisor used in calculations
is ``N - ddof``, where ``N`` represents the number of elements.

Returns
-------
DataFrame
The covariance matrix of the series of the DataFrame.
See Also
--------
Series.cov : Compute covariance with another Series.
core.window.ExponentialMovingWindow.cov: Exponential weighted sample covariance.
core.window.Expanding.cov : Expanding sample covariance.
core.window.Rolling.cov : Rolling sample covariance.
Notes
-----
Returns the covariance matrix of the DataFrame's time series.
The covariance is normalized by N-ddof.
For DataFrames that have Series that are missing data (assuming that
data is `missing at random
<https://en.wikipedia.org/wiki/Missing_data#Missing_at_random>`__)
the returned covariance matrix will be an unbiased estimate
of the variance and covariance between the member Series.
However, for many applications this estimate may not be acceptable
because the estimate covariance matrix is not guaranteed to be positive
semi-definite. This could lead to estimate correlations having
absolute values which are greater than one, and/or a non-invertible
covariance matrix. See `Estimation of covariance matrices
<https://en.wikipedia.org/w/index.php?title=Estimation_of_covariance_
matrices>`__ for more details.
Examples
--------
>>> kdf = ks.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)],
... columns=['dogs', 'cats'])
>>> kdf.cov()
dogs cats
dogs 0.666667 -1.000000
cats -1.000000 1.666667
>>> np.random.seed(42)
>>> kdf = ks.DataFrame(np.random.randn(1000, 5),
... columns=['a', 'b', 'c', 'd', 'e'])
>>> kdf.cov()
a b c d e
a 0.998438 -0.020161 0.059277 -0.008943 0.014144
b -0.020161 1.059352 -0.008543 -0.024738 0.009826
c 0.059277 -0.008543 1.010670 -0.001486 -0.000271
d -0.008943 -0.024738 -0.001486 0.921297 -0.013692
e 0.014144 0.009826 -0.000271 -0.013692 0.977795

**Minimum number of periods**

This method also supports an optional ``min_periods`` keyword
that specifies the required minimum number of non-NA observations for
each column pair in order to have a valid result:
>>> np.random.seed(42)
>>> kdf = ks.DataFrame(np.random.randn(20, 3),
... columns=['a', 'b', 'c'])
>>> kdf.loc[:5, 'a'] = np.nan
>>> kdf.loc[5:10, 'b'] = np.nan
>>> kdf.cov(min_periods=12)
a b c
a 0.331350 NaN -0.148156
b NaN 1.008785 0.164205
c -0.148156 0.164205 0.895202
"""
num_cols = [
label
for label in self._internal.column_labels
if isinstance(self._internal.spark_type_for(label), (NumericType))
]
kdf = self[num_cols]
names = [name for t in num_cols for name in t]
mat = kdf.to_pandas().to_numpy(dtype=float, copy=False)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid using to_pandas() without any restriction is not a good idea. It will cause OOM if the data side doesn't fit in a driver's memory.

if DataFrame(mat).notna().all().all():
if min_periods is not None and min_periods > len(mat):
base_cov = np.empty((mat.shape[1], mat.shape[1]))
base_cov.fill(np.nan)
else:
base_cov = np.cov(mat.T, ddof=ddof)
base_cov = base_cov.reshape((len(num_cols), len(num_cols)))
else:
from pandas.core.frame import libalgos

base_cov = libalgos.nancorr(mat, cov=True, minp=min_periods)

return DataFrame(base_cov, index=names, columns=names)

def _mark_duplicates(self, subset=None, keep="first"):
if subset is None:
subset = self._internal.column_labels
Expand Down
1 change: 0 additions & 1 deletion databricks/koalas/missing/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class _MissingPandasLikeDataFrame(object):
compare = _unsupported_function("compare")
convert_dtypes = _unsupported_function("convert_dtypes")
corrwith = _unsupported_function("corrwith")
cov = _unsupported_function("cov")
ewm = _unsupported_function("ewm")
infer_objects = _unsupported_function("infer_objects")
interpolate = _unsupported_function("interpolate")
Expand Down
28 changes: 28 additions & 0 deletions databricks/koalas/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5543,3 +5543,31 @@ def test_at_time(self):
kdf = ks.DataFrame({"A": [1, 2, 3, 4]})
with self.assertRaisesRegex(TypeError, "Index must be DatetimeIndex"):
kdf.at_time("0:15")

def test_cov(self):
pdf = pd.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], columns=["dogs", "cats"])
kdf = ks.from_pandas(pdf)
self.assert_eq(
pdf.cov(), kdf.cov(),
)

np.random.seed(42)
pdf = pd.DataFrame(np.random.randn(25, 5), columns=["a", "b", "c", "d", "e"])
kdf = ks.from_pandas(pdf)
self.assert_eq(
pdf.cov(), kdf.cov(), almost=True,
)

pdf = pd.DataFrame(np.random.randn(20, 3), columns=["a", "b", "c"])
pdf.loc[:5, "a"] = np.nan
pdf.loc[5:10, "b"] = np.nan
kdf = ks.from_pandas(pdf)
self.assert_eq(
pdf.cov(min_periods=12), kdf.cov(min_periods=12),
)
if LooseVersion(pd.__version__) > LooseVersion("1.1.0"):
df = pd.DataFrame(np.random.rand(10, 2), columns=["a", "b"])
kdf = ks.from_pandas(pdf)
self.assert_eq(
pdf.cov(ddof=2), kdf.cov(ddof=2), almost=True,
)