From a10eb01b94cce23f355b11f4e2972637742d2834 Mon Sep 17 00:00:00 2001 From: Claudio Salvatore Arcidiacono <22871978+ClaudioSalvatoreArcidiacono@users.noreply.github.com> Date: Tue, 10 Dec 2024 11:43:34 +0100 Subject: [PATCH] Refactor target encoding and WOE --- sklearo/encoding/base.py | 194 +++++++++++++++++++++- sklearo/encoding/target.py | 261 ++++++++--------------------- sklearo/encoding/woe.py | 298 ++++++---------------------------- tests/encoding/test_target.py | 7 - tests/encoding/test_woe.py | 170 +++++++++---------- 5 files changed, 392 insertions(+), 538 deletions(-) diff --git a/sklearo/encoding/base.py b/sklearo/encoding/base.py index 50d6434..bcc9f97 100644 --- a/sklearo/encoding/base.py +++ b/sklearo/encoding/base.py @@ -1,9 +1,13 @@ import warnings +from abc import abstractmethod +from collections import defaultdict import narwhals as nw -from narwhals.typing import IntoFrameT +from narwhals.typing import IntoFrameT, IntoSeriesT from sklearo.base import BaseTransformer +from sklearo.utils import infer_type_of_target, select_columns +from sklearo.validation import check_if_fitted, check_X_y class BaseOneToOneEncoder(BaseTransformer): @@ -29,3 +33,191 @@ def _handle_missing_values(self, X: IntoFrameT) -> IntoFrameT: .alias(column) for column in self.columns_ ) + + +class BaseTargetEncoder(BaseOneToOneEncoder): + + @abstractmethod + def _calculate_target_statistic( + self, x_y: IntoFrameT, target_col: str, column: str + ) -> dict[str, float | int | None]: + """Calculate the target statistic for a column.""" + raise NotImplementedError + + @nw.narwhalify + @check_X_y + def fit(self, X: IntoFrameT, y: IntoSeriesT) -> "BaseTargetEncoder": + """Fit the encoder. + + Args: + X (DataFrame): The input data. + y (Series): The target variable. + """ + + self.columns_ = list(select_columns(X, self.columns)) + self.encoding_map_ = {} + + X = self._handle_missing_values(X) + + if not hasattr(self, "type_of_target") or self.type_of_target == "auto": + self.type_of_target_ = infer_type_of_target(y) + else: + self.type_of_target_ = self.type_of_target + + if self.type_of_target_ not in self._allowed_types_of_target: + raise ValueError( + f"Invalid type of target '{self.type_of_target_}'. " + f"Allowed types are {self._allowed_types_of_target}." + ) + + if self.type_of_target_ == "binary": + unique_classes = sorted(y.unique().to_list()) + if unique_classes != [0, 1]: + y = y.replace_strict({unique_classes[0]: 0, unique_classes[1]: 1}) + + if "target" in X.columns: + target_col_name = "__target__" + + else: + target_col_name = "target" + + if not self.columns_: + return self + + X_y = X[self.columns_].with_columns(**{target_col_name: y}) + + if self.type_of_target_ == "multiclass": + unique_classes = y.unique().sort().to_list() + self.unique_classes_ = unique_classes + self.encoding_map_ = defaultdict(dict) + if self.unseen == "fill" and self.fill_value_unseen == "mean": + self.mean_target_ = {} + for class_ in unique_classes: + X_y_binarized = X_y.with_columns( + nw.when(nw.col(target_col_name) == class_) + .then(1) + .otherwise(0) + .alias(target_col_name) + ) + for column in self.columns_: + self.encoding_map_[column][class_] = ( + self._calculate_target_statistic( + X_y_binarized[[column, target_col_name]], + target_col=target_col_name, + column=column, + ) + ) + if self.unseen == "fill" and self.fill_value_unseen == "mean": + self.mean_target_[class_] = X_y_binarized[target_col_name].mean() + + else: + for column in self.columns_: + self.encoding_map_[column] = self._calculate_target_statistic( + X_y[[column, target_col_name]], + target_col=target_col_name, + column=column, + ) + + self.feature_names_in_ = list(X.columns) + return self + + @nw.narwhalify + @check_if_fitted + def transform(self, X: IntoFrameT) -> IntoFrameT: + """Transform the data. + + Args: + X (DataFrame): The input data. + """ + X = self._handle_missing_values(X) + unseen_per_col = {} + for column, mapping in self.encoding_map_.items(): + if self.type_of_target_ in ("binary", "continuous"): + seen_categories = mapping.keys() + else: + seen_categories = next(iter(mapping.values())).keys() + + uniques = X[column].unique() + unseen_cats = uniques.filter( + (~uniques.is_in(seen_categories) & ~uniques.is_null()) + ).to_list() + if unseen_cats: + unseen_per_col[column] = unseen_cats + + if unseen_per_col: + if self.unseen == "raise": + raise ValueError( + f"Unseen categories {unseen_per_col} found during transform. " + "Please handle unseen categories for example by using a RareLabelEncoder. " + "Alternatively, set unseen to 'ignore'." + ) + else: + warnings.warn( + f"Unseen categories {unseen_per_col} found during transform. " + "Please handle unseen categories for example by using a RareLabelEncoder. " + f"These categories will be encoded as {self.fill_value_unseen}." + ) + + if self.type_of_target_ in ("binary", "continuous"): + return self._transform_binary_continuous(X, unseen_per_col) + + else: # multiclass + return self._transform_multiclass(X, unseen_per_col) + + @check_if_fitted + def get_feature_names_out(self) -> list[str]: + if self.type_of_target_ in ("binary", "continuous"): + return self.feature_names_in_ + + else: # multiclass + return [ + feat for feat in self.feature_names_in_ if feat not in self.columns_ + ] + [ + f"{column}_{self._encoder_name}_class_{class_}" + for column in self.columns_ + for class_ in self.unique_classes_ + ] + + def _transform_binary_continuous( + self, X: nw.DataFrame, unseen_per_col: dict + ) -> IntoFrameT: + fill_value_unseen = ( + self.fill_value_unseen + if self.fill_value_unseen != "mean" or self.unseen != "fill" + else self.mean_target_ + ) + return X.with_columns( + nw.col(column).replace_strict( + { + **mapping, + **{ + cat: fill_value_unseen for cat in unseen_per_col.get(column, []) + }, + } + ) + for column, mapping in self.encoding_map_.items() + ) + + def _transform_multiclass( + self, X: nw.DataFrame, unseen_per_col: dict + ) -> IntoFrameT: + fill_value_unseen = ( + {class_: self.fill_value_unseen for class_ in self.unique_classes_} + if self.fill_value_unseen != "mean" or self.unseen != "fill" + else self.mean_target_ + ) + return X.with_columns( + nw.col(column) + .replace_strict( + { + **mapping, + **{ + cat: fill_value_unseen[class_] + for cat in unseen_per_col.get(column, []) + }, + } + ) + .alias(f"{column}_{self._encoder_name}_class_{class_}") + for column, class_mapping in self.encoding_map_.items() + for class_, mapping in class_mapping.items() + ).drop(self.columns_) diff --git a/sklearo/encoding/target.py b/sklearo/encoding/target.py index 680f6f0..7d91153 100644 --- a/sklearo/encoding/target.py +++ b/sklearo/encoding/target.py @@ -7,12 +7,79 @@ from narwhals.typing import IntoFrameT, IntoSeriesT from pydantic import validate_call -from sklearo.encoding.base import BaseOneToOneEncoder +from sklearo.encoding.base import BaseTargetEncoder from sklearo.utils import infer_type_of_target, select_columns from sklearo.validation import check_if_fitted, check_X_y -class TargetEncoder(BaseOneToOneEncoder): +class TargetEncoder(BaseTargetEncoder): + """ + Target Encoder for categorical features. + + This class provides functionality to encode categorical features using the Target Encoding + technique. Target Encoding replaces each category with the mean of the target variable for that + category. This method is particularly useful for handling categorical variables in machine + learning models, especially when the number of categories is large. + + Args: + columns (str, list[str], list[nw.typing.DTypes]): List of columns to encode. + - If a list of strings is passed, it is treated as a list of column names to encode. + - If a single string is passed instead, it is treated as a regular expression pattern to + match column names. + - If a list of + [`narwhals.typing.DTypes`](https://narwhals-dev.github.io/narwhals/api-reference/dtypes/) + is passed, it will select all columns matching the specified dtype. + + unseen (str): Strategy to handle categories that appear during the `transform` step but were + never encountered in the `fit` step. + - If `'raise'`, an error is raised when unseen categories are found. + - If `'ignore'`, the unseen categories are encoded with the fill_value_unseen. + + fill_value_unseen (int, float, None | Literal["mean"]): Fill value to use for unseen + categories. Defaults to `"mean"`, which will use the mean of the target variable. + + missing_values (str): Strategy to handle missing values. + - If `'encode'`, missing values are initially replaced with a specified fill value and + the mean is computed as if it were a regular category. + - If `'ignore'`, missing values are left as is. + - If `'raise'`, an error is raised when missing values are found. + + type_of_target (str): Type of the target variable. + - If `'auto'`, the type is inferred from the target variable. + - If `'binary'`, the target variable is binary. + - If `'multiclass'`, the target variable is multiclass. + - If `'continuous'`, the target variable is continuous. + + Attributes: + columns_ (list[str]): List of columns to be encoded, learned during fit. + encoding_map_ (dict[str, float]): Mapping of categories to their mean target values, learned + during fit. + + Examples: + ```python + import pandas as pd + from sklearo.encoding import TargetEncoder + data = { + "category": ["A", "A", "B", "B", "C", "C"], + "target": [1, 0, 1, 0, 1, 0], + } + df = pd.DataFrame(data) + encoder = TargetEncoder() + encoder.fit(df[["category"]], df["target"]) + encoded = encoder.transform(df[["category"]]) + print(encoded) + category + 0 0.5 + 1 0.5 + 2 0.5 + 3 0.5 + 4 0.5 + 5 0.5 + ``` + """ + + _encoder_name = "mean_target" + _allowed_types_of_target = ["binary", "multiclass", "continuous"] @validate_call(config=dict(arbitrary_types_allowed=True)) def __init__( @@ -26,16 +93,16 @@ def __init__( missing_values: Literal["encode", "ignore", "raise"] = "encode", type_of_target: Literal["auto", "binary", "multiclass", "continuous"] = "auto", ) -> None: + self.columns = columns self.missing_values = missing_values self.unseen = unseen self.fill_value_unseen = fill_value_unseen self.type_of_target = type_of_target - def _calculate_mean_target( - self, x_y: IntoFrameT, target_col: Sequence[str], column: str + def _calculate_target_statistic( + self, x_y: IntoFrameT, target_col: str, column: str ) -> dict: - debug_df = x_y.to_native() mean_target_all_categories = ( x_y.group_by(column).agg(nw.col(target_col).mean()).rows(named=True) ) @@ -46,187 +113,3 @@ def _calculate_mean_target( ] return mean_target - - @nw.narwhalify - @check_X_y - def fit(self, X: IntoFrameT, y: IntoSeriesT) -> "TargetEncoder": - """Fit the encoder. - - Args: - X (DataFrame): The input data. - y (Series): The target variable. - """ - - self.columns_ = list(select_columns(X, self.columns)) - self.encoding_map_ = {} - - X = self._handle_missing_values(X) - - if self.type_of_target == "auto": - self.type_of_target_ = infer_type_of_target(y) - else: - self.type_of_target_ = self.type_of_target - - if self.type_of_target_ == "binary": - unique_classes = sorted(y.unique().to_list()) - try: - greatest_class_as_int = int(unique_classes[1]) - except ValueError: - self.is_zero_one_target_ = False - else: - if greatest_class_as_int == 1: - self.is_zero_one_target_ = True - else: - self.is_zero_one_target_ = False - - if not self.is_zero_one_target_: - y = y.replace_strict({unique_classes[0]: 0, unique_classes[1]: 1}) - - else: - self.is_zero_one_target_ = False - - if "target" in X.columns: - target_col_name = "__target__" - - else: - target_col_name = "target" - - if not self.columns_: - return self - - X_y = X[self.columns_].with_columns(**{target_col_name: y}) - - if self.type_of_target_ == "multiclass": - unique_classes = y.unique().sort().to_list() - self.unique_classes_ = unique_classes - self.encoding_map_ = defaultdict(dict) - if self.unseen == "fill" and self.fill_value_unseen == "mean": - self.mean_target_ = {} - for class_ in unique_classes: - X_y_binarized = X_y.with_columns( - nw.when(nw.col(target_col_name) == class_) - .then(1) - .otherwise(0) - .alias(target_col_name) - ) - for column in self.columns_: - debug_df = X_y_binarized[[column, target_col_name]].to_native() - self.encoding_map_[column][class_] = self._calculate_mean_target( - X_y_binarized[[column, target_col_name]], - target_col=target_col_name, - column=column, - ) - if self.unseen == "fill" and self.fill_value_unseen == "mean": - self.mean_target_[class_] = X_y_binarized[target_col_name].mean() - - else: - for column in self.columns_: - self.encoding_map_[column] = self._calculate_mean_target( - X_y[[column, target_col_name]], - target_col=target_col_name, - column=column, - ) - - self.feature_names_in_ = list(X.columns) - return self - - def _transform_binary_continuous( - self, X: nw.DataFrame, unseen_per_col: dict - ) -> IntoFrameT: - fill_value_unseen = ( - self.fill_value_unseen - if self.fill_value_unseen != "mean" or self.unseen != "fill" - else self.mean_target_ - ) - return X.with_columns( - nw.col(column).replace_strict( - { - **mapping, - **{ - cat: fill_value_unseen for cat in unseen_per_col.get(column, []) - }, - } - ) - for column, mapping in self.encoding_map_.items() - ) - - def _transform_multiclass( - self, X: nw.DataFrame, unseen_per_col: dict - ) -> IntoFrameT: - fill_value_unseen = ( - {class_: self.fill_value_unseen for class_ in self.unique_classes_} - if self.fill_value_unseen != "mean" or self.unseen != "fill" - else self.mean_target_ - ) - return X.with_columns( - nw.col(column) - .replace_strict( - { - **mapping, - **{ - cat: fill_value_unseen[class_] - for cat in unseen_per_col.get(column, []) - }, - } - ) - .alias(f"{column}_mean_target_class_{class_}") - for column, class_mapping in self.encoding_map_.items() - for class_, mapping in class_mapping.items() - ).drop(self.columns_) - - @check_if_fitted - def get_feature_names_out(self) -> list[str]: - if self.type_of_target_ in ("binary", "continuous"): - return self.feature_names_in_ - - else: # multiclass - return [ - feat for feat in self.feature_names_in_ if feat not in self.columns_ - ] + [ - f"{column}_mean_target_class_{class_}" - for column in self.columns_ - for class_ in self.unique_classes_ - ] - - @nw.narwhalify - @check_if_fitted - def transform(self, X: IntoFrameT) -> IntoFrameT: - """Transform the data. - - Args: - X (DataFrame): The input data. - """ - X = self._handle_missing_values(X) - unseen_per_col = {} - for column, mapping in self.encoding_map_.items(): - if self.type_of_target_ in ("binary", "continuous"): - seen_categories = mapping.keys() - else: - seen_categories = next(iter(mapping.values())).keys() - - uniques = X[column].unique() - unseen_cats = uniques.filter( - (~uniques.is_in(seen_categories) & ~uniques.is_null()) - ).to_list() - if unseen_cats: - unseen_per_col[column] = unseen_cats - - if unseen_per_col: - if self.unseen == "raise": - raise ValueError( - f"Unseen categories {unseen_per_col} found during transform. " - "Please handle unseen categories for example by using a RareLabelEncoder. " - "Alternatively, set unseen to 'ignore'." - ) - else: - warnings.warn( - f"Unseen categories {unseen_per_col} found during transform. " - "Please handle unseen categories for example by using a RareLabelEncoder. " - f"These categories will be encoded as {self.fill_value_unseen}." - ) - - if self.type_of_target_ in ("binary", "continuous"): - return self._transform_binary_continuous(X, unseen_per_col) - - else: # multiclass - return self._transform_multiclass(X, unseen_per_col) diff --git a/sklearo/encoding/woe.py b/sklearo/encoding/woe.py index abd68f2..958a242 100644 --- a/sklearo/encoding/woe.py +++ b/sklearo/encoding/woe.py @@ -7,12 +7,12 @@ from narwhals.typing import IntoFrameT, IntoSeriesT from pydantic import validate_call -from sklearo.encoding.base import BaseOneToOneEncoder -from sklearo.utils import select_columns +from sklearo.encoding.base import BaseTargetEncoder +from sklearo.utils import infer_type_of_target, select_columns from sklearo.validation import check_if_fitted, check_type_of_target, check_X_y -class WOEEncoder(BaseOneToOneEncoder): +class WOEEncoder(BaseTargetEncoder): """Weight of Evidence (WOE) Encoder with support for multiclass classification. This class provides functionality to encode categorical features using the Weight of Evidence @@ -82,8 +82,8 @@ class WOEEncoder(BaseOneToOneEncoder): - If `'raise'`, an error is raised when unseen categories are found. - If `'ignore'`, the unseen categories are encoded with the fill_value_unseen. - fill_value_unseen (int, float, None): Fill value to use for unseen categories. Only used when - `unseen='ignore'`. + fill_value_unseen (int, float, None): Fill value to use for unseen categories. Only used + when `unseen='ignore'`. missing_values (str): Strategy to handle missing values. @@ -126,6 +126,9 @@ class WOEEncoder(BaseOneToOneEncoder): ``` """ + _encoder_name = "WOE" + _allowed_types_of_target = ["binary", "multiclass"] + @validate_call(config=dict(arbitrary_types_allowed=True)) def __init__( self, @@ -134,12 +137,12 @@ def __init__( nw.String, ), underrepresented_categories: Literal["raise", "fill"] = "raise", - fill_values_underrepresented: Sequence[int | float | None] = ( + fill_values_underrepresented: Sequence[float | None] = ( -999.0, 999.0, ), unseen: Literal["raise", "ignore"] = "raise", - fill_value_unseen: int | float | None = 0.0, + fill_value_unseen: float | None = 0.0, missing_values: Literal["encode", "ignore", "raise"] = "encode", ) -> None: self.columns = columns @@ -149,259 +152,50 @@ def __init__( self.unseen = unseen self.fill_value_unseen = fill_value_unseen - def _calculate_woe( - self, x: IntoSeriesT, y: IntoSeriesT, unique_classes: list[Any] + def _calculate_target_statistic( + self, x_y: IntoFrameT, target_col: str, column: str ) -> dict[str, dict[str, float | int | None]]: """Calculate the Weight of Evidence for a column.""" - unique_categories = x.unique().to_list() - if x.name == "target": - target_col_name = "target_" - else: - target_col_name = "target" - - categories_class_info_as_rows = ( - x.to_frame() - .with_columns(**{target_col_name: y}) - .with_columns( - total_events_per_class=nw.col(x.name).count().over(target_col_name), - total_elements_per_category=nw.col(target_col_name) - .count() - .over(x.name), - ) - .group_by(x.name, target_col_name) + total_number_of_events = x_y[target_col].sum() + total_number_of_non_events = x_y.shape[0] - total_number_of_events + total_number_of_events_per_category = ( + x_y.group_by(column, drop_null_keys=True) .agg( - n_events_per_category=nw.col(target_col_name).count(), - total_events_per_class=nw.col("total_events_per_class").max(), - total_elements_per_category=nw.col("total_elements_per_category").max(), - ) - .with_columns( - distribution_of_events_per_category=nw.col("n_events_per_category") - / nw.col("total_events_per_class"), - n_non_events_per_category=nw.col("total_elements_per_category") - - nw.col("n_events_per_category"), - total_number_of_non_events=x.shape[0] - nw.col("n_events_per_category"), - ) - .with_columns( - distribution_of_non_events_per_category=nw.col( - "n_non_events_per_category" - ) - / nw.col("total_number_of_non_events"), - ) - .with_columns( - dist_ratio=nw.col("distribution_of_events_per_category") - / nw.col("distribution_of_non_events_per_category"), - ) - .select( - [ - x.name, - target_col_name, - "dist_ratio", - "n_events_per_category", - "n_non_events_per_category", - ] + n_events=nw.col(target_col).sum(), n_elements=nw.col(target_col).count() ) .rows(named=True) ) - categories_class_info_as_dict = defaultdict(dict) - - for row in categories_class_info_as_rows: - categories_class_info_as_dict[row[x.name]][row[target_col_name]] = { - "dist_ratio": row["dist_ratio"], - "n_events_per_category": row["n_events_per_category"], - "n_non_events_per_category": row["n_non_events_per_category"], - } - # categories_class_info_as_dict = dict(categories_class_info_as_dict) - # categories_class_info_as_dict - woe_dict_per_category = defaultdict(dict) - underrepresented_category_per_class = list() - - for category in sorted(cat for cat in unique_categories if cat is not None): - for class_ in sorted(unique_classes): - category_class_info = categories_class_info_as_dict[category].get( - class_, {} - ) - if not category_class_info: - # This means that the n_events_per_category is 0 - # and that we have only non-events in this category - # the dist_ratio is 0 which would mean a woe of -inf - if self.underrepresented_categories == "raise": - underrepresented_category_per_class.append( - { - "category": category, - "class": class_, - } - ) - else: # fill - woe_dict_per_category[class_][category] = ( - self.fill_values_underrepresented[0] - ) - underrepresented_category_per_class.append( - { - "category": category, - "class": class_, - "fill_value": self.fill_values_underrepresented[0], - } - ) - elif category_class_info["n_non_events_per_category"] == 0: - # This means that the n_non_events_per_category is 0 - # and that we have only events in this category - # the dist_ratio (and woe) would be infinite - if self.underrepresented_categories == "raise": - underrepresented_category_per_class.append( - { - "category": category, - "class": class_, - } - ) - else: # fill - woe_dict_per_category[class_][category] = ( - self.fill_values_underrepresented[1] - ) - underrepresented_category_per_class.append( - { - "category": category, - "class": class_, - "fill_value": self.fill_values_underrepresented[1], - } - ) - else: - woe_dict_per_category[class_][category] = math.log( - category_class_info["dist_ratio"] + woe_dict = {} + for row in total_number_of_events_per_category: + n_events = row["n_events"] + n_non_events = row["n_elements"] - n_events + + if n_events == 0: + # the dist_ratio is 0 which would mean a woe of -inf + if self.underrepresented_categories == "raise": + raise ValueError( + f"Underrepresented category {row[column]} found for the column {column}. " + "Please handle underrepresented categories for example by using a " + "RareLabelEncoder. Alternatively, set underrepresented_categories to " + "'fill'." + ) + else: # fill + woe_dict[row[column]] = self.fill_values_underrepresented[0] + elif n_non_events == 0: + # the dist_ratio (and woe) would be infinite + if self.underrepresented_categories == "raise": + raise ValueError( + f"Underrepresented category {row[column]} found for the column {column}. " + "Please handle underrepresented categories for example by using a " + "RareLabelEncoder. Alternatively, set underrepresented_categories to " + "'fill'." ) - if underrepresented_category_per_class: - if self.underrepresented_categories == "raise": - raise ValueError( - f"Underrepresented categories {underrepresented_category_per_class} found for " - f"the column {x.name}. " - "Please handle underrepresented categories for example by using a " - "RareLabelEncoder. Alternatively, set underrepresented_categories to 'fill'." - ) - else: # Fill - warnings.warn( - f"Underrepresented categories found for the column {x.name}. " - "Please handle underrepresented categories for example by using a " - "RareLabelEncoder. These categories will be encoded using the fill value as: \n" - f"{underrepresented_category_per_class}." - ) - return dict(woe_dict_per_category) - - @nw.narwhalify - @check_X_y - @check_type_of_target("binary", "multiclass") - def fit(self, X: IntoFrameT, y: IntoSeriesT) -> "WOEEncoder": - """Fit the encoder. - - Args: - X (DataFrame): The input data. - y (Series): The target variable. - """ - self.feature_names_in_ = list(X.columns) - self.columns_ = list(select_columns(X, self.columns)) - - X = self._handle_missing_values(X) - - self.encoding_map_ = {} - self.is_zero_one_target_ = False - unique_classes = sorted(y.unique().to_list()) - self.unqiue_classes_ = unique_classes - - if not self.columns_: - return self - - if len(unique_classes) == 2: - unique_classes = [unique_classes[1]] - - try: - greatest_class_as_int = int(unique_classes[0]) - except ValueError: - self.is_zero_one_target_ = False - else: - if greatest_class_as_int == 1: - self.is_zero_one_target_ = True else: - self.is_zero_one_target_ = False - else: - self.is_zero_one_target_ = False - - for column in self.columns_: - self.encoding_map_[column] = self._calculate_woe( - X[column], y, unique_classes - ) - - return self - - @nw.narwhalify - @check_if_fitted - def transform(self, X: IntoFrameT) -> IntoFrameT: - """Transform the data. - - Args: - X (DataFrame): The input data. - """ - X = self._handle_missing_values(X) - unseen_per_col = {} - - for column, mapping in self.encoding_map_.items(): - uniques = X[column].unique() - unseen_cats = uniques.filter( - ( - ~uniques.is_in(next(iter(mapping.values())).keys()) - & ~uniques.is_null() - ) - ).to_list() - if unseen_cats: - unseen_per_col[column] = unseen_cats - - if unseen_per_col: - if self.unseen == "raise": - raise ValueError( - f"Unseen categories {unseen_per_col} found during transform. " - "Please handle unseen categories for example by using a RareLabelEncoder. " - "Alternatively, set unseen to 'ignore'." - ) + woe_dict[row[column]] = self.fill_values_underrepresented[1] else: - warnings.warn( - f"Unseen categories {unseen_per_col} found during transform. " - "Please handle unseen categories for example by using a RareLabelEncoder. " - f"These categories will be encoded as {self.fill_value_unseen}." + woe_dict[row[column]] = math.log( + (n_events / total_number_of_events) + / (n_non_events / total_number_of_non_events) ) - - X_out = X.with_columns( - nw.col(column) - .replace_strict( - { - **mapping, - **{ - cat: self.fill_value_unseen - for cat in unseen_per_col.get(column, []) - }, - } - ) - .alias( - column if self.is_zero_one_target_ else f"{column}_WOE_class_{class_}" - ) - for column, classes_mapping in self.encoding_map_.items() - for class_, mapping in classes_mapping.items() - ) - - # In case of binary target, the original columns are replaced with the encoded columns. - # If it is not a binary target, the original columns need to be dropped before returning. - if not self.is_zero_one_target_: - X_out = X_out.drop(*self.columns_) - - return X_out - - @check_if_fitted - def get_feature_names_out(self) -> list[str]: - """Get the feature names after encoding.""" - if self.is_zero_one_target_: - return self.feature_names_in_ - else: - return [ - feat for feat in self.feature_names_in_ if feat not in self.columns_ - ] + [ - f"{column}_WOE_class_{class_}" - for column, classes_mapping in self.encoding_map_.items() - for class_ in classes_mapping - ] + return woe_dict diff --git a/tests/encoding/test_target.py b/tests/encoding/test_target.py index 8678c90..d7a0e76 100644 --- a/tests/encoding/test_target.py +++ b/tests/encoding/test_target.py @@ -34,7 +34,6 @@ def test_woe_encoder_fit_binary(self, binary_class_data, DataFrame): assert encoder.columns_ == ["category"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is True def test_woe_encoder_fit_multiclass_non_int_target( self, binary_class_data, DataFrame @@ -45,7 +44,6 @@ def test_woe_encoder_fit_multiclass_non_int_target( assert encoder.columns_ == ["target"] assert "target" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False transformed_data = encoder.transform(binary_class_data[["target"]]) np.testing.assert_allclose( @@ -61,7 +59,6 @@ def test_woe_encoder_fit_binary_non_int_target(self, multi_class_data, DataFrame assert encoder.columns_ == ["target"] assert "target" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False transformed_data = encoder.transform(multi_class_data[["target"]]) @@ -99,7 +96,6 @@ def test_woe_encoder_fit_binary_non_int_target_classes_1_and_2( assert encoder.columns_ == ["category"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False transformed_data = encoder.transform(binary_class_data[["category"]]) @@ -135,7 +131,6 @@ def test_woe_encoder_fit_with_target_in_X_binary( assert encoder.columns_ == ["category", "target"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is True def test_woe_encoder_fit_with_target_in_X_multi_class( self, multi_class_data, DataFrame @@ -147,7 +142,6 @@ def test_woe_encoder_fit_with_target_in_X_multi_class( assert encoder.columns_ == ["category", "target"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False def test_woe_encoder_fit_with_empty_columns(self, multi_class_data, DataFrame): multi_class_data = DataFrame(multi_class_data) @@ -164,7 +158,6 @@ def test_woe_encoder_fit_multi_class(self, multi_class_data, DataFrame): assert encoder.columns_ == ["category"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False def test_woe_encoder_transform_binary(self, binary_class_data, DataFrame): binary_class_data = DataFrame(binary_class_data) diff --git a/tests/encoding/test_woe.py b/tests/encoding/test_woe.py index e978f09..3b8f5a3 100644 --- a/tests/encoding/test_woe.py +++ b/tests/encoding/test_woe.py @@ -34,7 +34,6 @@ def test_woe_encoder_fit_binary(self, binary_class_data, DataFrame): assert encoder.columns_ == ["category"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is True def test_woe_encoder_fit_multiclass_non_int_target( self, binary_class_data, DataFrame @@ -45,21 +44,20 @@ def test_woe_encoder_fit_multiclass_non_int_target( assert encoder.columns_ == ["target"] assert "target" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False transformed_data = encoder.transform(binary_class_data[["target"]]) np.testing.assert_allclose( transformed_data["target_WOE_class_A"].to_list(), [ - -0.405465, - 0.847298, - 0.847298, - -0.405465, - -0.405465, - 0.847298, - -0.405465, - -0.405465, - 0.847298, + -0.693147, + 0.693147, + 0.693147, + -0.693147, + -0.693147, + 0.693147, + -0.693147, + -0.693147, + 0.693147, ], rtol=1e-5, ) @@ -71,28 +69,27 @@ def test_woe_encoder_fit_binary_non_int_target(self, multi_class_data, DataFrame assert encoder.columns_ == ["target"] assert "target" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False transformed_data = encoder.transform(multi_class_data[["target"]]) assert ( encoder.get_feature_names_out() - == ["target_WOE_class_B"] + == ["target"] == list(transformed_data.columns) ) np.testing.assert_allclose( - transformed_data["target_WOE_class_B"].to_list(), + transformed_data["target"].to_list(), [ - -0.105361, - -0.105361, - 1.163151, - 0.470004, - 0.470004, - -0.105361, - 1.163151, - 1.163151, - 0.470004, - 0.470004, + -0.693147, + -0.693147, + 0.693147, + 0.0, + 0.0, + -0.693147, + 0.693147, + 0.693147, + 0.0, + 0.0, ], rtol=1e-5, ) @@ -109,19 +106,28 @@ def test_woe_encoder_fit_binary_non_int_target_classes_1_and_2( assert encoder.columns_ == ["category"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False transformed_data = encoder.transform(binary_class_data[["category"]]) assert ( encoder.get_feature_names_out() - == ["category_WOE_class_2"] + == ["category"] == list(transformed_data.columns) ) np.testing.assert_allclose( - transformed_data["category_WOE_class_2"].to_list(), - [1.252763, 1.252763, 1.252763, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + transformed_data["category"].to_list(), + [ + 0.916291, + 0.916291, + 0.916291, + -0.470004, + -0.470004, + -0.470004, + -0.470004, + -0.470004, + -0.470004, + ], rtol=1e-5, ) @@ -133,12 +139,10 @@ def test_woe_encoder_fit_with_target_in_X_binary( columns=["category", "target"], underrepresented_categories="fill" ) - with pytest.warns(UserWarning): - encoder.fit(binary_class_data, binary_class_data["target"]) + encoder.fit(binary_class_data, binary_class_data["target"]) assert encoder.columns_ == ["category", "target"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is True def test_woe_encoder_fit_with_target_in_X_multi_class( self, multi_class_data, DataFrame @@ -148,12 +152,10 @@ def test_woe_encoder_fit_with_target_in_X_multi_class( columns=["category", "target"], underrepresented_categories="fill" ) - with pytest.warns(UserWarning): - encoder.fit(multi_class_data, multi_class_data["target"]) + encoder.fit(multi_class_data, multi_class_data["target"]) assert encoder.columns_ == ["category", "target"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False def test_woe_encoder_fit_with_target_in_X_multi_class_raise_underrepresented( self, multi_class_data, DataFrame @@ -163,7 +165,7 @@ def test_woe_encoder_fit_with_target_in_X_multi_class_raise_underrepresented( columns=["category", "target"], underrepresented_categories="raise" ) - with pytest.raises(ValueError, match="Underrepresented categories"): + with pytest.raises(ValueError, match="Underrepresented category"): encoder.fit(multi_class_data, multi_class_data["target"]) def test_woe_encoder_fit_with_empty_columns(self, multi_class_data, DataFrame): @@ -181,7 +183,6 @@ def test_woe_encoder_fit_multi_class(self, multi_class_data, DataFrame): assert encoder.columns_ == ["category"] assert "category" in encoder.encoding_map_ - assert encoder.is_zero_one_target_ is False def test_woe_encoder_transform_binary(self, binary_class_data, DataFrame): binary_class_data = DataFrame(binary_class_data) @@ -189,16 +190,20 @@ def test_woe_encoder_transform_binary(self, binary_class_data, DataFrame): encoder.fit(binary_class_data[["category"]], binary_class_data["target"]) transformed = encoder.transform(binary_class_data[["category"]]) + # for category A: + # log((1/5)/(2/4)) = -0.916291... + # for categories B and C: + # log((2/5)/(1/4)) = 0.470004... expected_values = [ - -0.223144, - -0.223144, - -0.223144, - 1.029619, - 1.029619, - 1.029619, - 1.029619, - 1.029619, - 1.029619, + -0.916291, + -0.916291, + -0.916291, + 0.470004, + 0.470004, + 0.470004, + 0.470004, + 0.470004, + 0.470004, ] np.testing.assert_allclose( transformed["category"].to_list(), expected_values, rtol=1e-5 @@ -221,16 +226,16 @@ def test_woe_encoder_transform_multi_class(self, multi_class_data, DataFrame): transformed["category_WOE_class_1"], # For class 1 A counts : 2, B counts : 1 [ - 0.575364, - 0.575364, - 0.575364, - 0.575364, - 0.575364, - -0.287682, - -0.287682, - -0.287682, - -0.287682, - -0.287682, + 0.441833, + 0.441833, + 0.441833, + 0.441833, + 0.441833, + -0.538997, + -0.538997, + -0.538997, + -0.538997, + -0.538997, ], rtol=1e-5, ) @@ -239,16 +244,16 @@ def test_woe_encoder_transform_multi_class(self, multi_class_data, DataFrame): transformed["category_WOE_class_2"], # For class 2 A counts : 1, B counts : 2 [ - -0.287682, - -0.287682, - -0.287682, - -0.287682, - -0.287682, - 0.575364, - 0.575364, - 0.575364, - 0.575364, - 0.575364, + -0.538997, + -0.538997, + -0.538997, + -0.538997, + -0.538997, + 0.441833, + 0.441833, + 0.441833, + 0.441833, + 0.441833, ], rtol=1e-5, ) @@ -256,18 +261,7 @@ def test_woe_encoder_transform_multi_class(self, multi_class_data, DataFrame): np.testing.assert_allclose( transformed["category_WOE_class_3"], # For class 3 A counts : 2, B counts : 2 - [ - 0.287682, - 0.287682, - 0.287682, - 0.287682, - 0.287682, - 0.287682, - 0.287682, - 0.287682, - 0.287682, - 0.287682, - ], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], rtol=1e-5, ) @@ -280,11 +274,9 @@ def test_woe_encoder_handle_missing_values_binary( encoder = WOEEncoder( missing_values="encode", underrepresented_categories="fill" ) - with pytest.warns(UserWarning): - encoder.fit(binary_class_data[["category"]], binary_class_data["target"]) - transformed = encoder.transform(binary_class_data[["category"]]) + encoder.fit(binary_class_data[["category"]], binary_class_data["target"]) - assert "MISSING" in encoder.encoding_map_["category"][1] + assert "MISSING" in encoder.encoding_map_["category"] def test_woe_encoder_handle_missing_values_multi_class( self, multi_class_data, DataFrame @@ -295,8 +287,7 @@ def test_woe_encoder_handle_missing_values_multi_class( encoder = WOEEncoder( missing_values="encode", underrepresented_categories="fill" ) - with pytest.warns(UserWarning): - encoder.fit(multi_class_data[["category"]], multi_class_data["target"]) + encoder.fit(multi_class_data[["category"]], multi_class_data["target"]) transformed = encoder.transform(multi_class_data[["category"]]) assert "MISSING" in encoder.encoding_map_["category"][1] @@ -311,7 +302,9 @@ def test_woe_encoder_unseen_category_binary(self, binary_class_data, DataFrame): transformed = encoder.transform(new_data) np.testing.assert_allclose( - transformed["category"].to_list(), [-0.223144, 1.029619, -999], rtol=1e-5 + transformed["category"].to_list(), + [-0.9162907, 0.4700036, -999.0], + rtol=1e-5, ) def test_woe_encoder_unseen_category_binary_raise( @@ -344,11 +337,10 @@ def test_woe_encoder_underrepresented_category_binary( encoder = WOEEncoder( underrepresented_categories="fill", fill_values_underrepresented=(-999, 999) ) - with pytest.warns(UserWarning): - encoder.fit( - binary_class_data[["category"]], - binary_class_data["target"], - ) + encoder.fit( + binary_class_data[["category"]], + binary_class_data["target"], + ) transformed = encoder.transform(binary_class_data[["category"]]) assert transformed["category"].to_list()[-1] == -999