Skip to content

Commit

Permalink
Fix threshold outliers functions
Browse files (browse the repository at this point in the history)
  • Loading branch information
argenisleon committed Nov 27, 2019
1 parent f6c3903 commit a57300d
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 53 deletions.
28 changes: 11 additions & 17 deletions optimus/outliers/abstract_outliers_threshold.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
from pyspark.sql import functions as F

from optimus.helpers.check import is_dataframe
from optimus.helpers.columns import parse_columns
from optimus.helpers.columns import parse_columns, name_col
from optimus.helpers.converter import one_list_to_val
from optimus.helpers.filters import dict_filter


class AbstractOutlierThreshold(ABC):
Expand All @@ -14,7 +13,7 @@ class AbstractOutlierThreshold(ABC):
Also you need to add the function to outliers.py
"""

def __init__(self, df, col_name):
def __init__(self, df, col_name, prefix):
"""
:param df: Spark Dataframe
Expand All @@ -25,28 +24,25 @@ def __init__(self, df, col_name):

self.df = df
self.col_name = one_list_to_val(parse_columns(df, col_name))
self.tmp_col = name_col(self.col_name, prefix)

def select(self):
"""
Select outliers rows using the selected column
:return:
"""

col_name = self.col_name
upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])

return self.df.rows.select((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))
df = self.df
return df.rows.select(F.col(self.tmp_col) > self.threshold).cols.drop(self.tmp_col)

def drop(self):
"""
Drop outliers rows using the selected column
:return:
"""

col_name = self.col_name
upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])
print(upper_bound, lower_bound)
return self.df.rows.drop((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))
df = self.df
return df.rows.drop(F.col(self.tmp_col) >= self.threshold).cols.drop(self.tmp_col)

def count_lower_bound(self, bound):
"""
Expand All @@ -62,24 +58,22 @@ def count_upper_bound(self, bound):
:return:
"""
col_name = self.col_name
return self.df.rows.select(self.df[col_name] > bound).count()
return self.df.rows.select(self.df[col_name] >= bound).count()

def count(self):
"""
Count the outliers rows using the selected column
:return:
"""
col_name = self.col_name
return self.df.rows.select((F.col(col_name) > self.upper_bound) | (F.col(col_name) < self.lower_bound)).count()
return self.select().count()

def non_outliers_count(self):
"""
Count non outliers rows using the selected column
:return:
"""
col_name = self.col_name
return self.df.rows.select(
(F.col(col_name) <= self.upper_bound) | (F.col(col_name) >= self.lower_bound)).count()
df = self.df
return df.rows.select(F.col(self.tmp_col) < self.threshold).cols.drop(self.tmp_col).count()

@abstractmethod
def info(self, output: str = "dict"):
Expand Down
4 changes: 2 additions & 2 deletions optimus/outliers/modified_z_score.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ def __init__(self, df, col_name, threshold, relative_error=RELATIVE_ERROR):
self.threshold = threshold
self.relative_error = relative_error
self.col_name = one_list_to_val(parse_columns(df, col_name))

super().__init__(df, col_name)
self.df_score = self._m_z_score()
super().__init__(self.df_score, col_name, "modified_z_score")

def _m_z_score(self):
df = self.df
Expand Down
44 changes: 13 additions & 31 deletions optimus/outliers/z_score.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,34 @@ class ZScore(AbstractOutlierThreshold):
def __init__(self, df, col_name, threshold):
"""
:param df: Spark Dataframe
:para df:
:param col_name:
:param threshold:
"""

if not is_dataframe(df):
raise TypeError("Spark Dataframe expected")

self.df = df

if not is_numeric(threshold):
raise TypeError("Numeric expected")
self.threshold = threshold

self.df = df
self.threshold = threshold
self.col_name = one_list_to_val(parse_columns(df, col_name))
self.tmp_col = name_col(col_name, "z_score")
self.df_score = self.z_score()
super().__init__(self.df_score, col_name, "z_score")

super().__init__(df, col_name)

def drop(self):
def z_score(self):
df = self.df
col_name = self.col_name
z_col_name = name_col(col_name, "z_score")
threshold = self.threshold

return self.df.cols.z_score(col_name, z_col_name) \
.rows.drop(F.col(z_col_name) > threshold) \
.cols.drop(z_col_name)

def select(self):
col_name = self.col_name
z_col_name = name_col(col_name, "z_score")

return self.df.cols.z_score(col_name, z_col_name) \
.rows.select(F.col(z_col_name) > self.threshold) \
.cols.drop(z_col_name)

def non_outliers_count(self):
return self.drop().count()

def count(self):
return self.select().count()
return df.cols.z_score(col_name, output_cols=self.tmp_col)

def info(self):
col_name = self.col_name
z_col_name = name_col(col_name, "z_score")
self.tmp_col = name_col(self.col_name, "z_score")

max_z_score = self.df.cols.z_score(col_name, z_col_name) \
.cols.max(z_col_name)
df = self.z_score()
max_z_score = df.rows.select(F.col(self.tmp_col) > self.threshold).cols.max(self.tmp_col)

return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(),
"max_z_score": max_z_score}
6 changes: 3 additions & 3 deletions tests/test_df_outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_outliers_modified_z_score_count():
actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).count()
actual_df =json_enconding(actual_df)
expected_value =json_enconding(3)
assert (expected_value == actual_df)
assert(expected_value == actual_df)
@staticmethod
def test_outliers_modified_z_score_drop():
actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).drop()
Expand All @@ -56,13 +56,13 @@ def test_outliers_modified_z_score_info():
actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).info()
actual_df =json_enconding(actual_df)
expected_value =json_enconding({'count_outliers': 3, 'count_non_outliers': 2, 'max_m_z_score': 21.20928})
assert (expected_value == actual_df)
assert(expected_value == actual_df)
@staticmethod
def test_outliers_modified_z_score_non_outliers_count():
actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).non_outliers_count()
actual_df =json_enconding(actual_df)
expected_value =json_enconding(2)
assert (expected_value == actual_df)
assert(expected_value == actual_df)
@staticmethod
def test_outliers_modified_z_score_select():
actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).select()
Expand Down

0 comments on commit a57300d

Please sign in to comment.