Skip to content

Commit

Permalink
Updated outliers functions
Browse files Browse the repository at this point in the history
* Create Abstract class for outliers
* Added outliers count for lower and upper bound
  • Loading branch information
argenisleon committed Nov 24, 2019
1 parent 33eea4f commit 36ec821
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 203 deletions.
97 changes: 97 additions & 0 deletions optimus/outliers/abstract_outliers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from abc import ABC, abstractmethod

from pyspark.sql import functions as F

from optimus.helpers.check import is_dataframe
from optimus.helpers.columns import parse_columns
from optimus.helpers.converter import one_list_to_val
from optimus.helpers.filters import dict_filter


class AbstractOutlier(ABC):
"""
This is a template class to expand the outliers methods
Also you need to add the function to outliers.py
"""

def __init__(self, df, col_name):
"""
:param df: Spark Dataframe
:param col_name: column name
"""
if not is_dataframe(df):
raise TypeError("Spark Dataframe expected")

self.df = df
self.col_name = one_list_to_val(parse_columns(df, col_name))

@abstractmethod
def whiskers(self):
"""
Get the whiskers and IQR
:return:
"""
pass

def select(self):
"""
Select outliers rows using the selected column
:return:
"""

col_name = self.col_name
upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])

return self.df.rows.select((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))

def drop(self):
"""
Drop outliers rows using the selected column
:return:
"""

col_name = self.col_name
upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])
print(upper_bound, lower_bound)
return self.df.rows.drop((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))

def count_lower_bound(self, bound):
"""
Count outlier in the lower bound
:return:
"""
col_name = self.col_name
return self.df.rows.select(self.df[col_name] < bound).count()

def count_upper_bound(self, bound):
"""
Count outliers in the upper bound
:return:
"""
col_name = self.col_name
return self.df.rows.select(self.df[col_name] > bound).count()

def count(self):
"""
Count the outliers rows using the selected column
:return:
"""
col_name = self.col_name
return self.df.rows.select((F.col(col_name) > self.upper_bound) | (F.col(col_name) < self.lower_bound)).count()

def non_outliers_count(self):
"""
Count non outliers rows using the selected column
:return:
"""
col_name = self.col_name
return self.df.rows.select((F.col(col_name) <= self.upper_bound) | (F.col(col_name) >= self.lower_bound)).count()

@abstractmethod
def info(self):
"""
Get whiskers, iqrs and outliers and non outliers count
:return:
"""
pass
49 changes: 12 additions & 37 deletions optimus/outliers/mad.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
from pyspark.sql import functions as F

from optimus.helpers.filters import dict_filter
from optimus.helpers.constants import RELATIVE_ERROR
from optimus.helpers.filters import dict_filter
from optimus.outliers.abstract_outliers import AbstractOutlier

class MAD:

class MAD(AbstractOutlier):
"""
Handle outliers using mad
"""

def __init__(self, df, col_name, threshold, relative_error=RELATIVE_ERROR):
def __init__(self, df, col_name, threshold: int, relative_error: int = RELATIVE_ERROR):
"""
:param df:
:param col_name:
:type threshold: object
:type relative_error: object
"""
self.df = df
self.col_name = col_name
self.threshold = threshold
self.relative_error = relative_error
self.upper_bound, self.lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])

super().__init__(df, col_name)

def whiskers(self):
"""
Expand All @@ -30,36 +35,6 @@ def whiskers(self):

return {"lower_bound": lower_bound, "upper_bound": upper_bound}

def drop(self):
col_name = self.col_name
upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])
return self.df.rows.drop((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))

def select(self):
"""
Select outliers rows using the selected column
:return:
"""
col_name = self.col_name
upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])
return self.df.rows.select((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))

def count(self):
"""
Count the outliers rows using the selected column
:return:
"""

return self.select().count()

def non_outliers_count(self):
"""
Count non outliers rows using the selected column
:return:
"""

return self.drop().count()

def info(self):
"""
Get whiskers, iqrs and outliers and non outliers count
Expand All @@ -69,5 +44,5 @@ def info(self):
["upper_bound", "lower_bound"])

return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(),
"lower_bound": lower_bound,
"upper_bound": upper_bound, }
"lower_bound": lower_bound, "lower_bound_count": self.count_lower_bound(lower_bound),
"upper_bound": upper_bound, "upper_bound_count": self.count_upper_bound(upper_bound)}
22 changes: 3 additions & 19 deletions optimus/outliers/modified_z_score.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
from optimus.helpers.columns import parse_columns, name_col
from optimus.helpers.constants import RELATIVE_ERROR
from optimus.helpers.converter import one_list_to_val
from optimus.outliers.abstract_outliers import AbstractOutlier


