From a57300d2227b471a5e043b612c87f29194eeb7c0 Mon Sep 17 00:00:00 2001
From: Argenis Leon
Date: Wed, 27 Nov 2019 14:32:19 -0600
Subject: [PATCH] Fix threshold outliers functions

---
 .../outliers/abstract_outliers_threshold.py | 28 +++++-------
 optimus/outliers/modified_z_score.py        |  4 +-
 optimus/outliers/z_score.py                 | 44 ++++++-------
 tests/test_df_outliers.py                   |  6 +--
 4 files changed, 29 insertions(+), 53 deletions(-)

diff --git a/optimus/outliers/abstract_outliers_threshold.py b/optimus/outliers/abstract_outliers_threshold.py
index 76e84a3f..7c857792 100644
--- a/optimus/outliers/abstract_outliers_threshold.py
+++ b/optimus/outliers/abstract_outliers_threshold.py
@@ -3,9 +3,8 @@
 from pyspark.sql import functions as F
 
 from optimus.helpers.check import is_dataframe
-from optimus.helpers.columns import parse_columns
+from optimus.helpers.columns import parse_columns, name_col
 from optimus.helpers.converter import one_list_to_val
-from optimus.helpers.filters import dict_filter
 
 
 class AbstractOutlierThreshold(ABC):
@@ -14,7 +13,7 @@ class AbstractOutlierThreshold(ABC):
     Also you need to add the function to outliers.py
     """
 
-    def __init__(self, df, col_name):
+    def __init__(self, df, col_name, prefix):
        """

        :param df: Spark Dataframe
@@ -25,6 +24,7 @@ def __init__(self, df, col_name):
 
         self.df = df
         self.col_name = one_list_to_val(parse_columns(df, col_name))
+        self.tmp_col = name_col(self.col_name, prefix)
 
     def select(self):
         """
@@ -32,10 +32,8 @@ def select(self):
         :return:
         """
 
-        col_name = self.col_name
-        upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])
-
-        return self.df.rows.select((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))
+        df = self.df
+        return df.rows.select(F.col(self.tmp_col) > self.threshold).cols.drop(self.tmp_col)
 
     def drop(self):
         """
@@ -43,10 +41,8 @@ def drop(self):
         :return:
         """
 
-        col_name = self.col_name
-        upper_bound, lower_bound = dict_filter(self.whiskers(), ["upper_bound", "lower_bound"])
-        print(upper_bound, lower_bound)
-        return self.df.rows.drop((F.col(col_name) > upper_bound) | (F.col(col_name) < lower_bound))
+        df = self.df
+        return df.rows.drop(F.col(self.tmp_col) >= self.threshold).cols.drop(self.tmp_col)
 
     def count_lower_bound(self, bound):
         """
@@ -62,24 +58,22 @@ def count_upper_bound(self, bound):
         :return:
         """
         col_name = self.col_name
-        return self.df.rows.select(self.df[col_name] > bound).count()
+        return self.df.rows.select(self.df[col_name] >= bound).count()
 
     def count(self):
         """
         Count the outliers rows using the selected column
         :return:
         """
-        col_name = self.col_name
-        return self.df.rows.select((F.col(col_name) > self.upper_bound) | (F.col(col_name) < self.lower_bound)).count()
+        return self.select().count()
 
     def non_outliers_count(self):
         """
         Count non outliers rows using the selected column
         :return:
         """
-        col_name = self.col_name
-        return self.df.rows.select(
-            (F.col(col_name) <= self.upper_bound) | (F.col(col_name) >= self.lower_bound)).count()
+        df = self.df
+        return df.rows.select(F.col(self.tmp_col) < self.threshold).cols.drop(self.tmp_col).count()
 
     @abstractmethod
     def info(self, output: str = "dict"):
diff --git a/optimus/outliers/modified_z_score.py b/optimus/outliers/modified_z_score.py
index e35ca1e7..744bba9d 100644
--- a/optimus/outliers/modified_z_score.py
+++ b/optimus/outliers/modified_z_score.py
@@ -34,8 +34,8 @@ def __init__(self, df, col_name, threshold, relative_error=RELATIVE_ERROR):
         self.threshold = threshold
         self.relative_error = relative_error
         self.col_name = one_list_to_val(parse_columns(df, col_name))
