diff --git a/optimus/outliers/abstract_outliers.py b/optimus/outliers/abstract_outliers.py
new file mode 100644
index 00000000..2bd69eda
--- /dev/null
+++ b/optimus/outliers/abstract_outliers.py
@@ -0,0 +1,103 @@
+from abc import ABC, abstractmethod
+
+from pyspark.sql import functions as F
+
+from optimus.helpers.check import is_dataframe
+from optimus.helpers.columns import parse_columns
+from optimus.helpers.converter import one_list_to_val
+from optimus.helpers.filters import dict_filter
+
+
+class AbstractOutlier(ABC):
+    """
+    This is a template class to expand the outliers methods
+    Also you need to add the function to outliers.py
+    Subclasses implement whiskers() and info(), and must set self.upper_bound and
+    self.lower_bound, which count() and non_outliers_count() read.
+    """
+
+    def __init__(self, df, col_name):
+        """
+        Validate df and store it together with the parsed column name
+        :param df: Spark Dataframe
+        :param col_name: column name
+        :raises TypeError: if df is not a Spark Dataframe
+        """
+        if not is_dataframe(df):
+            raise TypeError("Spark Dataframe expected")
+
+        self.df = df
+        self.col_name = one_list_to_val(parse_columns(df, col_name))
+
+    @abstractmethod
+    def whiskers(self):
+        """
+        Get the whiskers and IQR
+        :return: dict with at least the "upper_bound" and "lower_bound" keys
+        """
+        pass
+
+    def select(self):
+        """
+        Select outliers rows using the selected column
+        :return: result of df.rows.select for values above the upper bound or below the lower bound
+        """
+
+        col_name = self.col_name
+        upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])
+
+        return self.df.rows.select((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))
+
+    def drop(self):
+        """
+        Drop outliers rows using the selected column
+        :return: result of df.rows.drop for values above the upper bound or below the lower bound
+        """
+
+        col_name = self.col_name
+        upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])
+
+        return self.df.rows.drop((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))
+
+    def count_lower_bound(self, bound):
+        """
+        Count the rows whose value is below the given bound
+        :param bound: lower bound to compare against
+        :return: number of rows
+        """
+        col_name = self.col_name
+        return self.df.rows.select(self.df[col_name] < bound).count()
+
+    def count_upper_bound(self, bound):
+        """
+        Count the rows whose value is above the given bound
+        :param bound: upper bound to compare against
+        :return: number of rows
+        """
+        col_name = self.col_name
+        return self.df.rows.select(self.df[col_name] > bound).count()
+
+    def count(self):
+        """
+        Count the outliers rows using the selected column
+        :return: number of rows above self.upper_bound or below self.lower_bound
+        """
+        col_name = self.col_name
+        return self.df.rows.select((F.col(col_name) > self.upper_bound) | (F.col(col_name) < self.lower_bound)).count()
+
+    def non_outliers_count(self):
+        """
+        Count non outliers rows using the selected column
+        :return: number of rows between self.lower_bound and self.upper_bound, both included
+        """
+        col_name = self.col_name
+        # Both conditions must hold; joined with "|" every non-null row would match
+        return self.df.rows.select((F.col(col_name) <= self.upper_bound) & (F.col(col_name) >= self.lower_bound)).count()
+
+    @abstractmethod
+    def info(self):
+        """
+        Get whiskers, iqrs and outliers and non outliers count
+        :return:
+        """
+        pass
diff --git a/optimus/outliers/mad.py b/optimus/outliers/mad.py index 0a842a12..98001627 100644 --- a/optimus/outliers/mad.py +++ b/optimus/outliers/mad.py @@ -1,23 +1,28 @@ -from pyspark.sql import functions as F - -from optimus.helpers.filters import dict_filter from optimus.helpers.constants import RELATIVE_ERROR +from optimus.helpers.filters import dict_filter +from optimus.outliers.abstract_outliers import AbstractOutlier -class MAD: + +class MAD(AbstractOutlier): """ Handle outliers using mad """ - def __init__(self, df, col_name, threshold, relative_error=RELATIVE_ERROR): + def __init__(self, df, col_name, threshold: int, relative_error: int = RELATIVE_ERROR): """ :param df: :param col_name: + :type threshold: object + :type relative_error: object """ self.df = df self.col_name = col_name self.threshold = threshold self.relative_error = relative_error + self.upper_bound, self.lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"]) + + super().__init__(df, col_name) def whiskers(self): """ @@ -30,36 +35,6 @@ def whiskers(self): return {"lower_bound": lower_bound, "upper_bound": upper_bound} - def drop(self): - col_name = self.col_name - upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"]) - return self.df.rows.drop((F.col(col_name) > upper_bound) |
(F.col(col_name) < lower_bound)) - - def select(self): - """ - Select outliers rows using the selected column - :return: - """ - col_name = self.col_name - upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"]) - return self.df.rows.select((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound)) - - def count(self): - """ - Count the outliers rows using the selected column - :return: - """ - - return self.select().count() - - def non_outliers_count(self): - """ - Count non outliers rows using the selected column - :return: - """ - - return self.drop().count() - def info(self): """ Get whiskers, iqrs and outliers and non outliers count @@ -69,5 +44,5 @@ def info(self): ["upper_bound", "lower_bound"]) return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(), - "lower_bound": lower_bound, - "upper_bound": upper_bound, } + "lower_bound": lower_bound, "lower_bound_count": self.count_lower_bound(lower_bound), + "upper_bound": upper_bound, "upper_bound_count": self.count_upper_bound(upper_bound)} diff --git a/optimus/outliers/modified_z_score.py b/optimus/outliers/modified_z_score.py index 16c2a472..d768ee93 100644 --- a/optimus/outliers/modified_z_score.py +++ b/optimus/outliers/modified_z_score.py @@ -4,9 +4,10 @@ from optimus.helpers.columns import parse_columns, name_col from optimus.helpers.constants import RELATIVE_ERROR from optimus.helpers.converter import one_list_to_val +from optimus.outliers.abstract_outliers import AbstractOutlier -class ModifiedZScore: +class ModifiedZScore(AbstractOutlier): """ Handle outliers from a DataFrame using modified z score Reference: http://colingorrie.github.io/outlier-detection.html#modified-z-score-method @@ -20,6 +21,7 @@ def __init__(self, df, col_name, threshold, relative_error=RELATIVE_ERROR): :param col_name: :param threshold: """ + super().__init__(df, col_name) if not is_dataframe(df): raise TypeError("Spark Dataframe expected") @@ -44,24 +46,6 @@ 
def _m_z_score(self): return df.withColumn(m_z_col_name, F.abs(0.6745 * (F.col(col_name) - mad["median"]) / mad["mad"])) - def select(self): - - m_z_col_name = name_col(self.col_name, "modified_z_score") - df = self._m_z_score() - return df.rows.select(F.col(m_z_col_name) > self.threshold).cols.drop(m_z_col_name) - - def drop(self): - - m_z_col_name = name_col(self.col_name, "modified_z_score") - df = self._m_z_score() - return df.rows.drop(F.col(m_z_col_name) > self.threshold).cols.drop(m_z_col_name) - - def non_outliers_count(self): - return self.drop().count() - - def count(self): - return self.select().count() - def info(self): m_z_col_name = name_col(self.col_name, "modified_z_score") diff --git a/optimus/outliers/template_outlier_class.py b/optimus/outliers/template_outlier_class.py deleted file mode 100644 index f2bc451b..00000000 --- a/optimus/outliers/template_outlier_class.py +++ /dev/null @@ -1,66 +0,0 @@ -# TODO: We should implement this as an interface -class TemplateOutlierClass: - """ - This is a template class to expand the outliers methods - Also you need to add the function to outliers.py - """ - - def __init__(self, df, columns, any_param): - """ - - :param df: - :param columns: - :param any_param: - """ - self.df = df - self.columns = columns - self.any_param = any_param - - def select(self): - """ - This must return the rows - :return: - """ - - df = self.df - - # Calculate the outliers to filter the rows - - # Return here the df the outliers rows - return df - - def drop(self): - """ - This must drop the rows with outliers - :return: - """ - df = self.df - - # Calculate the outliers to filter the rows - - # Return here the df without the outliers rows - return df - - def count(self): - """ - Count the outliers rows using the selected column - :return: - """ - - return self.df.select().count() - - def non_outliers_count(self): - """ - Count non outliers rows using the selected column - :return: - """ - - return self.drop().count() - - def 
info(self): - """ - Get whiskers, iqrs and outliers and non outliers count - :return: - """ - - return {} diff --git a/optimus/outliers/tukey.py b/optimus/outliers/tukey.py index 878aeffa..4344c881 100644 --- a/optimus/outliers/tukey.py +++ b/optimus/outliers/tukey.py @@ -1,12 +1,8 @@ -from pyspark.sql import functions as F - -from optimus.helpers.check import is_dataframe -from optimus.helpers.converter import one_list_to_val -from optimus.helpers.columns import parse_columns from optimus.helpers.filters import dict_filter +from optimus.outliers.abstract_outliers import AbstractOutlier -class Tukey: +class Tukey(AbstractOutlier): """ Handle outliers using inter quartile range """ @@ -17,15 +13,16 @@ def __init__(self, df, col_name): :param df: Spark Dataframe :param col_name: column name """ - if not is_dataframe(df): - raise TypeError("Spark Dataframe expected") - self.df = df - self.col_name = one_list_to_val(parse_columns(df, col_name)) + self.col_name = col_name + self.upper_bound, self.lower_bound, self.iqr1, self.iqr3 = dict_filter(self.whiskers(), + ["upper_bound", "lower_bound", "iqr1", + "iqr3"]) + super().__init__(df, col_name) def whiskers(self): """ - Get the whiskers and IQR + Get the whiskers and IQR :return: """ iqr = self.df.cols.iqr(self.col_name, more=True) @@ -34,52 +31,17 @@ def whiskers(self): return {"lower_bound": lower_bound, "upper_bound": upper_bound, "iqr1": iqr["q1"], "iqr3": iqr["q3"]} - def select(self): - """ - Select outliers rows using the selected column - :return: - """ - - col_name = self.col_name - upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"]) - - return self.df.rows.select((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound)) - - def drop(self): - """ - Drop outliers rows using the selected column - :return: - """ - - col_name = self.col_name - upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"]) - - return 
self.df.rows.drop((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound)) - - def count(self): - """ - Count the outliers rows using the selected column - :return: - """ - - return self.df.select().count() - - def non_outliers_count(self): - """ - Count non outliers rows using the selected column - :return: - """ - - return self.drop().count() - def info(self): """ Get whiskers, iqrs and outliers and non outliers count :return: """ - upper_bound, lower_bound, iqr1, iqr3 = dict_filter(self.whiskers(), - ["upper_bound", "lower_bound", "iqr1", "iqr3"]) + lower_bound = self.lower_bound + upper_bound = self.upper_bound + iqr1 = self.iqr1 + iqr3 = self.iqr3 return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(), - "lower_bound": lower_bound, - "upper_bound": upper_bound, "iqr1": iqr1, "iqr3": iqr3} + "lower_bound": lower_bound, "lower_bound_count": self.count_lower_bound(lower_bound), + "upper_bound": upper_bound, "upper_bound_count": self.count_upper_bound(upper_bound), + "iqr1": iqr1, "iqr3": iqr3} diff --git a/optimus/outliers/z_score.py b/optimus/outliers/z_score.py index 1c190821..12136b26 100644 --- a/optimus/outliers/z_score.py +++ b/optimus/outliers/z_score.py @@ -1,11 +1,10 @@ -from pyspark.sql import functions as F - from optimus.helpers.check import is_dataframe, is_numeric -from optimus.helpers.converter import one_list_to_val from optimus.helpers.columns import parse_columns, name_col +from optimus.helpers.converter import one_list_to_val +from optimus.outliers.abstract_outliers import AbstractOutlier -class ZScore: +class ZScore(AbstractOutlier): """ Handle outliers using z Score """ @@ -28,29 +27,6 @@ def __init__(self, df, col_name, threshold): self.col_name = one_list_to_val(parse_columns(df, col_name)) - def drop(self): - col_name = self.col_name - z_col_name = name_col(col_name, "z_score") - threshold = self.threshold - - return self.df.cols.z_score(col_name, z_col_name) \ - .rows.drop(F.col(z_col_name) > 
threshold) \ - .cols.drop(z_col_name) - - def select(self): - col_name = self.col_name - z_col_name = name_col(col_name, "z_score") - - return self.df.cols.z_score(col_name, z_col_name) \ - .rows.select(F.col(z_col_name) > self.threshold) \ - .cols.drop(z_col_name) - - def non_outliers_count(self): - return self.drop().count() - - def count(self): - return self.select().count() - def info(self): col_name = self.col_name z_col_name = name_col(col_name, "z_score") @@ -58,4 +34,5 @@ def info(self): max_z_score = self.df.cols.z_score(col_name, z_col_name) \ .cols.max(z_col_name) - return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(), "max_z_score": max_z_score} + return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(), + "max_z_score": max_z_score}