From 17075c83713fe6b4e3676bc31c64c94b1180177d Mon Sep 17 00:00:00 2001 From: johanos1 Date: Tue, 26 Mar 2024 19:22:57 +0000 Subject: [PATCH] ruff 100% --- data/adult.pkl | Bin 37986786 -> 37986786 bytes leakpro.py | 17 +-- leakpro/__init__.py | 1 + leakpro/dataset.py | 156 ++++++++++++------------ leakpro/metrics/attack_result.py | 43 +++---- leakpro/mia_attacks/attack_factory.py | 27 +++- leakpro/mia_attacks/attack_objects.py | 13 +- leakpro/mia_attacks/attack_scheduler.py | 43 ++++--- leakpro/mia_attacks/attack_utils.py | 141 ++++++++++----------- leakpro/mia_attacks/attacks/attack.py | 74 ++++++++--- leakpro/mia_attacks/attacks/attack_p.py | 36 +++--- leakpro/mia_attacks/attacks/rmia.py | 20 ++- leakpro/model.py | 4 +- leakpro/train.py | 14 +-- test_adult/models_metadata.pkl | Bin 8152 -> 8152 bytes 15 files changed, 324 insertions(+), 265 deletions(-) diff --git a/data/adult.pkl b/data/adult.pkl index ae0275cfce1503186935407e42805adac3345b61..d006c3e8dcaf40f3d3f25dff9b0e980e8e1b4a16 100644 GIT binary patch delta 2075 zcmWmDSKJT+07dcN_qF%ld+$9@d!UHSl7y`6D5H!Jt_m$PBV8PMxk+N5}Jl)p?PQ#T837kb!ZdXhIXNS=ny)F zPN8$?61s+Np?l~NdIqQV3cW+0&^Po8{lkDTFboQV!+v4^a6lLmhK2*fL19=pI2;m& zheN|*;qY)oI5Hd+Mud^!=rAfA6Gn$)!ez+mxN2hWnoIVJX{g33{%5Z;p%WrNVqmk z3)91la9y}Q+z@7lSz&gV6Xu2+!%gAla7(x~+!p4A+ru5<&Tv zSHo*zMOYbL4{wB3VRd*jycOOK?}T^5n($tDKYS2A3~R%>@KN|Ud=fqlpM}rE7vanB zRahUs4&Q`t!*^jr*ciSKKZH%;$FMnU2|tCO!`84Z{1Uc@9bsqK6?TVT!*AjD@JIMF z{1yHV|Aal^->^6An?b=$3RI{fDym{Ct`aJ#QYx)7DywoTuL`QDN~)|Xs;X+Lt{SST zTB@x&s;hdcuLf$UMry1kYN}>xt`=&kR%)#_YO8i?uMX;{PU@^K>Z)$)t{&>C{~LR$ zxB95B`l-JLXrKmZu=dmbIzU4-R0rxH4b#CoM8kEc4%6W}LPzQ-jnGIPtx-Bgqjjvt z=r|p(6Es#Q>Li`4Q*^4v=`@|LGjyiT(%Cvk<8`ji(*#Y_`MN+C>LN|jWL>OFbg3@W z6kV<>bfu>1DqXE>6kV%nnywkTPS@)O&D1Q-)*Q{%jk-xU>lWRr+cZzN>ki$iyEI>S z>mJ>!1-ei7>j6EehxD)>(Lz0{$FxX`^|+qUlUkyu^t7JQQa!8Z^t_ho1-+=3v|KOi z6}_t0v_dQOy57(#t=5}*OK0QWgrcEXC>~0LlA%;69m<5Vp9omGppbpuwU3e91so+2Ze*fA>q()SU5Z! z5r%{#!%^Yra7;Kh92brcCxoHl#BfqLISdP@gyG@Ta9S7~KyP8AgS3 z!+GKSa6uRyE({lii^G_3Nf;Z(g-gTua9NlTE)Q3PiD6Q>GF%lVhbdudm=>-M3D<;c z!*yYLxIWwvZVWTR%y3hf6>bi*!<=wSxHa4sZVz{aJHuV!?r=}IH_Q$5!u+rx+!yW- z4}=H9L*e1@NLUyi4UdJ#!xLdqSR9@VPlcz$GvV3rTzEdb5MB%~g(czTurw?SuY^~_ zYvJ{S=eht5cZQ=LuNBA>r4}XQf!$0BQ zup{gYyTb19U)U4&PN!f71u9e#6;&}6R|%C=DV0_kl~p;FR|Qp6B~?}xRaG@rR}Ixv zE!9>X)m1&!R|7RvBQ;hNHB~b;R|~aNE45Y|wN*Q{R|j=eCv{dAbyYWYR}b~nzQ$ha ztv>3je(J9Q8mK`Uto^jV4$y%*NC)c>9je20xQ@^e9jT*qw2slSI!?#y1P#@RI!PyM zm`>4fovPC`LZ|BtovE{Qw$9N=jncU~Pv`3bjn;*_NEd62F40(x)1?}(%QQik>k3WO zBweYiG+9$LRnv5}qHA=muG4f~uN!ouW@x5v(k$Jq*_xwUbgORD?YcvE>Mq@_dvven zYM$n6f$r1&dO#2AAw8@|v`~-gF+Hv)v`CBfq@L2#dPdLcIX$lz^rBwU61}XYTBcX@ zs$SFUTCNp(LvQLWy{&ijuHMu8`amD*BYmt-^r=>Al~!wwKGWy=LSO1DeXX@xr}g?q z-|9PU(D(X58?{NB^`o}vC;hCg`bEF$H*M4J`a^$eyZ+MO`bYn2hjwb0cI!Xw(cXgT z6ncLZs)&lJn2M`}N~)AftBlI3oXV?$DyotytBR_snyRaYYO0oMtB&fbp6aWC8mf^R ztBIPbnVPGGTB?;=tBu;Ko!YB|I;xX8tBbm-o4TuqdTL)|FZEU*^;JLh*8mOFpu%Mg IE?l_afAPA>J^%m! diff --git a/leakpro.py b/leakpro.py index a870a53b..764c9c00 100644 --- a/leakpro.py +++ b/leakpro.py @@ -1,18 +1,19 @@ """Main script to run LEAKPRO on a target model.""" -import joblib import logging -import numpy as np -from pathlib import Path import random import time +from pathlib import Path + +import joblib +import numpy as np import torch import yaml +import leakpro.train as util from leakpro import dataset, models from leakpro.mia_attacks.attack_scheduler import AttackScheduler from leakpro.reporting.utils import prepare_priavcy_risk_report -import leakpro.train as util def setup_log(name: str, save_file: bool) -> logging.Logger: @@ -51,8 +52,8 @@ def setup_log(name: str, save_file: bool) -> logging.Logger: if __name__ == "__main__": RETRAIN = True - #args = "./config/adult.yaml" # noqa: ERA001 - args = "./config/cifar10.yaml" + args = "./config/adult.yaml" # noqa: ERA001 + #args = "./config/cifar10.yaml" # noqa: ERA001 with open(args, "rb") as f: configs = yaml.safe_load(f) @@ -73,7 +74,7 @@ def setup_log(name: str, save_file: bool) -> logging.Logger: # ------------------------------------------------ # Create the population dataset - population = dataset.get_dataset(configs["data"]["dataset"], configs["data"]["data_dir"]) + population = dataset.get_dataset(configs["data"]["dataset"], configs["data"]["data_dir"], logger) N_population = len(population) # Create target training dataset and test dataset @@ -118,7 +119,7 @@ def setup_log(name: str, save_file: bool) -> logging.Logger: target_model_path = f"{log_dir}/model_0.pkl" with open(target_model_path, "rb") as f: if "adult" in configs["data"]["dataset"]: - target_model = models.NN(s + target_model = models.NN( configs["train"]["inputs"], configs["train"]["outputs"] ) # TODO: read metadata to get the model elif "cifar10" in configs["data"]["dataset"]: diff --git a/leakpro/__init__.py b/leakpro/__init__.py index e69de29b..8ae43ec6 100644 --- a/leakpro/__init__.py +++ b/leakpro/__init__.py @@ -0,0 +1 @@ +"""Dummy init file for leakpro package.""" diff --git a/leakpro/dataset.py b/leakpro/dataset.py index 7e3ab1a3..46816405 100644 --- a/leakpro/dataset.py +++ b/leakpro/dataset.py @@ -1,7 +1,11 @@ +"""Module that contains the dataset class and functions for preparing the dataset for training the target models.""" + +import logging import os import pickle -from typing import List +from typing import List, Self +import joblib import numpy as np import pandas as pd import torch @@ -13,80 +17,62 @@ class GeneralDataset(Dataset): - def __init__(self, data:np.ndarray, label:np.ndarray, transforms=None): - """data_list: A list of GeneralData instances. - """ - self.X = data # Convert to tensor and specify the data type + """Dataset class for general data.""" + + def __init__(self:Self, data:np.ndarray, label:np.ndarray, transforms:torch.nn.Module=None) -> None: + """data_list: A list of GeneralData instances.""" + self.x = data # Convert to tensor and specify the data type self.y = label # Assuming labels are for classification self.transforms = transforms - def __len__(self): + def __len__(self:Self) -> int: + """Return the length of the dataset.""" return len(self.y) - def __getitem__(self, idx): - """Returns the data and label for a single instance indexed by idx. - """ - if self.transforms: - X = self.transforms(self.X[idx]) - else: - X = self.X[idx] + def __getitem__(self:Self, idx:int) -> List[torch.Tensor]: + """Return the data and label for a single instance indexed by idx.""" + x = self.transforms(self.x[idx]) if self.transforms else self.x[idx] - # ensure that X is a tensor - if not isinstance(X, torch.Tensor): - X = torch.tensor(X, dtype=torch.float32) + # ensure that x is a tensor + if not isinstance(x, torch.Tensor): + x = torch.tensor(x, dtype=torch.float32) y = torch.tensor(self.y[idx], dtype=torch.long) - return X, y + return x, y -class TabularDataset(Dataset): - """Tabular dataset.""" +class InfiniteRepeatDataset(GeneralDataset): + """Dataset class for infinite repeat data.""" - def __init__(self, X, y): - """Initializes instance of class TabularDataset. + def __init__(self:Self, x:np.ndarray, y:np.ndarray, transform:torch.nn.Module=None) -> None: + """Initialize the InfiniteRepeatDataset class. Args: ---- - X (str): features - y (str): target + x (np.ndarray): The input data. + y (np.ndarray): The target labels. + transform (torch.nn.Module, optional): The data transformation module. Defaults to None. """ - super().__init__( - data_dict={"X": X, "y": y}, - default_input="X", - default_output="y", - ) - - def __len__(self): - return len(self.data_dict["y"]) - - def __getitem__(self, idx): - # Convert idx from tensor to list due to pandas bug (that arises when using pytorch's random_split) - if isinstance(idx, torch.Tensor): - idx = idx.tolist() - X = np.float32(self.data_dict["X"][idx]) - y = np.float32(self.data_dict["y"][idx]) - return [X, y] - + super().__init__(x, y, transform) -class InfiniteRepeatDataset(Dataset): - def __init__(self, dataset): - self.dataset = dataset - - def __len__(self): + def __len__(self:Self) -> int: + """Return the length of the dataset.""" return len(self.dataset) - def __getitem__(self, idx): - return self.dataset[idx % len(self.dataset)] + def __getitem__(self:Self, idx:int) -> List[torch.Tensor]: + """Return the data and label for a single instance indexed by idx.""" + return self.x[idx % len(self.dataset)], self.y[idx % len(self.dataset)] -def get_dataset(dataset_name: str, data_dir: str): +def get_dataset(dataset_name: str, data_dir: str, logger:logging.Logger) -> GeneralDataset: + """Get the dataset.""" path = f"{data_dir}/{dataset_name}" if os.path.exists(f"{path}.pkl"): with open(f"{path}.pkl", "rb") as file: - all_data = pickle.load(file) - print(f"Load data from {path}.pkl") + all_data = joblib.load(file) + logger.info(f"Load data from {path}.pkl") elif "adult" in dataset_name: column_names = [ "age", @@ -110,51 +96,51 @@ def get_dataset(dataset_name: str, data_dir: str): f"{path}/{dataset_name}.test", names=column_names, header=0 ) df_test["income"] = df_test["income"].str.replace(".", "", regex=False) - df = pd.concat([df_train, df_test], axis=0) - df = df.replace(" ?", np.nan) - df = df.dropna() - X, y = df.iloc[:, :-1], df.iloc[:, -1] + df_concatenated = pd.concat([df_train, df_test], axis=0) + df_replaced = df_concatenated.replace(" ?", np.nan) + df_clean = df_replaced.dropna() + x, y = df_clean.iloc[:, :-1], df_clean.iloc[:, -1] - categorical_features = [col for col in X.columns if X[col].dtype == "object"] + categorical_features = [col for col in x.columns if x[col].dtype == "object"] numerical_features = [ - col for col in X.columns if X[col].dtype in ["int64", "float64"] + col for col in x.columns if x[col].dtype in ["int64", "float64"] ] onehot_encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore") - X_categorical = onehot_encoder.fit_transform(X[categorical_features]) + x_categorical = onehot_encoder.fit_transform(x[categorical_features]) scaler = StandardScaler() - X_numerical = scaler.fit_transform(X[numerical_features]) + x_numerical = scaler.fit_transform(x[numerical_features]) - X = np.hstack([X_numerical, X_categorical]) + x = np.hstack([x_numerical, x_categorical]) # label encode the target variable to have the classes 0 and 1 y = LabelEncoder().fit_transform(y) - all_data = GeneralDataset(X,y) + all_data = GeneralDataset(x,y) with open(f"{path}.pkl", "wb") as file: pickle.dump(all_data, file) - print(f"Save data to {path}.pkl") + logger.info(f"Save data to {path}.pkl") elif "cifar10" in dataset_name: transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) trainset = torchvision.datasets.CIFAR10(root="./data/cifar10", train=True, download=True, transform=transform) testset = torchvision.datasets.CIFAR10(root="./data/cifar10", train=False,download=True, transform=transform) - X = np.vstack([trainset.data, testset.data]) + x = np.vstack([trainset.data, testset.data]) y = np.hstack([trainset.targets, testset.targets]) - all_data = GeneralDataset(X, y, transform) + all_data = GeneralDataset(x, y, transform) with open(f"{path}.pkl", "wb") as file: pickle.dump(all_data, file) - print(f"Save data to {path}.pkl") + logger.info(f"Save data to {path}.pkl") return all_data def get_split( all_index: List[int], used_index: List[int], size: int, split_method: str -): - """Select points based on the splitting methods +) -> np.ndarray: + """Select points based on the splitting methods. Args: ---- @@ -190,8 +176,8 @@ def get_split( return selected_index -def prepare_train_test_datasets(dataset_size: int, configs: dict): - """Prepare the dataset for training the target models when the training data are sampled uniformly from the distribution (pool of all possible data). +def prepare_train_test_datasets(dataset_size: int, configs: dict) -> dict: + """Prepare the dataset for training the target models when the training data are sampled uniformly from the population. Args: ---- @@ -201,7 +187,7 @@ def prepare_train_test_datasets(dataset_size: int, configs: dict): Returns: ------- - dict: Data split information which saves the information of training points index and test points index for all target models. + dict: Data split information which saves the information of training points index and test points index. """ # The index_list will save all the information about the train, test and auit for each target model. @@ -211,20 +197,20 @@ def prepare_train_test_datasets(dataset_size: int, configs: dict): selected_index = np.random.choice(all_index, train_size + test_size, replace=False) train_index, test_index = train_test_split(selected_index, test_size=test_size) - dataset_train_test = {"train_indices": train_index, "test_indices": test_index} - return dataset_train_test + return {"train_indices": train_index, "test_indices": test_index} -def get_dataset_subset(dataset: Dataset, indices: List[int]): +def get_dataset_subset(dataset: Dataset, indices: List[int]) -> Dataset: """Get a subset of the dataset. Args: ---- dataset (torchvision.datasets): Whole dataset. - index (list): List of index. + indices (list): List of indices. """ - assert max(indices) < len(dataset) and min(indices) >= 0, "Index out of range" + if max(indices) >= len(dataset) or min(indices) < 0: + raise ValueError("Index out of range") data = dataset.X targets = dataset.y @@ -232,17 +218,30 @@ def get_dataset_subset(dataset: Dataset, indices: List[int]): subset_data = [data[idx] for idx in indices] subset_targets = [targets[idx] for idx in indices] - new_dataset = dataset.__class__(subset_data, subset_targets, transforms) + return dataset.__class__(subset_data, subset_targets, transforms) - return new_dataset def get_dataloader( dataset: GeneralDataset, batch_size: int, - loader_type="torch", + loader_type: str = "torch", shuffle: bool = True, -): +) -> torch.utils.data.DataLoader: + """Get a data loader for the given dataset. + + Args: + ---- + dataset (GeneralDataset): The dataset to load. + batch_size (int): The batch size. + loader_type (str, optional): The type of data loader. Defaults to "torch". + shuffle (bool, optional): Whether to shuffle the data. Defaults to True. + + Returns: + ------- + torch.utils.data.DataLoader: The data loader. + + """ if loader_type == "torch": return torch.utils.data.DataLoader( dataset, @@ -253,3 +252,4 @@ def get_dataloader( persistent_workers=True, prefetch_factor=16, ) + return None diff --git a/leakpro/metrics/attack_result.py b/leakpro/metrics/attack_result.py index 429caa57..bb037da6 100644 --- a/leakpro/metrics/attack_result.py +++ b/leakpro/metrics/attack_result.py @@ -1,4 +1,5 @@ -from typing import List +"""Contains the AttackResult class, which stores the results of an attack.""" +from typing import Any, List, Self import numpy as np from sklearn.metrics import ( @@ -15,19 +16,17 @@ class AttackResult: - """Contains results related to the performance of the attack. - """ + """Contains results related to the performance of the attack.""" - def __init__( - self, + def __init__( # noqa: PLR0913 + self:Self, predicted_labels: list, true_labels: list, predictions_proba: List[List[float]] = None, - signal_values=None, + signal_values:List[Any]=None, threshold: float = None, - ): - """Constructor. - Computes and stores the accuracy, ROC AUC score, and the confusion matrix for a metric. + ) -> None: + """Compute and stores the accuracy, ROC AUC score, and the confusion matrix for a metric. Args: ---- @@ -61,9 +60,8 @@ def __init__( y_true=true_labels, y_pred=predicted_labels ).ravel() - def __str__(self): - """Returns a string describing the metric result. - """ + def __str__(self:Self) -> str: + """Return a string describing the metric result.""" txt = [ f'{" METRIC RESULT OBJECT ":=^48}', f"Accuracy = {self.accuracy}", @@ -75,19 +73,17 @@ def __str__(self): class CombinedMetricResult: - """Contains results related to the performance of the metric. It contains the results for multiple fpr. - """ + """Contains results related to the performance of the metric. It contains the results for multiple fpr.""" - def __init__( - self, + def __init__( # noqa: PLR0913 + self:Self, predicted_labels: list, true_labels: list, - predictions_proba=None, - signal_values=None, + predictions_proba:list=None, + signal_values:list=None, threshold: list = None, - ): - """Constructor. - Computes and stores the accuracy, ROC AUC score, and the confusion matrix for a metric. + )-> None: + """Compute and store the accuracy, ROC AUC score, and the confusion matrix for a metric. Args: ---- @@ -118,9 +114,8 @@ def __init__( self.fp / (np.sum(true_labels == 0)), self.tp / (np.sum(true_labels == 1)) ) - def __str__(self): - """Returns a string describing the metric result. - """ + def __str__(self:Self) -> str: + """Return a string describing the metric result.""" txt_list = [] for idx in range(len(self.accuracy)): txt = [ diff --git a/leakpro/mia_attacks/attack_factory.py b/leakpro/mia_attacks/attack_factory.py index 503aa5af..30544368 100644 --- a/leakpro/mia_attacks/attack_factory.py +++ b/leakpro/mia_attacks/attack_factory.py @@ -1,17 +1,38 @@ +"""Module that contains the AttackFactory class which is responsible for creating the attack objects.""" + from leakpro.mia_attacks.attack_utils import AttackUtils +from leakpro.mia_attacks.attacks.attack import AttackAbstract from leakpro.mia_attacks.attacks.attack_p import AttackP from leakpro.mia_attacks.attacks.rmia import AttackRMIA class AttackFactory: + """Class responsible for creating the attack objects.""" + attack_classes = { "attack_p": AttackP, "rmia": AttackRMIA, } @classmethod - def create_attack(cls, name: str, attack_utils: AttackUtils, configs: dict): + def create_attack(cls, name: str, attack_utils: AttackUtils, configs: dict) -> AttackAbstract: # noqa: ANN102 + """Create an attack object based on the given name, attack_utils, and configs. + + Args: + ---- + name (str): The name of the attack. + attack_utils (AttackUtils): An instance of AttackUtils. + configs (dict): The attack configurations. + + Returns: + ------- + AttackBase: An instance of the attack object. + + Raises: + ------ + ValueError: If the attack type is unknown. + + """ if name in cls.attack_classes: return cls.attack_classes[name](attack_utils, configs) - else: - raise ValueError(f"Unknown attack type: {name}") + raise ValueError(f"Unknown attack type: {name}") diff --git a/leakpro/mia_attacks/attack_objects.py b/leakpro/mia_attacks/attack_objects.py index a97278fb..085af4da 100644 --- a/leakpro/mia_attacks/attack_objects.py +++ b/leakpro/mia_attacks/attack_objects.py @@ -216,9 +216,9 @@ def get_optimizer(self:Self, model: Module, configs: dict) -> torch.optim.Optimi learning_rate = configs.get("learning_rate", 0.01) weight_decay = configs.get("weight_decay", 0) momentum = configs.get("momentum", 0) - self.logger(f"Load the optimizer {optimizer}: ", end=" ") - self.logger(f"Learning rate {learning_rate}", end=" ") - self.logger(f"Weight decay {weight_decay} ") + self.logger.info(f"Load the optimizer {optimizer}") + self.logger.info(f"Learning rate {learning_rate}") + self.logger.info(f"Weight decay {weight_decay}") if optimizer == "SGD": return SGD(model.parameters(), @@ -292,11 +292,8 @@ def train_shadow_model(self:Self, shadow_model: Module, shadow_train_loader: Dat # Add the loss to the total loss train_loss += loss.item() - self.logger.info(f"Epoch: {epoch_idx+1}/{epochs} |", end=" ") - self.logger.info(f"Train Loss: {train_loss/len(shadow_train_loader):.8f} ", end=" ") - self.logger.info(f"Train Acc: {float(train_acc)/len(shadow_train_loader.dataset):.8f} ", end=" ") - - self.logger.info(f"One step uses {time.time() - start_time:.2f} seconds") + log_train_str = f"Epoch: {epoch_idx+1}/{epochs} | Train Loss: {train_loss/len(shadow_train_loader):.8f} | Train Acc: {float(train_acc)/len(shadow_train_loader.dataset):.8f} | One step uses {time.time() - start_time:.2f} seconds" # noqa: E501 + self.logger.info(log_train_str) # Move the model back to the CPU shadow_model.to("cpu") diff --git a/leakpro/mia_attacks/attack_scheduler.py b/leakpro/mia_attacks/attack_scheduler.py index c871e268..7cfd6ac8 100644 --- a/leakpro/mia_attacks/attack_scheduler.py +++ b/leakpro/mia_attacks/attack_scheduler.py @@ -1,3 +1,11 @@ +"""Module that contains the AttackScheduler class, which is responsible for creating and executing attacks.""" +import logging +from typing import Any, Dict, Self + +import numpy as np +import torch + +from leakpro.dataset import GeneralDataset from leakpro.mia_attacks.attack_factory import AttackFactory from leakpro.mia_attacks.attack_objects import AttackObjects from leakpro.mia_attacks.attack_utils import AttackUtils @@ -5,16 +13,18 @@ class AttackScheduler: - def __init__( - self, - population, - train_test_dataset, - target_model, - target_model_metadata, - configs, - logs_dirname, - logger, - ): + """Class responsible for creating and executing attacks.""" + + def __init__( # noqa: D107, PLR0913 + self:Self, + population:GeneralDataset, + train_test_dataset:np.ndarray, + target_model:torch.nn.Module, + target_model_metadata:Dict[str, Any], # noqa: ARG002 + configs:Dict[str, Any], + logs_dirname:str, + logger:logging.Logger + ) -> None: self.attack_list = configs["audit"]["attack_list"] self.attacks = [] @@ -28,15 +38,17 @@ def __init__( attack = AttackFactory.create_attack(attack_name, attack_utils, configs) self.add_attack(attack) except ValueError as e: - print(e) + logger.info(e) self.logs_dirname = logs_dirname self.logger = logger - def add_attack(self, attack: AttackAbstract): + def add_attack(self:Self, attack: AttackAbstract) -> None: + """Add an attack to the list of attacks.""" self.attacks.append(attack) - def run_attacks(self): + def run_attacks(self:Self) -> Dict[str, Any]: + """Run the attacks and return the results.""" results = {} for attack, attack_type in zip(self.attacks, self.attack_list): self.logger.info(f"Preparing attack: {attack_type}") @@ -50,6 +62,7 @@ def run_attacks(self): self.logger.info(f"Finished attack: {attack_type}") return results - def identify_attacks(self, model, dataset): - # Identify relevant attacks based on adversary setting + def identify_attacks(self:Self) -> None: + """Identify relevant attacks based on adversary setting.""" + # Implementation goes here pass diff --git a/leakpro/mia_attacks/attack_utils.py b/leakpro/mia_attacks/attack_utils.py index 9274ce60..c71d7894 100644 --- a/leakpro/mia_attacks/attack_utils.py +++ b/leakpro/mia_attacks/attack_utils.py @@ -1,4 +1,5 @@ -from typing import List +"""Module that contains utility functions that are used in the attack classes.""" +from typing import Any, Dict, List, Self import numpy as np from scipy.stats import norm @@ -7,11 +8,16 @@ class AttackUtils: - def __init__(self, attack_objects: AttackObjects): + """Utility class for the attacks.""" + + def __init__(self:Self, attack_objects: AttackObjects)->None: + """Initialize the AttackUtils class.""" self.attack_objects = attack_objects - def flatten_array(self, arr): - """Utility function to recursively flatten a list of lists. + def flatten_array(self:Self, arr: List[Any]) -> np.ndarray: + """Recursively flatten a list of lists. + + Utility function that recursively flattens a list of lists. Each element in the list can be another list, tuple, set, or np.ndarray, and can have variable sizes. @@ -32,8 +38,8 @@ def flatten_array(self, arr): flat_array.append(item) return np.array(flat_array) - def default_quantile(): - """Return the default fprs + def default_quantile() -> np.ndarray: + """Return the default fprs. Returns ------- @@ -42,21 +48,53 @@ def default_quantile(): """ return np.logspace(-5, 0, 100) - def prepare_attack_dataset(self, configs: dict): + def prepare_attack_dataset(self:Self, configs: dict) -> Dict[str, np.ndarray]: + """Prepare the attack dataset based on the provided configurations. + + Args: + ---- + configs: Dictionary containing the configurations for preparing the attack dataset. + + Returns: + ------- + Dictionary containing the audit indices for the attack dataset. + + """ audit_size = int( configs["data"]["f_audit"] * self.attack_objects.population_size ) audit_index = self.sample_dataset_no_overlap(audit_size) - audit_dataset = {"audit_indices": audit_index} - return audit_dataset + return {"audit_indices": audit_index} - def sample_dataset_uniformly(self, size: float): + def sample_dataset_uniformly(self:Self, size: float) -> np.ndarray: + """Sample the dataset uniformly. + + Args: + ---- + size: The size of the dataset to sample. + + Returns: + ------- + np.ndarray: The selected indices of the dataset. + + """ all_index = np.arange(self.attack_objects.population_size) if size <= len(all_index): selected_index = np.random.choice(all_index, size, replace=False) return selected_index - def sample_dataset_no_overlap(self, size: float): + def sample_dataset_no_overlap(self:Self, size: float) -> np.ndarray: + """Sample the dataset without overlap. + + Args: + ---- + size: The size of the dataset to sample. + + Returns: + ------- + np.ndarray: The selected indices of the dataset. + + """ all_index = np.arange(self.attack_objects.population_size) used_index = np.concatenate( ( @@ -74,11 +112,8 @@ def sample_dataset_no_overlap(self, size: float): - def threshold_func( - distribution: List[float], alpha: List[float], **kwargs - ) -> float: - """Function that returns the threshold as the alpha quantile of - the provided distribution. + def threshold_func(self:Self, distribution: List[float], alpha: List[float], **kwargs: Dict[str, Any]) -> float: + """Return the threshold as the alpha quantile of the provided distribution. Args: ---- @@ -86,33 +121,34 @@ def threshold_func( the threshold is computed. alpha: Quantile value that will be used to obtain the threshold from the distribution. + **kwargs: Additional keyword arguments. Returns: ------- threshold: alpha quantile of the provided distribution. """ - threshold = np.quantile(distribution, q=alpha, interpolation="lower", **kwargs) - return threshold + return np.quantile(distribution, q=alpha, interpolation="lower", **kwargs) ######################################################################################################################## # HYPOTHESIS TEST: LINEAR INTERPOLATION THRESHOLDING ######################################################################################################################## + def linear_itp_threshold_func( - self, + self:Self, distribution: List[float], alpha: List[float], - **kwargs, + **kwargs: Dict[str, Any], ) -> float: - """Function that returns the threshold as the alpha quantile of - a linear interpolation curve fit over the provided distribution. + """Return the threshold as the alpha quantile of a linear interpolation curve fit over the provided distribution. Args: ---- distribution: Sequence of values that form the distribution from which - the threshold is computed. (Here we only consider positive signal values.) + the threshold is computed. (Here we only consider positive signal values.) alpha: Quantile value that will be used to obtain the threshold from the distribution. + **kwargs: Additional keyword arguments. Returns: ------- @@ -162,20 +198,20 @@ def linear_itp_threshold_func( # HYPOTHESIS TEST: LOGIT RESCALE THRESHOLDING ######################################################################################################################## def logit_rescale_threshold_func( - self, + self:Self, distribution: List[float], - alpha: List[float], - **kwargs, + alpha: List[float] ) -> float: - """Function that returns the threshold as the alpha quantile of a Gaussian fit - over logit rescaling transform of the provided distribution + """Return the threshold as the alpha quantile of a Gaussian fit over logit rescaling transform. + Args: + ---- distribution: Sequence of values that form the distribution from which - the threshold is computed. (Here we only consider positive signal values.) + the threshold is computed. (Here we only consider positive signal values.) alpha: Quantile value that will be used to obtain the threshold from the distribution. - Returns + Returns: ------- threshold: alpha quantile of the provided distribution. @@ -185,6 +221,7 @@ def logit_rescale_threshold_func( np.divide(np.exp(-distribution), (1 - np.exp(-distribution))) ) + if len(distribution.shape) > 1: parameters = np.array( [norm.fit(distribution[i]) for i in range(distribution.shape[0])] @@ -196,30 +233,25 @@ def logit_rescale_threshold_func( alpha = np.array(alpha).reshape(-1, 1).repeat(num_points, 1) threshold = norm.ppf(1 - np.array(alpha), loc=loc.T, scale=scale.T) else: - print("none") - print(np.sum(distribution == -np.inf)) loc, scale = norm.fit(distribution) threshold = norm.ppf(1 - np.array(alpha), loc=loc, scale=scale) - threshold = np.log(np.exp(threshold) + 1) - threshold - return threshold + return np.log(np.exp(threshold) + 1) - threshold ######################################################################################################################## # HYPOTHESIS TEST: GAUSSIAN THRESHOLDING ######################################################################################################################## def gaussian_threshold_func( - self, + self:Self, distribution: List[float], - alpha: List[float], - **kwargs, + alpha: List[float] ) -> float: - """Function that returns the threshold as the alpha quantile of - a Gaussian curve fit over the provided distribution. + """Return the threshold as the alpha quantile of a Gaussian curve fit over the provided distribution. Args: ---- distribution: Sequence of values that form the distribution from which - the threshold is computed. + the threshold is computed. alpha: Quantile value that will be used to obtain the threshold from the distribution. @@ -243,32 +275,3 @@ def gaussian_threshold_func( threshold = norm.ppf(alpha, loc=loc, scale=scale) return threshold - # ######################################################################################################################## - # # HYPOTHESIS TEST: MIN LINEAR LOGIT RESCALE THRESHOLDING - # ######################################################################################################################## - # def min_linear_logit_threshold_func( - # self, - # distribution: List[float], - # alpha: List[float], - # **kwargs, - # ) -> float: - # """ - # Function that returns the threshold as the minimum of 1) alpha quantile of - # a linear interpolation curve fit over the provided distribution, and 2) alpha - # quantile of a Gaussian fit over logit rescaling transform of the provided - # distribution - # Args: - # distribution: Sequence of values that form the distribution from which - # the threshold is computed. (Here we only consider positive signal values.) - # alpha: Quantile value that will be used to obtain the threshold from the - # distribution. - # Returns: - # threshold: alpha quantile of the provided distribution. - # """ - - # threshold_linear = linear_itp_threshold_func(distribution, alpha, **kwargs) - # threshold_logit = logit_rescale_threshold_func(distribution, alpha, **kwargs) - - # threshold = np.minimum(threshold_logit, threshold_linear) - - # return threshold diff --git a/leakpro/mia_attacks/attacks/attack.py b/leakpro/mia_attacks/attacks/attack.py index 66d59333..cc4f530c 100644 --- a/leakpro/mia_attacks/attacks/attack.py +++ b/leakpro/mia_attacks/attacks/attack.py @@ -1,8 +1,10 @@ +"""Module that contains the abstract class for constructing and performing a membership inference attack on a target.""" + from abc import ABC, abstractmethod -from typing import List, Union +from typing import List, Self, Union -from ...metrics.attack_result import AttackResult -from ..attack_utils import AttackUtils +from leakpro.metrics.attack_result import AttackResult +from leakpro.mia_attacks.attack_utils import AttackUtils ######################################################################################################################## # METRIC CLASS @@ -10,49 +12,81 @@ class AttackAbstract(ABC): - """Interface to construct and perform a membership inference attack on a target model and dataset using auxiliary - information specified by the user. This serves as a guideline for implementing a metric to be used for measuring - the privacy leakage of a target model. + """Interface to construct and perform a membership inference attack on a target model and dataset. + + This serves as a guideline for implementing a metric to be used for measuring the privacy leakage of a target model. """ def __init__( - self, + self:Self, attack_utils: AttackUtils, - ): + )->None: + """Initialize the AttackAbstract class. + + Args: + ---- + attack_utils (AttackUtils): An instance of the AttackUtils class containing the attack objects. + + """ self.population = attack_utils.attack_objects.population self.population_size = attack_utils.attack_objects.population_size self.target_model = attack_utils.attack_objects.target_model self.audit_dataset = attack_utils.attack_objects.audit_dataset self.signal_data = [] + @property - def get_population(self): + def get_population(self:Self)-> List: + """Get the population used for the attack. + + Returns + ------- + List: The population used for the attack. + + """ return self.population @property - def get_population_size(self): + def get_population_size(self:Self)-> int: + """Get the size of the population used for the attack. + + Returns + ------- + int: The size of the population used for the attack. + + """ return self.population_size @property - def get_target_model(self): + def get_target_model(self:Self)-> Union[Self, List[Self] ]: + """Get the target model used for the attack. + + Returns + ------- + Union[Self, List[Self]]: The target model used for the attack. + + """ return self.target_model @property - def get_audit_dataset(self): + def get_audit_dataset(self:Self)-> Self: + """Get the audit dataset used for the attack. + + Returns + ------- + Self: The audit dataset used for the attack. + + """ return self.audit_dataset @abstractmethod - def prepare_attack(self): - """Function to prepare data needed for running the metric on the target model and dataset, using signals computed - on the auxiliary model(s) and dataset. - """ + def prepare_attack(self:Self) -> None: + """Prepare data needed for running the metric on the target model and dataset.""" pass @abstractmethod - def run_attack( - self, fpr_tolerance_rate_list=None - ) -> Union[AttackResult, List[AttackResult]]: - """Function to run the metric on the target model and dataset. + def run_attack(self:Self) -> Union[AttackResult, List[AttackResult]]: + """Run the metric on the target model and dataset. Args: ---- diff --git a/leakpro/mia_attacks/attacks/attack_p.py b/leakpro/mia_attacks/attacks/attack_p.py index 23282e90..9ccbdc61 100644 --- a/leakpro/mia_attacks/attacks/attack_p.py +++ b/leakpro/mia_attacks/attacks/attack_p.py @@ -1,3 +1,7 @@ +"""Module that contains the implementation of the attack P.""" + +from typing import Self + import numpy as np from leakpro.dataset import get_dataset_subset @@ -8,9 +12,17 @@ class AttackP(AttackAbstract): + """Implementation of the P-attack.""" + + def __init__(self:Self, attack_utils: AttackUtils, configs: dict) -> None: + """Initialize the AttackP class. - def __init__(self, attack_utils: AttackUtils, configs: dict): + Args: + ---- + attack_utils (AttackUtils): An instance of the AttackUtils class. + configs (dict): A dictionary containing the attack configurations. + """ # Initializes the parent metric super().__init__(attack_utils) @@ -23,10 +35,8 @@ def __init__(self, attack_utils: AttackUtils, configs: dict): self.signal = ModelLoss() self.hypothesis_test_func = attack_utils.linear_itp_threshold_func - def prepare_attack(self): - """Function to prepare data needed for running the metric on the target model and dataset, using signals computed - on the auxiliary model(s) and dataset. - """ + def prepare_attack(self:Self) -> None: + """Prepare data needed for running the metric on the target model and dataset.""" # sample dataset to compute histogram all_index = np.arange(self.population_size) attack_data_size = np.round( @@ -41,8 +51,8 @@ def prepare_attack(self): # signals based on training dataset self.attack_signal = self.signal([self.target_model], [attack_data])[0] - def run_attack(self, fpr_tolerance_rate_list=None): - """Function to run the attack on the target model and dataset. + def run_attack(self:Self) -> CombinedMetricResult: + """Run the attack on the target model and dataset. Args: ---- @@ -55,10 +65,7 @@ def run_attack(self, fpr_tolerance_rate_list=None): """ # map the threshold with the alpha - if fpr_tolerance_rate_list is not None: - self.quantiles = fpr_tolerance_rate_list - else: - self.quantiles = AttackUtils.default_quantile() + self.quantiles = AttackUtils.default_quantile() # obtain the threshold values based on the reference dataset thresholds = self.hypothesis_test_func( self.attack_signal, self.quantiles @@ -97,15 +104,10 @@ def run_attack(self, fpr_tolerance_rate_list=None): [self.in_member_signals, self.out_member_signals] ) - # compute the difference between the signals and the thresholds - # predictions_proba = np.hstack([member_signals, non_member_signals]) - thresholds - # compute ROC, TP, TN etc - metric_result = CombinedMetricResult( + return CombinedMetricResult( predicted_labels=predictions, true_labels=true_labels, predictions_proba=None, signal_values=signal_values, ) - - return metric_result diff --git a/leakpro/mia_attacks/attacks/rmia.py b/leakpro/mia_attacks/attacks/rmia.py index 92d50df0..ca1b9990 100644 --- a/leakpro/mia_attacks/attacks/rmia.py +++ b/leakpro/mia_attacks/attacks/rmia.py @@ -31,19 +31,14 @@ def __init__(self:Self, attack_utils: AttackUtils, configs: dict) -> None: self.gamma = 2.0 # threshold for the attack self.temperature = 2.0 # temperature for the softmax - if "f_attack_data_size" in configs: - self.f_attack_data_size = configs["audit"]["f_attack_data_size"] - else: - self.f_attack_data_size = ( - 0.3 # pick 10% of data to create histograms by default - ) + self.f_attack_data_size = configs["audit"].get("f_attack_data_size", 0.3) self.signal = ModelLogits() self.epsilon = 1e-6 - def softmax(self:Self, all_logits:np.ndarray, - true_label_indices:np.ndarray, + def softmax(self:Self, all_logits:np.ndarray, + true_label_indices:np.ndarray, return_full_distribution:bool=False) -> np.ndarray: """Compute the softmax function. @@ -51,7 +46,7 @@ def softmax(self:Self, all_logits:np.ndarray, ---- all_logits (np.ndarray): Logits for each class. true_label_indices (np.ndarray): Indices of the true labels. - return_full_distribution (bool, optional): Whether to return the full distribution or just the true class probabilities. + return_full_distribution (bool, optional): return the full distribution or just the true class probabilities. Returns: ------- @@ -76,8 +71,8 @@ def prepare_attack(self:Self) -> None: Signals are computed on the auxiliary model(s) and dataset. """ - # compute the ratio of p(z|theta) (target model) to p(z)=sum_{theta'} p(z|theta') (shadow models) for all points in the attack dataset - # output from signal: # models x # data points x # classes + # compute the ratio of p(z|theta) (target model) to p(z)=sum_{theta'} p(z|theta') (shadow models) + # for all points in the attack dataset output from signal: # models x # data points x # classes # get the true label indices z_label_indices = np.array(self.attack_data.y) @@ -128,7 +123,8 @@ def run_attack(self:Self) -> CombinedMetricResult: logits_shadow_models = self.signal(self.shadow_models, audit_data) # collect the softmax output of the correct class for each shadow model p_x_given_shadow_models = [self.softmax(np.array(x).reshape(1,*x.shape), x_label_indices) for x in logits_shadow_models] - # stack the softmax output of the correct class for each shadow model to dimension # models x # data points + # stack the softmax output of the correct class for each shadow model + # to dimension # models x # data points p_x_given_shadow_models = np.array(p_x_given_shadow_models).squeeze() # evaluate the marginal p_out(x) by averaging the output of the shadow models p_x_out = np.mean(p_x_given_shadow_models, axis=0) if len(self.shadow_models) > 1 else p_x_given_shadow_models.squeeze() diff --git a/leakpro/model.py b/leakpro/model.py index c021085e..190cc70a 100644 --- a/leakpro/model.py +++ b/leakpro/model.py @@ -15,7 +15,7 @@ class Model(ABC): """Interface to query a model without any assumption on how it is implemented.""" - def __init__(self:Self, model_obj: torch.Module, loss_fn: torch.nn.modules.loss._Loss) -> None: + def __init__(self:Self, model_obj: torch.nn.Module, loss_fn: torch.nn.modules.loss._Loss) -> None: """Initialize the Model. Args: @@ -102,7 +102,7 @@ class PytorchModel(Model): This particular class is to be used with pytorch models. """ - def __init__(self:Self, model_obj:torch.Module, loss_fn:torch.nn.modules.loss._Loss)->None: + def __init__(self:Self, model_obj:torch.nn.Module, loss_fn:torch.nn.modules.loss._Loss)->None: """Initialize the PytorchModel. Args: diff --git a/leakpro/train.py b/leakpro/train.py index a9deaa58..720c585b 100644 --- a/leakpro/train.py +++ b/leakpro/train.py @@ -100,9 +100,6 @@ def inference( # Calculating accuracy acc = float(acc) / len(loader.dataset) - # Move model back to CPU - model.to("cpu") - # Return loss and accuracy return loss, acc @@ -171,15 +168,14 @@ def train( # noqa: PLR0913 # Add the loss to the total loss train_loss += loss.item() - logger.info(f"Epoch: {epoch_idx+1}/{epochs} |") - logger.info(f"Train Loss: {train_loss/len(train_loader):.8f} ") - logger.info(f"Train Acc: {float(train_acc)/len(train_loader.dataset):.8f}") + # Log the training loss and accuracy + log_train_str = f"Epoch: {epoch_idx+1}/{epochs} | Train Loss: {train_loss/len(train_loader):.8f} | Train Acc: {float(train_acc)/len(train_loader.dataset):.8f} | One step uses {time.time() - start_time:.2f} seconds" # noqa: E501 + logger.info(log_train_str) test_loss, test_acc = inference(model, test_loader, device) - logger.info(f"Test Loss: {float(test_loss):.8f} ") - logger.info(f"Test Acc: {float(test_acc):.8f} ") - logger.info(f"One step uses {time.time() - start_time:.2f} seconds") + log_test_str = f"Epoch: {epoch_idx+1}/{epochs} | Test Loss: {test_loss:.8f} | Test Acc: {test_acc:.8f}" + logger.info(log_test_str) # Move the model back to the CPU model.to("cpu") diff --git a/test_adult/models_metadata.pkl b/test_adult/models_metadata.pkl index f399aabaf067dab5d4e179817d750422ed917fcc..994484e0979f5aa80e04371c4253919a5bfdd7ff 100644 GIT binary patch delta 43 mcmca%f5U#mCplhYOLH?rV@m@AV-s`Z$^Ye)(M0s+R{{VvwhXiY delta 43 mcmca%f5U#mCplgd17kBo3qw<53riEz$^Ye)(M0s+R{{VvOboaH