diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cbdb88..3c5e7c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.0.2] - 2023-06-14 + +### Added + +- Performance gain analysis plot + ## [1.0.1.2] - 2023-06-02 ### Fixed diff --git a/docs/conf.py b/docs/conf.py index c9fc82e..0c9693d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -22,7 +22,7 @@ author = 'Adrian Stańdo' # The full version, including alpha/beta/rc tags -release = '1.0.1.2' +release = '1.0.2' # -- General configuration --------------------------------------------------- diff --git a/edgaro/explain/explainer.py b/edgaro/explain/explainer.py index cdcf755..a303f51 100644 --- a/edgaro/explain/explainer.py +++ b/edgaro/explain/explainer.py @@ -6,7 +6,8 @@ import os import multiprocessing -from typing import List, Optional, Literal +from typing import List, Optional, Literal, Callable +from sklearn.metrics import balanced_accuracy_score from edgaro.data.dataset import Dataset from edgaro.model.model import Model @@ -44,6 +45,10 @@ class Explainer: Random state seed. B : int, optional, default=10 Number of permutation rounds to perform on each variable - applicable only if explanation_type='VI'. + performance_metric_name : str, default='balanced_accuracy' + Name of the performance metric. + performance_metric : callable, default=balanced_accuracy_score + The performance metric function, taking (y_true, y_pred) and returning a float. Attributes ---------- @@ -67,6 +72,10 @@ class Explainer: Random state seed B : int, optional Number of permutation rounds to perform on each variable - applicable only if explanation_type='VI'. + performance_metric_name : str + Name of the performance metric. + performance_metric : callable + The performance metric function, taking (y_true, y_pred) and returning a float.
References ---------- @@ -77,7 +86,8 @@ class Explainer: def __init__(self, model: Model, N: Optional[int] = None, explanation_type: Literal['PDP', 'ALE', 'VI'] = 'PDP', verbose: bool = False, processes: int = 1, random_state: Optional[int] = None, - B: Optional[int] = 10) -> None: + B: Optional[int] = 10, performance_metric_name: str = 'balanced_accuracy', + performance_metric: Callable[[pd.Series, pd.Series], float] = balanced_accuracy_score) -> None: self.model = model self.explainer = None self.name = model.name @@ -87,6 +97,8 @@ def __init__(self, model: Model, N: Optional[int] = None, explanation_type: Lite self.processes = processes self.random_state = random_state self.B = B + self.performance_metric_name = performance_metric_name + self.performance_metric = performance_metric if self.processes == -1: self.processes = multiprocessing.cpu_count() @@ -134,6 +146,9 @@ def transform(self, variables: Optional[List[str]] = None) -> Explanation: ------- Explanation """ + performance = self.model.evaluate(metrics_output_class=[self.performance_metric]) + performance = performance['value'].iloc[0] + if self.explainer is None: raise Exception('Explainer was not fitted!') category_colnames_base = self.model.get_category_colnames() @@ -156,7 +171,8 @@ def transform(self, variables: Optional[List[str]] = None) -> Explanation: print_unbuffered(f'{self.explanation_type} was calculated calculated in {self.__repr__()} for ' f'{self.model.get_test_dataset().name}') - return ModelProfileExplanation(dict_output, self.name, self.model.get_category_colnames()) + return ModelProfileExplanation(dict_output, self.name, self.model.get_category_colnames(), + performance, self.performance_metric_name) elif self.explanation_type == 'VI': explanation_type = 'variable_importance' self.__transform_feature_importance(dict_output, variables, explanation_type) @@ -165,7 +181,8 @@ def transform(self, variables: Optional[List[str]] = None) -> Explanation: print_unbuffered(f'{self.explanation_type} was calculated calculated in {self.__repr__()} for ' f'{self.model.get_test_dataset().name}') - return ModelPartsExplanation(dict_output, self.name) + return ModelPartsExplanation(dict_output, self.name, + performance, self.performance_metric_name) else: raise Exception('Wrong curve type!') diff --git a/edgaro/explain/explainer_array.py b/edgaro/explain/explainer_array.py index 14dff15..5dac793 100644 --- a/edgaro/explain/explainer_array.py +++ b/edgaro/explain/explainer_array.py @@ -1,8 +1,10 @@ from __future__ import annotations import multiprocessing +import pandas as pd -from typing import Union, Optional, Literal +from typing import Union, Optional, Literal, Callable +from sklearn.metrics import balanced_accuracy_score from edgaro.model.model import Model from edgaro.model.model_array import ModelArray @@ -35,6 +37,10 @@ class ExplainerArray: Random state seed. B : int, optional, default=10 Number of permutation rounds to perform on each variable - applicable only if explanation_type='VI'. + performance_metric_name : str, default='balanced_accuracy' + Name of the performance metric. + performance_metric : callable, default=balanced_accuracy_score + The performance metric function, taking (y_true, y_pred) and returning a float. Attributes ---------- @@ -58,12 +64,18 @@ class ExplainerArray: Random state seed B : int, optional Number of permutation rounds to perform on each variable - applicable only if explanation_type='VI'. + performance_metric_name : str + Name of the performance metric. + performance_metric : callable + The performance metric function, taking (y_true, y_pred) and returning a float.
""" def __init__(self, models: Union[Model, ModelArray], N: Optional[int] = None, explanation_type: Literal['PDP', 'ALE', 'VI'] = 'PDP', verbose: bool = False, processes: int = 1, - random_state: Optional[int] = None, B: Optional[int] = 10) -> None: + random_state: Optional[int] = None, B: Optional[int] = 10, + performance_metric_name: str = 'balanced_accuracy', + performance_metric: Callable[[pd.Series, pd.Series], float] = balanced_accuracy_score) -> None: self.models = models self.sub_calculators = None self.name = models.name @@ -75,6 +87,9 @@ def __init__(self, models: Union[Model, ModelArray], N: Optional[int] = None, self.random_state = random_state self.B = B + self.performance_metric_name = performance_metric_name + self.performance_metric = performance_metric + if self.processes == -1: self.processes = multiprocessing.cpu_count() @@ -86,11 +101,14 @@ def fit(self) -> None: def create_sub_calculator(model: Union[Model, ModelArray]): if isinstance(model, Model): calc = Explainer(model=model, N=self.N, explanation_type=self.explanation_type, verbose=self.verbose, - processes=self.processes, random_state=self.random_state, B=self.B) + processes=self.processes, random_state=self.random_state, B=self.B, + performance_metric_name=self.performance_metric_name, + performance_metric=self.performance_metric) else: calc = ExplainerArray(models=model, N=self.N, explanation_type=self.explanation_type, verbose=self.verbose, processes=self.processes, random_state=self.random_state, - B=self.B) + B=self.B, performance_metric_name=self.performance_metric_name, + performance_metric=self.performance_metric) calc.fit() return calc diff --git a/edgaro/explain/explainer_result.py b/edgaro/explain/explainer_result.py index df8b4f4..3c03cab 100644 --- a/edgaro/explain/explainer_result.py +++ b/edgaro/explain/explainer_result.py @@ -52,7 +52,11 @@ def plot(self) -> None: pass @abstractmethod - def compare(self, other: List[Explanation]) -> List[Union[float, list]]: + def compare(self, other: List[Explanation]) -> List[Union[float, List]]: + pass + + @abstractmethod + def compare_performance(self, other: List[Explanation], percent: bool = False) -> List[float]: pass @@ -70,6 +74,10 @@ class ModelProfileExplanation(Explanation): List of categorical variables. explanation_type : {'PDP', 'ALE'}, default='PDP' A curve type. + performance_metric_value : float + Value of the performance metric. + performance_metric_name : str + Name of the performance metric. Attributes ---------- @@ -81,15 +89,22 @@ class ModelProfileExplanation(Explanation): List of categorical variables. explanation_type : {'PDP', 'ALE'} A curve type. + performance_metric_value : float, optional + Value of the performance metric. + performance_metric_name : str, optional + Name of the performance metric. 
""" def __init__(self, results: Dict[str, Curve], name: str, categorical_columns: List[str], + performance_metric_value: float, performance_metric_name: str, explanation_type: Literal['PDP', 'ALE'] = 'PDP') -> None: self.results = results self.name = name self.categorical_columns = categorical_columns self.explanation_type = explanation_type + self.performance_metric_value = performance_metric_value + self.performance_metric_name = performance_metric_name def __getitem__(self, key: str) -> Optional[Curve]: if key in self.results.keys(): @@ -285,6 +300,30 @@ def __retrieve_explainer_results(inp, explain_results_in): for inp_i in inp.results: ModelProfileExplanation.__retrieve_explainer_results(inp_i, explain_results_in) + def compare_performance(self, other: List[ModelProfileExplanation], percent: bool = False) -> List[float]: + """ + The function returns the difference between performance metric values. This object's value is subtracted + from other. + + Parameters + ---------- + other : list[ModelProfileExplanation] + List of ModelProfileExplanation objects to compare against. + percent : bool, default=False + If True, the percentage change will be returned instead of difference. + + Returns + ---------- + list[float] + """ + + tab = [elem.performance_metric_value - self.performance_metric_value for elem in other] + + if percent: + tab = [tab[i] / self.performance_metric_value for i in range(len(tab))] + + return tab + def compare(self, other: List[ModelProfileExplanation], variable: Optional[Union[str, List[str]]] = None, return_raw_per_variable: bool = False) -> List[Union[float, list]]: """ @@ -389,6 +428,10 @@ class ModelPartsExplanation(Explanation): The name of ModelProfileExplanation. It is best if it is a Model name. explanation_type : {'VI'}, default='VI' An explanation type. + performance_metric_value : float + Value of the performance metric. + performance_metric_name : str + Name of the performance metric. Attributes ---------- @@ -398,12 +441,20 @@ class ModelPartsExplanation(Explanation): The name of ModelProfileExplanation. It is best if it is a Model name. explanation_type : {'VI'}, default='VI' An explanation type. + performance_metric_value : float + Value of the performance metric. + performance_metric_name : str + Name of the performance metric. + """ - def __init__(self, results: Dict[str, float], name: str, explanation_type: Literal['VI'] = 'VI') -> None: + def __init__(self, results: Dict[str, float], name: str, performance_metric_value: float, + performance_metric_name: str, explanation_type: Literal['VI'] = 'VI') -> None: self.results = results self.name = name self.explanation_type = explanation_type + self.performance_metric_value = performance_metric_value + self.performance_metric_name = performance_metric_name def __getitem__(self, key: str) -> Optional[float]: if key in self.results.keys(): @@ -529,6 +580,30 @@ def extraction(a): fontsize='medium' ) + def compare_performance(self, other: List[ModelPartsExplanation], percent: bool = False) -> List[float]: + """ + The function returns the difference between performance metric values. This object's value is subtracted + from other. + + Parameters + ---------- + other : list[ModelPartsExplanation] + List of ModelPartsExplanation objects to compare against. + percent : bool, default=False + If True, the percentage change will be returned instead of difference. 
+ + Returns + ---------- + list[float] + """ + + tab = [elem.performance_metric_value - self.performance_metric_value for elem in other] + + if percent: + tab = [tab[i] / self.performance_metric_value for i in range(len(tab))] + + return tab + def compare(self, other: List[ModelPartsExplanation], variable: Optional[Union[str, List[str]]] = None, max_variables: Optional[int] = None, return_raw: bool = True) -> List[Union[float, list]]: """ diff --git a/edgaro/explain/explainer_result_array.py b/edgaro/explain/explainer_result_array.py index 7847556..873e50a 100644 --- a/edgaro/explain/explainer_result_array.py +++ b/edgaro/explain/explainer_result_array.py @@ -4,10 +4,14 @@ import re from abc import ABC, abstractmethod + +import matplotlib.axes import matplotlib.pyplot as plt import numpy as np from typing import List, Literal, Optional, Union, Tuple + +import pandas as pd from matplotlib.axes import Axes from statsmodels.stats.multitest import fdrcorrection @@ -30,6 +34,10 @@ def compare(self) -> List[Union[float, list]]: def plot_summary(self) -> None: pass + @abstractmethod + def compare_performance(self) -> List[Union[float, List]]: + pass + class ModelProfileExplanationArray(ExplanationArray): """ @@ -181,6 +189,67 @@ def plot(self, variables: Optional[List[str]] = None, n_col: int = 3, figsize: O fig.legend([x.name for x in results], ncol=n_col, loc='lower center') plt.suptitle(f"{self.explanation_type} curves for {self.name}", fontsize=18) + def __filter_for_comparing(self, index_base, model_filter): + if isinstance(index_base, int) and index_base < 0: + index_base = self.results.index(self.results[index_base]) + + def filter_objects(obj): + if model_filter is not None and \ + re.search(model_filter, obj.name) is None: + return False + return True + + def flatten(lst): + out = [] + for i in range(len(lst)): + if not (isinstance(lst[i], list) or isinstance(lst[i], ModelProfileExplanationArray)): + out.append(lst[i]) + else: + tmp = flatten(lst[i]) + out = out + tmp + return out + + base_model = self[index_base] + if base_model is None: + raise Exception('Wrong index_base argument!') + + res = flatten(self.results) + res.remove(self.results[index_base]) + + tab = [res[i] for i in range(len(res)) if filter_objects(res[i])] + return base_model, tab + + def compare_performance(self, index_base: Union[str, int] = -1, model_filter: Optional[str] = None, + percent: bool = False) -> List[Union[float, List]]: + """ + The function returns the difference between performance metric values. The value of the index_base object is subtracted + from the others. + + Parameters + ---------- + index_base : int, str, default=-1 + Index of a curve to be a base for comparisons. + model_filter : str, optional, default=None + A regex expression to filter the names of the ModelProfileExplanation objects for comparing. + percent : bool, default=False + If True, the percentage change will be returned instead of difference. 
+ + Returns + ---------- + list[float, list] + """ + + if isinstance(self.results[index_base], ModelProfileExplanation): + base_model, tab = self.__filter_for_comparing(index_base, model_filter) + return base_model.compare_performance(tab, percent=percent) + elif np.alltrue([isinstance(res, ModelProfileExplanationArray) for res in self.results]): + return [ + res.compare_performance(index_base=index_base, model_filter=model_filter, percent=percent) + for res in self.results + ] + else: + raise Exception('Wrong result structure!') + def compare(self, variable: Optional[Union[str, List[str]]] = None, index_base: Union[str, int] = -1, return_raw: bool = True, return_raw_per_variable: bool = True, model_filter: Optional[str] = None) \ -> List[Union[float, List]]: @@ -208,38 +277,11 @@ def compare(self, variable: Optional[Union[str, List[str]]] = None, index_base: """ if isinstance(self.results[index_base], ModelProfileExplanation): - if isinstance(index_base, int) and index_base < 0: - index_base = self.results.index(self.results[index_base]) - - def filter_objects(obj): - if model_filter is not None and \ - re.search(model_filter, obj.name) is None: - return False - return True - - def flatten(lst): - out = [] - for i in range(len(lst)): - if not (isinstance(lst[i], list) or isinstance(lst[i], ModelProfileExplanationArray)): - out.append(lst[i]) - else: - tmp = flatten(lst[i]) - out = out + tmp - return out - - base_model = self[index_base] - if base_model is None: - raise Exception('Wrong index_base argument!') - - res = flatten(self.results) - res.remove(self.results[index_base]) + base_model, res = self.__filter_for_comparing(index_base, model_filter) if return_raw: out = [] for i in range(len(res)): - if not filter_objects(res[i]): - continue - if not return_raw_per_variable: out.append(base_model.compare(res[i], variable=variable, return_raw_per_variable=False)[0]) @@ -248,8 +290,7 @@ def flatten(lst): return_raw_per_variable=True)) return out else: - tab = [res[i] for i in range(len(res)) if filter_objects(res[i])] - return base_model.compare(tab, variable=variable, return_raw_per_variable=return_raw_per_variable) + return base_model.compare(res, variable=variable, return_raw_per_variable=return_raw_per_variable) elif np.alltrue([isinstance(res, ModelProfileExplanationArray) for res in self.results]): return [ res.compare(variable=variable, index_base=index_base, return_raw=return_raw, @@ -334,6 +375,102 @@ def flatten(lst): if return_df: return results + def plot_performance_gain_analysis(self, model_filters: Optional[List[str]] = None, filter_labels: Optional[List[str]] = None, + variables: Optional[List[str]] = None, figsize: Optional[Tuple[int, int]] = None, + index_base: Union[str, int] = -1, return_df: bool = False, percent: bool = False, + ax: Optional[matplotlib.axes.Axes] = None): + """ + The function creates a performance gain analysis plot, which compares ASDD values and + differences in performance metric values. + + Parameters + ---------- + variables : list[str], optional, default=None + Variables to be taken into account in the comparison. If None, all variables are used if all the + available ModelProfileExplanation objects have exactly the same set of column names. + figsize : tuple(int, int), optional, default=None + The size of a figure. + model_filters : list[str], optional, default=None + List of regex expressions to filter the names of the ModelProfileExplanation objects for comparing. + Each element in the list creates a separate group of points. 
If None, all results are plotted as a single group of points. + filter_labels : list[str], optional, default=None + Labels of model filters. + index_base : int, str, default=-1 + Index of a curve to be a base for comparisons. + return_df : bool, default=False + If True, the method returns the data on which the plot is created (ASDD values and performance differences). + percent : bool, default=False + If True, the percentage change will be plotted instead of difference. + ax : matplotlib.axes.Axes, optional + The axes to plot on. If None, a new figure is created. + """ + x = self.results + while not isinstance(x, ModelProfileExplanation): + x = x[0] + performance_metric_name = x.performance_metric_name + + if ax is None: + fig, ax = plt.subplots(figsize=figsize) + + plt.title(f'Performance gain analysis of {self.explanation_type}\nfor {self.name} with {performance_metric_name} metric') + plt.xlabel('Performance gain') + plt.ylabel(r'ASDD values [$10^{-3}$]') + + def format_func(value, tick_number): + return str(int(value * 10 ** 3)) + + def flatten(lst): + out = [] + for i in range(len(lst)): + if not isinstance(lst[i], list): + out.append(lst[i]) + else: + tmp = flatten(lst[i]) + out = out + tmp + return out + + ax.yaxis.set_major_formatter(plt.FuncFormatter(format_func)) + + if model_filters is None: + results = self.compare(variable=variables, index_base=index_base, + return_raw=True, return_raw_per_variable=False) + results = flatten(results) + + performance = self.compare_performance(index_base=index_base, percent=percent) + performance = flatten(performance) + + if filter_labels is not None and len(filter_labels) == 1: + plt.scatter(x=performance, y=results, label=filter_labels[0]) + plt.legend() + else: + plt.scatter(x=performance, y=results) + + else: + results = [] + performance = [] + for i in range(len(model_filters)): + f = model_filters[i] + tmp_res = self.compare(variable=variables, index_base=index_base, model_filter=f, + return_raw=True, return_raw_per_variable=False) + tmp_res = flatten(tmp_res) + results.append(tmp_res) + + tmp_per = self.compare_performance(index_base=index_base, percent=percent, model_filter=f) + tmp_per = flatten(tmp_per) + performance.append(tmp_per) + + if filter_labels is not None: + if len(filter_labels) == len(model_filters): + plt.scatter(x=tmp_per, y=tmp_res, label=filter_labels[i], c=i) + else: + raise Exception('Incorrect length of filter_labels!') + else: + plt.scatter(x=tmp_per, y=tmp_res, c=i) + plt.legend() + + if return_df: + return results, performance + def __str__(self) -> str: return f"ModelProfileExplanationArray {self.name} for {len(self.results)} variables: {list(self.results)} with {self.explanation_type} curve type" @@ -429,6 +566,57 @@ def plot(self, variable: Optional[Union[str, List[str]]] = None, max_variables: base.plot(variable=variable, figsize=figsize, max_variables=max_variables, add_plot=plots, ax=ax, show_legend=show_legend, x_lim=x_lim, metric_precision=metric_precision) + def __filter_for_comparing(self, index_base, model_filter): + if isinstance(index_base, int) and index_base < 0: + index_base = self.results.index(self.results[index_base]) + + def filter_objects(obj): + if model_filter is not None and \ + re.search(model_filter, obj.name) is None: + return False + return True + + base_model = self[index_base] + if base_model is None: + raise Exception('Wrong index_base argument!') + + res = ModelPartsExplanationArray.__flatten(self.results) + res.remove(self.results[index_base]) + + tab = [res[i] for i in range(len(res)) if filter_objects(res[i])] + return base_model, tab + + def compare_performance(self, 
index_base: Union[str, int] = -1, model_filter: Optional[str] = None, + percent: bool = False) -> List[Union[float, List]]: + """ + The function returns the difference between performance metric values. The value of the index_base object is subtracted + from the others. + + Parameters + ---------- + index_base : int, str, default=-1 + Index of a curve to be a base for comparisons. + model_filter : str, optional, default=None + A regex expression to filter the names of the ModelPartsExplanation objects for comparing. + percent : bool, default=False + If True, the percentage change will be returned instead of difference. + + Returns + ---------- + list[float, list] + """ + + if isinstance(self.results[index_base], ModelPartsExplanation): + base_model, tab = self.__filter_for_comparing(index_base, model_filter) + return base_model.compare_performance(tab, percent=percent) + elif np.alltrue([isinstance(res, ModelPartsExplanationArray) for res in self.results]): + return [ + res.compare_performance(index_base=index_base, model_filter=model_filter, percent=percent) + for res in self.results + ] + else: + raise Exception('Wrong result structure!') + def compare(self, variable: Optional[Union[str, List[str]]] = None, max_variables: Optional[int] = None, return_raw: bool = True, index_base: Union[str, int] = -1, model_filter: Optional[str] = None) \ -> List[Union[float, list]]: @@ -456,23 +644,7 @@ def compare(self, variable: Optional[Union[str, List[str]]] = None, max_variable """ if isinstance(self.results[index_base], ModelPartsExplanation): - if isinstance(index_base, int) and index_base < 0: - index_base = self.results.index(self.results[index_base]) - - def filter_objects(obj): - if model_filter is not None and \ - re.search(model_filter, obj.name) is None: - return False - return True - - base_model = self[index_base] - if base_model is None: - raise Exception('Wrong index_base argument!') - - res = ModelPartsExplanationArray.__flatten(self.results) - res.remove(self.results[index_base]) - - res_filtered = [res[i] for i in range(len(res)) if filter_objects(res[i])] + base_model, res_filtered = self.__filter_for_comparing(index_base, model_filter) return base_model.compare(other=res_filtered, variable=variable, max_variables=max_variables, return_raw=return_raw) elif np.alltrue([isinstance(res, ModelPartsExplanationArray) for res in self.results]): diff --git a/setup.py b/setup.py index ccf5b54..66bfb5b 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name="edgaro", - version="1.0.1.2", + version="1.0.2", description="Explainable imbalanceD learninG compARatOr", long_description=long_description, long_description_content_type="text/markdown", diff --git a/test/test_explain.py b/test/test_explain.py index 6306cf1..35ed34f 100644 --- a/test/test_explain.py +++ b/test/test_explain.py @@ -1,4 +1,5 @@ import pytest +import copy from imblearn.under_sampling import RandomUnderSampler from sklearn.metrics import accuracy_score @@ -335,6 +336,22 @@ def test_array_of_arrays_plot_summary_regex(model_array2): assert False +def test_array_of_arrays_plot_performance_gain(model_array2): + rf, t = model_array2 + + # simulate more results + t = copy.deepcopy(t) + t = t[[0, 1]] + t.results[1] = t[0] + t.results.append(t[0]) + t.results.append(t[0]) + + try: + t.plot_performance_gain_analysis() + except (Exception,): + assert False + + def test_array_of_arrays_compare(model_array2): rf, t = model_array2 column = rf.get_models()[0].get_train_dataset().data.columns[0]
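Usage sketch (not part of the diff): the snippet below illustrates how the performance-comparison API introduced above might be called. Only `performance_metric_name`, `performance_metric`, `compare_performance`, and `plot_performance_gain_analysis` come from this change; `model_array` (an already fitted `ModelArray`), the choice of `f1_score`, the assumption that `ExplainerArray.transform()` returns a `ModelProfileExplanationArray`, and the regex filters/labels are placeholders for illustration.

```python
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score

from edgaro.explain.explainer_array import ExplainerArray

# Build PDP explainers that also record each model's performance via the new
# performance_metric / performance_metric_name parameters.
explainers = ExplainerArray(
    models=model_array,            # assumption: an already fitted edgaro ModelArray
    explanation_type='PDP',
    performance_metric_name='f1',  # label used in the plot title
    performance_metric=f1_score,   # callable(y_true, y_pred) -> float
)
explainers.fit()
explanations = explainers.transform()  # assumption: a ModelProfileExplanationArray

# Performance differences relative to the base object (index_base=-1),
# expressed as a relative (percentage) change.
perf_gain = explanations.compare_performance(index_base=-1, percent=True)

# Performance gain analysis plot: ASDD values vs. performance gain,
# with one group of points per (hypothetical) model-name regex filter.
explanations.plot_performance_gain_analysis(
    model_filters=['RandomUnderSampler', 'SMOTE'],  # placeholder regexes
    filter_labels=['undersampling', 'oversampling'],
    index_base=-1,
    percent=True,
)
plt.show()
```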