-
-        super().__init__(df, col_name)
+        self.df_score = self._m_z_score()
+        super().__init__(self.df_score, col_name, "modified_z_score")
 
     def _m_z_score(self):
         df = self.df
diff --git a/optimus/outliers/z_score.py b/optimus/outliers/z_score.py
index 0f3e2725..cd9c78fc 100644
--- a/optimus/outliers/z_score.py
+++ b/optimus/outliers/z_score.py
@@ -14,52 +14,34 @@ class ZScore(AbstractOutlierThreshold):
     def __init__(self, df, col_name, threshold):
         """
 
-        :param df: Spark Dataframe
+        :param df:
         :param col_name:
+        :param threshold:
         """
-
         if not is_dataframe(df):
             raise TypeError("Spark Dataframe expected")
 
-        self.df = df
-
         if not is_numeric(threshold):
             raise TypeError("Numeric expected")
 
-        self.threshold = threshold
+        self.df = df
+        self.threshold = threshold
         self.col_name = one_list_to_val(parse_columns(df, col_name))
+        self.tmp_col = name_col(col_name, "z_score")
+        self.df_score = self.z_score()
+        super().__init__(self.df_score, col_name, "z_score")
 
-        super().__init__(df, col_name)
-
-    def drop(self):
+    def z_score(self):
+        df = self.df
         col_name = self.col_name
-        z_col_name = name_col(col_name, "z_score")
-        threshold = self.threshold
 
-        return self.df.cols.z_score(col_name, z_col_name) \
-            .rows.drop(F.col(z_col_name) > threshold) \
-            .cols.drop(z_col_name)
-
-    def select(self):
-        col_name = self.col_name
-        z_col_name = name_col(col_name, "z_score")
-
-        return self.df.cols.z_score(col_name, z_col_name) \
-            .rows.select(F.col(z_col_name) > self.threshold) \
-            .cols.drop(z_col_name)
-
-    def non_outliers_count(self):
-        return self.drop().count()
-
-    def count(self):
-        return self.select().count()
+        return df.cols.z_score(col_name, output_cols=self.tmp_col)
 
     def info(self):
-        col_name = self.col_name
-        z_col_name = name_col(col_name, "z_score")
+        self.tmp_col = name_col(self.col_name, "z_score")
 
-        max_z_score = self.df.cols.z_score(col_name, z_col_name) \
-            .cols.max(z_col_name)
+        df = self.z_score()
+        max_z_score = df.rows.select(F.col(self.tmp_col) > self.threshold).cols.max(self.tmp_col)
 
         return {"count_outliers": self.count(), "count_non_outliers": self.non_outliers_count(),
                 "max_z_score": max_z_score}
diff --git a/tests/test_df_outliers.py b/tests/test_df_outliers.py
index bf9e027a..17eaecd7 100644
--- a/tests/test_df_outliers.py
+++ b/tests/test_df_outliers.py
@@ -45,7 +45,7 @@ def test_outliers_modified_z_score_count():
         actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).count()
         actual_df =json_enconding(actual_df)
         expected_value =json_enconding(3)
-        assert (expected_value == actual_df)
+        assert(expected_value == actual_df)
     @staticmethod
     def test_outliers_modified_z_score_drop():
         actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).drop()
@@ -56,13 +56,13 @@ def test_outliers_modified_z_score_info():
         actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).info()
         actual_df =json_enconding(actual_df)
         expected_value =json_enconding({'count_outliers': 3, 'count_non_outliers': 2, 'max_m_z_score': 21.20928})
-        assert (expected_value == actual_df)
+        assert(expected_value == actual_df)
     @staticmethod
     def test_outliers_modified_z_score_non_outliers_count():
         actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).non_outliers_count()
         actual_df =json_enconding(actual_df)
         expected_value =json_enconding(2)
-        assert (expected_value == actual_df)
+        assert(expected_value == actual_df)
     @staticmethod
     def test_outliers_modified_z_score_select():
         actual_df =source_df.outliers.modified_z_score('height(ft)',0.5,10000).select()