class ModifiedZScore:
class ModifiedZScore(AbstractOutlier):
"""
Handle outliers from a DataFrame using modified z score
Reference: http://colingorrie.github.io/outlier-detection.html#modified-z-score-method
Expand All @@ -20,6 +21,7 @@ def __init__(self, df, col_name, threshold, relative_error=RELATIVE_ERROR):
:param col_name:
:param threshold:
"""
super().__init__(df, col_name)
if not is_dataframe(df):
raise TypeError("Spark Dataframe expected")

Expand All @@ -44,24 +46,6 @@ def _m_z_score(self):

return df.withColumn(m_z_col_name, F.abs(0.6745 * (F.col(col_name) - mad["median"]) / mad["mad"]))

def select(self):

m_z_col_name = name_col(self.col_name, "modified_z_score")
df = self._m_z_score()
return df.rows.select(F.col(m_z_col_name) > self.threshold).cols.drop(m_z_col_name)

def drop(self):

m_z_col_name = name_col(self.col_name, "modified_z_score")
df = self._m_z_score()
return df.rows.drop(F.col(m_z_col_name) > self.threshold).cols.drop(m_z_col_name)

def non_outliers_count(self):
return self.drop().count()

def count(self):
return self.select().count()

def info(self):
m_z_col_name = name_col(self.col_name, "modified_z_score")

Expand Down
66 changes: 0 additions & 66 deletions optimus/outliers/template_outlier_class.py

This file was deleted.

68 changes: 15 additions & 53 deletions optimus/outliers/tukey.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
from pyspark.sql import functions as F

from optimus.helpers.check import is_dataframe
from optimus.helpers.converter import one_list_to_val
from optimus.helpers.columns import parse_columns
from optimus.helpers.filters import dict_filter
from optimus.outliers.abstract_outliers import AbstractOutlier


class Tukey:
class Tukey(AbstractOutlier):
"""
Handle outliers using inter quartile range
"""
Expand All @@ -17,15 +13,16 @@ def __init__(self, df, col_name):
:param df: Spark Dataframe
:param col_name: column name
"""
if not is_dataframe(df):
raise TypeError("Spark Dataframe expected")

self.df = df
self.col_name = one_list_to_val(parse_columns(df, col_name))
self.col_name = col_name
self.upper_bound, self.lower_bound, self.iqr1, self.iqr3 = dict_filter(self.whiskers(),
["upper_bound", "lower_bound", "iqr1",
"iqr3"])
super().__init__(df, col_name)

def whiskers(self):
"""
Get the whiskers and IQR
Get the whiskers and IQR
:return:
"""
iqr = self.df.cols.iqr(self.col_name, more=True)
Expand All @@ -34,52 +31,17 @@ def whiskers(self):

return {"lower_bound": lower_bound, "upper_bound": upper_bound, "iqr1": iqr["q1"], "iqr3": iqr["q3"]}

def select(self):
"""
Select outliers rows using the selected column
:return:
"""

col_name = self.col_name
upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])

return self.df.rows.select((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))

def drop(self):
"""
Drop outliers rows using the selected column
:return:
"""

col_name = self.col_name
upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])

return self.df.rows.drop((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))

def count(self):
"""
Count the outliers rows using the selected column
:return:
"""

return self.df.select().count()

def non_outliers_count(self):
"""
Count non outliers rows using the selected column
:return:
"""

return self.drop().count()

def info(self):
"""
Get whiskers, iqrs and outliers and non outliers count
:return:
"""
upper_bound, lower_bound, iqr1, iqr3 = dict_filter(self.whiskers(),
["upper_bound", "lower_bound", "iqr1", "iqr3"])
lower_bound = self.lower_bound
upper_bound = self.upper_bound
iqr1 = self.iqr1
iqr3 = self.iqr3

return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(),
"lower_bound": lower_bound,
"upper_bound": upper_bound, "iqr1": iqr1, "iqr3": iqr3}
"lower_bound": lower_bound, "lower_bound_count": self.count_lower_bound(lower_bound),
"upper_bound": upper_bound, "upper_bound_count": self.count_upper_bound(upper_bound),
"iqr1": iqr1, "iqr3": iqr3}
Loading

0 comments on commit 36ec821

Please sign in to comment.