From 2622fad0323b83168779fee4903f327c9b4788ab Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sun, 24 Nov 2019 22:37:03 -0600 Subject: [PATCH] Create two abstract class for outliers --- ...utliers.py => abstract_outliers_bounds.py} | 7 +- .../outliers/abstract_outliers_threshold.py | 90 +++++++++++++++++++ optimus/outliers/mad.py | 16 ++-- optimus/outliers/modified_z_score.py | 8 +- optimus/outliers/tukey.py | 19 ++-- optimus/outliers/z_score.py | 31 ++++++- 6 files changed, 149 insertions(+), 22 deletions(-) rename optimus/outliers/{abstract_outliers.py => abstract_outliers_bounds.py} (92%) create mode 100644 optimus/outliers/abstract_outliers_threshold.py diff --git a/optimus/outliers/abstract_outliers.py b/optimus/outliers/abstract_outliers_bounds.py similarity index 92% rename from optimus/outliers/abstract_outliers.py rename to optimus/outliers/abstract_outliers_bounds.py index 2bd69eda..f6c4bcd4 100644 --- a/optimus/outliers/abstract_outliers.py +++ b/optimus/outliers/abstract_outliers_bounds.py @@ -8,7 +8,7 @@ from optimus.helpers.filters import dict_filter -class AbstractOutlier(ABC): +class AbstractOutlierBounds(ABC): """ This is a template class to expand the outliers methods Also you need to add the function to outliers.py @@ -86,10 +86,11 @@ def non_outliers_count(self): :return: """ col_name = self.col_name - return self.df.rows.select((F.col(col_name) <= self.upper_bound) | (F.col(col_name) >= self.lower_bound)).count() + return self.df.rows.select( + (F.col(col_name) <= self.upper_bound) | (F.col(col_name) >= self.lower_bound)).count() @abstractmethod - def info(self): + def info(self, output: str = "dict"): """ Get whiskers, iqrs and outliers and non outliers count :return: diff --git a/optimus/outliers/abstract_outliers_threshold.py b/optimus/outliers/abstract_outliers_threshold.py new file mode 100644 index 00000000..76e84a3f --- /dev/null +++ b/optimus/outliers/abstract_outliers_threshold.py @@ -0,0 +1,90 @@ +from abc import ABC, abstractmethod + +from pyspark.sql import functions as F + +from optimus.helpers.check import is_dataframe +from optimus.helpers.columns import parse_columns +from optimus.helpers.converter import one_list_to_val +from optimus.helpers.filters import dict_filter + + +class AbstractOutlierThreshold(ABC): + """ + This is a template class to expand the outliers methods + Also you need to add the function to outliers.py + """ + + def __init__(self, df, col_name): + """ + + :param df: Spark Dataframe + :param col_name: column name + """ + if not is_dataframe(df): + raise TypeError("Spark Dataframe expected") + + self.df = df + self.col_name = one_list_to_val(parse_columns(df, col_name)) + + def select(self): + """ + Select outliers rows using the selected column + :return: + """ + + col_name = self.col_name + upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"]) + + return self.df.rows.select((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound)) + + def drop(self): + """ + Drop outliers rows using the selected column + :return: + """ + + col_name = self.col_name + upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"]) + print(upper_bound, lower_bound) + return self.df.rows.drop((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound)) + + def count_lower_bound(self, bound): + """ + Count outlier in the lower bound + :return: + """ + col_name = self.col_name + return self.df.rows.select(self.df[col_name] < bound).count() + + def count_upper_bound(self, bound): + """ + Count outliers in the upper bound + :return: + """ + col_name = self.col_name + return self.df.rows.select(self.df[col_name] > bound).count() + + def count(self): + """ + Count the outliers rows using the selected column + :return: + """ + col_name = self.col_name + return self.df.rows.select((F.col(col_name) > self.upper_bound) | (F.col(col_name) < self.lower_bound)).count() + + def non_outliers_count(self): + """ + Count non outliers rows using the selected column + :return: + """ + col_name = self.col_name + return self.df.rows.select( + (F.col(col_name) <= self.upper_bound) | (F.col(col_name) >= self.lower_bound)).count() + + @abstractmethod + def info(self, output: str = "dict"): + """ + Get whiskers, iqrs and outliers and non outliers count + :return: + """ + pass diff --git a/optimus/outliers/mad.py b/optimus/outliers/mad.py index 98001627..62ec08e9 100644 --- a/optimus/outliers/mad.py +++ b/optimus/outliers/mad.py @@ -1,9 +1,10 @@ from optimus.helpers.constants import RELATIVE_ERROR from optimus.helpers.filters import dict_filter -from optimus.outliers.abstract_outliers import AbstractOutlier +from optimus.helpers.json import dump_json +from optimus.outliers.abstract_outliers_bounds import AbstractOutlierBounds -class MAD(AbstractOutlier): +class MAD(AbstractOutlierBounds): """ Handle outliers using mad """ @@ -35,7 +36,7 @@ def whiskers(self): return {"lower_bound": lower_bound, "upper_bound": upper_bound} - def info(self): + def info(self, output: str = "dict"): """ Get whiskers, iqrs and outliers and non outliers count :return: @@ -43,6 +44,9 @@ def info(self): upper_bound, lower_bound, = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"]) - return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(), - "lower_bound": lower_bound, "lower_bound_count": self.count_lower_bound(lower_bound), - "upper_bound": upper_bound, "upper_bound_count": self.count_upper_bound(upper_bound)} + result = {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(), + "lower_bound": lower_bound, "lower_bound_count": self.count_lower_bound(lower_bound), + "upper_bound": upper_bound, "upper_bound_count": self.count_upper_bound(upper_bound)} + if output == "json": + result = dump_json(result) + return result diff --git a/optimus/outliers/modified_z_score.py b/optimus/outliers/modified_z_score.py index d768ee93..e35ca1e7 100644 --- a/optimus/outliers/modified_z_score.py +++ b/optimus/outliers/modified_z_score.py @@ -4,10 +4,10 @@ from optimus.helpers.columns import parse_columns, name_col from optimus.helpers.constants import RELATIVE_ERROR from optimus.helpers.converter import one_list_to_val -from optimus.outliers.abstract_outliers import AbstractOutlier +from optimus.outliers.abstract_outliers_threshold import AbstractOutlierThreshold -class ModifiedZScore(AbstractOutlier): +class ModifiedZScore(AbstractOutlierThreshold): """ Handle outliers from a DataFrame using modified z score Reference: http://colingorrie.github.io/outlier-detection.html#modified-z-score-method @@ -21,7 +21,6 @@ def __init__(self, df, col_name, threshold, relative_error=RELATIVE_ERROR): :param col_name: :param threshold: """ - super().__init__(df, col_name) if not is_dataframe(df): raise TypeError("Spark Dataframe expected") @@ -34,9 +33,10 @@ def __init__(self, df, col_name, threshold, relative_error=RELATIVE_ERROR): self.df = df self.threshold = threshold self.relative_error = relative_error - self.col_name = one_list_to_val(parse_columns(df, col_name)) + super().__init__(df, col_name) + def _m_z_score(self): df = self.df col_name = self.col_name diff --git a/optimus/outliers/tukey.py b/optimus/outliers/tukey.py index 4344c881..fe47bf0b 100644 --- a/optimus/outliers/tukey.py +++ b/optimus/outliers/tukey.py @@ -1,8 +1,9 @@ from optimus.helpers.filters import dict_filter -from optimus.outliers.abstract_outliers import AbstractOutlier +from optimus.helpers.json import dump_json +from optimus.outliers.abstract_outliers_bounds import AbstractOutlierBounds -class Tukey(AbstractOutlier): +class Tukey(AbstractOutlierBounds): """ Handle outliers using inter quartile range """ @@ -31,7 +32,7 @@ def whiskers(self): return {"lower_bound": lower_bound, "upper_bound": upper_bound, "iqr1": iqr["q1"], "iqr3": iqr["q3"]} - def info(self): + def info(self, output: str = "dict"): """ Get whiskers, iqrs and outliers and non outliers count :return: @@ -41,7 +42,11 @@ def info(self): iqr1 = self.iqr1 iqr3 = self.iqr3 - return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(), - "lower_bound": lower_bound, "lower_bound_count": self.count_lower_bound(lower_bound), - "upper_bound": upper_bound, "upper_bound_count": self.count_upper_bound(upper_bound), - "iqr1": iqr1, "iqr3": iqr3} + result = {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(), + "lower_bound": lower_bound, "lower_bound_count": self.count_lower_bound(lower_bound), + "upper_bound": upper_bound, "upper_bound_count": self.count_upper_bound(upper_bound), + "iqr1": iqr1, "iqr3": iqr3} + + if output == "json": + result = dump_json(result) + return result diff --git a/optimus/outliers/z_score.py b/optimus/outliers/z_score.py index 12136b26..0f3e2725 100644 --- a/optimus/outliers/z_score.py +++ b/optimus/outliers/z_score.py @@ -1,10 +1,12 @@ +from pyspark.sql import functions as F + from optimus.helpers.check import is_dataframe, is_numeric from optimus.helpers.columns import parse_columns, name_col from optimus.helpers.converter import one_list_to_val -from optimus.outliers.abstract_outliers import AbstractOutlier +from optimus.outliers.abstract_outliers_threshold import AbstractOutlierThreshold -class ZScore(AbstractOutlier): +class ZScore(AbstractOutlierThreshold): """ Handle outliers using z Score """ @@ -27,6 +29,31 @@ def __init__(self, df, col_name, threshold): self.col_name = one_list_to_val(parse_columns(df, col_name)) + super().__init__(df, col_name) + + def drop(self): + col_name = self.col_name + z_col_name = name_col(col_name, "z_score") + threshold = self.threshold + + return self.df.cols.z_score(col_name, z_col_name) \ + .rows.drop(F.col(z_col_name) > threshold) \ + .cols.drop(z_col_name) + + def select(self): + col_name = self.col_name + z_col_name = name_col(col_name, "z_score") + + return self.df.cols.z_score(col_name, z_col_name) \ + .rows.select(F.col(z_col_name) > self.threshold) \ + .cols.drop(z_col_name) + + def non_outliers_count(self): + return self.drop().count() + + def count(self): + return self.select().count() + def info(self): col_name = self.col_name z_col_name = name_col(col_name, "z_score")