From 81df82ae4f0c14b3b1a5aafbdb6533b6db6e4b76 Mon Sep 17 00:00:00 2001
From: johanos1
Date: Tue, 26 Mar 2024 11:53:33 +0000
Subject: [PATCH] rough errors process 50%

---
 leakpro.py                              |  21 +--
 leakpro/dataset.py                      | 222 ------------------------
 leakpro/mia_attacks/attack_objects.py   | 182 ++++++++++++++-----
 leakpro/mia_attacks/attack_scheduler.py |   2 +-
 leakpro/mia_attacks/attacks/rmia.py     |  79 +++++----
 leakpro/model.py                        |  77 ++++----
 leakpro/models.py                       |  43 +++--
 leakpro/reporting/audit_report.py       | 108 ++++++------
 leakpro/reporting/utils.py              |  12 +-
 leakpro/signals/signal.py               | 125 ++++---------
 leakpro/train.py                        |   2 +-
 11 files changed, 353 insertions(+), 520 deletions(-)

diff --git a/leakpro.py b/leakpro.py
index fff3f97..a870a53 100644
--- a/leakpro.py
+++ b/leakpro.py
@@ -1,17 +1,18 @@
+"""Main script to run LEAKPRO on a target model."""
+
+import joblib
 import logging
-import pickle
+import numpy as np
+from pathlib import Path
 import random
 import time
-from pathlib import Path
-
-import numpy as np
 import torch
 import yaml
 
-import leakpro.train as util
 from leakpro import dataset, models
 from leakpro.mia_attacks.attack_scheduler import AttackScheduler
 from leakpro.reporting.utils import prepare_priavcy_risk_report
+import leakpro.train as util
 
 
 def setup_log(name: str, save_file: bool) -> logging.Logger:
@@ -50,10 +51,10 @@ def setup_log(name: str, save_file: bool) -> logging.Logger:
 
 if __name__ == "__main__":
     RETRAIN = True
-    #args = "./config/adult.yaml"
+    #args = "./config/adult.yaml"  # noqa: ERA001
     args = "./config/cifar10.yaml"
     with open(args, "rb") as f:
-        configs = yaml.load(f, Loader=yaml.Loader)
+        configs = yaml.safe_load(f)
 
     # Set the random seed, log_dir and inference_game
     torch.manual_seed(configs["run"]["random_seed"])
@@ -105,7 +106,7 @@ def setup_log(name: str, save_file: bool) -> logging.Logger:
     data_file = configs["data"]["dataset"]
     dataset_path = f"{data_dir}/{data_file}.pkl"
     with open(dataset_path, "rb") as file:
-        population = pickle.load(file)
+        population = joblib.load(file)
 
     # Get the training and test data
     train_test_data = train_test_dataset
@@ -113,11 +114,11 @@ def setup_log(name: str, save_file: bool) -> logging.Logger:
     # Get the target model + metadata
     target_model_metadata_path = f"{log_dir}/models_metadata.pkl"
     with open(target_model_metadata_path, "rb") as f:
-        target_model_metadata = pickle.load(f)
+        target_model_metadata = joblib.load(f)
     target_model_path = f"{log_dir}/model_0.pkl"
     with open(target_model_path, "rb") as f:
         if "adult" in configs["data"]["dataset"]:
             target_model = models.NN(
                 configs["train"]["inputs"], configs["train"]["outputs"]
             )  # TODO: read metadata to get the model
         elif "cifar10" in configs["data"]["dataset"]:
diff --git a/leakpro/dataset.py b/leakpro/dataset.py
index a9b9530..7e3ab1a 100644
--- a/leakpro/dataset.py
+++ b/leakpro/dataset.py
@@ -38,228 +38,6 @@ def __getitem__(self, idx):
         y = torch.tensor(self.y[idx], dtype=torch.long)
         return X, y
 
-########################################################################################################################
-# DATASET CLASS
-########################################################################################################################
-
-
-# class Dataset:
-#     """
-#     Wrapper around a dictionary-like formatted dataset, with functions to run preprocessing, to define default
-#     input/output features, and to split a dataset easily.
-#     """
-
-#     def __init__(
-#         self,
-#         data_dict: dict,
-#         default_input: str,
-#         default_output: str,
-#         default_group: str = None,
-#         preproc_fn_dict: dict = None,
-#         preprocessed: bool = False,
-#     ):
-#         """Constructor
-
-#         Args:
-#             data_dict: Contains the dataset as a dict.
-#             default_input: The key of the data_dict that should be used by default to get the input of a model.
-#             default_output: The key of the data_dict that should be used by default to get the expected output
-#                 of a model.
-#             default_group: The key of the data_dict that shouuld be used by default to get the group of the data points.
-#                 This is to contruct class dependent threshold.
-#             preproc_fn_dict: Contains optional preprocessing functions for each feature.
-#             preprocessed: Indicates if the preprocessing of preproc_fn_dict has already been applied.
-#         """
-
-#         # Store parameters
-#         self.data_dict = data_dict
-#         self.default_input = default_input
-#         self.default_output = default_output
-#         self.default_group = default_group
-#         self.preproc_fn_dict = preproc_fn_dict
-
-#         # Store splits names and features names
-#         self.splits = list(self.data_dict)
-#         self.features = list(self.data_dict[self.splits[0]])
-
-#         # If preprocessing functions were passed as parameters, execute them
-#         if not preprocessed and preproc_fn_dict is not None:
-#             self.preprocess()
-
-#     def __len__(self):
-#         return len(self.data_dict[self.default_output])
-
-#     def preprocess(self):
-#         """
-#         Preprocessing function, executed by the constructor, based on the preproc_fn_dict attribute.
-#         """
-#         for split, feature in product(self.splits, self.features):
-#             if feature in list(self.preproc_fn_dict):
-#                 fn = self.preproc_fn_dict[feature]
-#                 self.data_dict[split][feature] = fn(self.data_dict[split][feature])
-
-#     def get_feature(self, split_name: str, feature_name: str, indices: list = None):
-#         """Returns a specific feature from samples of a specific split.
-
-#         Args:
-#             split_name: Name of the split.
-#             feature_name: Name of the feature.
-#             indices: Optional list of indices. If not specified, the entire subset is returned.
-
-#         Returns:
-#             The requested feature, from samples of the requested split.
-#         """
-
-#         # Two placeholders can be used to trigger either the default input or the default output, as specified during
-#         # object creation
-#         if feature_name == "<default_input>":
-#             feature_name = self.default_input
-#         elif feature_name == "<default_output>":
-#             feature_name = self.default_output
-#         elif feature_name == "<default_group>":
-#             feature_name = self.default_group
-
-#         # If 'indices' is not specified, returns the entire array. Else just return those indices
-#         if indices is None:
-#             return self.data_dict[split_name][feature_name]
-#         else:
-#             return self.data_dict[split_name][feature_name][indices]
-
-#     def subdivide(
-#         self,
-#         num_splits: int,
-#         split_names: list = None,
-#         method: str = "independent",
-#         split_size: Union[int, Dict[str, int]] = None,
-#         delete_original: bool = False,
-#         in_place: bool = True,
-#         return_results: bool = False,
-#     ):
-#         """Subdivides the splits contained in split_names into sub-splits, e.g. for shadow model training.
-
-#         Args:
-#             num_splits: Number of sub-splits per original split.
-#             split_names: The splits to subdivide (e.g. train and test). By default, includes all splits.
-#             method: Either independent or random. If method is independent, then the sub-splits are a partition of the
-#                 original split (i.e. they contain the entire split without repetition). If method is random, then each
-#                 sub-split is a random subset of the original split (i.e.
-#                 some samples might be missing or repeated). If method is hybrid, then each sub-split is a random
-#                 subset of the original split, with the guarantee that the 1st one is not overlapping with the others.
-#             split_size: If method is random, this is the size of one split (ignored if method is independent). Can
-#                 either be an integer, or a dictionary of integer (one per split).
-#             delete_original: Indicates if the original split should be deleted.
-#             in_place: Indicates if the new splits should be included in the parent object or not
-#             return_results: Indicates if the new splits should be returned or not
-
-#         Returns:
-#             If in_place, a list of new Dataset objects, with the sub-splits. Otherwise, nothing, as the results are
-#             stored in self.data_dict.
-#         """
-
-#         # By default, includes all splits.
-#         if split_names is None:
-#             split_names = self.splits
-
-#         # List of results if in_place is False
-#         new_datasets_dict = [{} for _ in range(num_splits)]
-
-#         for split in split_names:
-
-#             if split_size is not None:
-#                 parsed_split_size = (
-#                     split_size if isinstance(split_size, int) else split_size[split]
-#                 )
-
-#             # If method is random, then each sub-split is a random subset of the original split.
-#             if method == "random":
-#                 assert (
-#                     split_size is not None
-#                 ), 'Argument split_size is required when method is "random" or "hybrid"'
-#                 indices = np.random.randint(
-#                     self.data_dict[split][self.features[0]].shape[0],
-#                     size=(num_splits, parsed_split_size),
-#                 )
-
-#             # If method is independent, then the sub-splits are a partition of the original split.
-#             elif method == "independent":
-#                 indices = np.arange(self.data_dict[split][self.features[0]].shape[0])
-#                 np.random.shuffle(indices)
-#                 indices = np.array_split(indices, num_splits)
-
-#             # If method is hybrid, then each sub-split is a random subset of the original split, with the guarantee
-#             # that the 1st one is not overlapping with the others
-#             elif method == "hybrid":
-#                 assert (
-#                     split_size is not None
-#                 ), 'Argument split_size is required when method is "random" or "hybrid"'
-#                 available_indices = np.arange(
-#                     self.data_dict[split][self.features[0]].shape[0]
-#                 )
-#                 indices_a = np.random.choice(
-#                     available_indices, size=(1, parsed_split_size), replace=False
-#                 )
-#                 available_indices = np.setdiff1d(available_indices, indices_a.flatten())
-#                 indices_b = np.random.choice(
-#                     available_indices,
-#                     size=(num_splits - 1, parsed_split_size),
-#                     replace=True,
-#                 )
-#                 indices = np.concatenate((indices_a, indices_b))
-
-#             else:
-#                 raise ValueError(f'Split method "{method}" does not exist.')
-
-#             for split_n in range(num_splits):
-#                 # Fill the dictionary if in_place is True
-#                 if in_place:
-#                     self.data_dict[f"{split}{split_n:03d}"] = {}
-#                     for feature in self.features:
-#                         self.data_dict[f"{split}{split_n:03d}"][feature] = (
-#                             self.data_dict[split][feature][indices[split_n]]
-#                         )
-#                 # Create new dictionaries if return_results is True
-#                 if return_results:
-#                     new_datasets_dict[split_n][f"{split}"] = {}
-#                     for feature in self.features:
-#                         new_datasets_dict[split_n][f"{split}"][feature] = (
-#                             self.data_dict[split][feature][indices[split_n]]
-#                         )
-
-#             # delete_original indicates if the original split should be deleted.
-#             if delete_original:
-#                 del self.data_dict[split]
-
-#         # Update the list of splits
-#         self.splits = list(self.data_dict)
-
-#         # Return new datasets if return_results is True
-#         if return_results:
-#             return [
-#                 Dataset(
-#                     data_dict=new_datasets_dict[i],
-#                     default_input=self.default_input,
-#                     default_output=self.default_output,
-#                     default_group=self.default_group,
-#                     preproc_fn_dict=self.preproc_fn_dict,
-#                     preprocessed=True,
-#                 )
-#                 for i in range(num_splits)
-#             ]
-
-#     def __str__(self):
-#         """
-#         Returns a string describing the dataset.
-#         """
-#         txt = [
-#             f'{" DATASET OBJECT ":=^48}',
-#             f"Splits = {self.splits}",
-#             f"Features = {self.features}",
-#             f"Default features = {self.default_input} --> {self.default_output}",
-#             "=" * 48,
-#         ]
-#         return "\n".join(txt)
-
-
 class TabularDataset(Dataset):
     """Tabular dataset."""
 
diff --git a/leakpro/mia_attacks/attack_objects.py b/leakpro/mia_attacks/attack_objects.py
index 906345c..a97278f 100644
--- a/leakpro/mia_attacks/attack_objects.py
+++ b/leakpro/mia_attacks/attack_objects.py
@@ -1,4 +1,8 @@
+"""Module for the AttackObjects class."""
+
+import logging
 import time
+from typing import List, Self
 
 import numpy as np
 import torch
@@ -6,23 +10,43 @@
 from torch.optim import SGD, Adam, AdamW
 from torch.utils.data import DataLoader, Subset
 
-from ..dataset import Dataset
-from ..model import Model, PytorchModel
+from leakpro.dataset import Dataset
+from leakpro.model import Model, PytorchModel
 
 
 class AttackObjects:
-    def __init__(
-        self,
+    """Class representing the attack objects for the MIA attacks."""
+
+    def __init__(  # noqa: PLR0913
+        self:Self,
         population: Dataset,
-        train_test_dataset,
+        train_test_dataset: dict,
         target_model: Model,
         configs: dict,
-    ):
+        logger: logging.Logger = None
+    ) -> None:
+        """Initialize the AttackObjects class.
+
+        Parameters
+        ----------
+        population : Dataset
+            The population.
+        train_test_dataset : dict
+            The train test dataset.
+        target_model : Model
+            The target model.
+        configs : dict
+            The configurations.
+        logger : logging.Logger, optional
+            The logger, by default None.
+
+        """
         self._population = population
         self._population_size = len(population)
         self._target_model = PytorchModel(target_model, CrossEntropyLoss())
         self._train_test_dataset = train_test_dataset
         self._num_shadow_models = configs["audit"]["num_shadow_models"]
+        self.logger = logger
 
         self._audit_dataset = {
             # Assuming train_indices and test_indices are arrays of indices, not the actual data
@@ -49,10 +73,12 @@ def __init__(
 
         f_shadow_data = configs["audit"]["f_attack_data_size"]
 
-        for k in range(self._num_shadow_models):
+        for _ in range(self._num_shadow_models):
             # Create shadow datasets by sampling from the population
             shadow_data_indices = self.create_shadow_dataset(f_shadow_data)
-            shadow_train_loader = DataLoader(Subset(population, shadow_data_indices), batch_size=configs["train"]["batch_size"], shuffle=True,)
+            shadow_train_loader = DataLoader(Subset(population, shadow_data_indices),
+                                             batch_size=configs["train"]["batch_size"],
+                                             shuffle=True,)
             self._shadow_train_indices.append(shadow_data_indices)
 
             # Initialize a shadow model
@@ -70,43 +96,100 @@ def __init__(
 
     @property
-    def shadow_models(self):
+    def shadow_models(self: Self) -> List[Model]:
+        """Return the shadow models.
+
+        Returns
+        -------
+        List[Model]: The shadow models.
+
+        """
         return self._shadow_models
 
     @property
-    def shadow_train_indices(self):
+    def shadow_train_indices(self:Self) -> List[int]:
+        """Return the indices of the shadow training data.
+
+        Returns
+        -------
+        List[int]: The indices of the shadow training data.
+
+        """
         return self._shadow_train_indices
 
     @property
-    def population(self):
+    def population(self:Self) -> Dataset:
+        """Return the population.
+
+        Returns
+        -------
+        Dataset: The population.
+
+        """
         return self._population
 
     @property
-    def population_size(self):
+    def population_size(self:Self) -> int:
+        """Return the size of the population.
+
+        Returns
+        -------
+        int: The size of the population.
+
+        """
         return self._population_size
 
     @property
-    def target_model(self):
+    def target_model(self:Self) -> Model:
+        """Return the target model.
+
+        Returns
+        -------
+        Model: The target model.
+
+        """
         return self._target_model
 
     @property
-    def train_test_dataset(self):
+    def train_test_dataset(self:Self) -> dict:
+        """Return the train test dataset.
+
+        Returns
+        -------
+        dict: The train test dataset.
+
+        """
         return self._train_test_dataset
 
     @property
-    def audit_dataset(self):
+    def audit_dataset(self:Self) -> dict:
+        """Return the audit dataset.
+
+        Returns
+        -------
+        dict: The audit dataset.
+
+        """
         return self._audit_dataset
 
-    def create_shadow_dataset(self, f_shadow_data: float, include_in_members:bool=False):
+    def create_shadow_dataset(self:Self, f_shadow_data: float, include_in_members:bool=False) -> np.ndarray:
+        """Create a shadow dataset by sampling from the population.
+
+        Args:
+        ----
+            f_shadow_data (float): Fraction of shadow data to be sampled.
+            include_in_members (bool, optional): Include in-members in the shadow dataset. Defaults to False.
+
+        Returns:
+        -------
+            np.ndarray: Array of indices representing the shadow dataset.
+
+        """
         shadow_data_size = int(f_shadow_data * self.population_size)
         all_index = np.arange(self.population_size)
 
         # Remove indices corresponding to training data
-        if include_in_members is False:
-            used_index = self.train_test_dataset["train_indices"]
-        else:
-            used_index = []
+        used_index = self.train_test_dataset["train_indices"] if include_in_members is False else []
 
         # pick allowed indices
         selected_index = np.setdiff1d(all_index, used_index, assume_unique=True)
@@ -116,14 +199,26 @@ def create_shadow_dataset(self, f_shadow_data: float, include_in_members:bool=Fa
             raise ValueError("Not enough remaining data points.")
         return selected_index
 
-    def get_optimizer(self, model: Module, configs: dict):
+    def get_optimizer(self:Self, model: Module, configs: dict) -> torch.optim.Optimizer:
+        """Get the optimizer for training the model.
+
+        Args:
+        ----
+            model (nn.Module): Model for training.
+            configs (dict): Configurations for training.
+
+        Returns:
+        -------
+            Optimizer: The optimizer for training the model.
+
+        """
         optimizer = configs.get("optimizer", "SGD")
         learning_rate = configs.get("learning_rate", 0.01)
         weight_decay = configs.get("weight_decay", 0)
         momentum = configs.get("momentum", 0)
-        print(f"Load the optimizer {optimizer}: ", end=" ")
-        print(f"Learning rate {learning_rate}", end=" ")
-        print(f"Weight decay {weight_decay} ")
+        self.logger.info(f"Load the optimizer {optimizer}")
+        self.logger.info(f"Learning rate {learning_rate}")
+        self.logger.info(f"Weight decay {weight_decay}")
 
         if optimizer == "SGD":
             return SGD(model.parameters(),
@@ -131,22 +226,23 @@ def get_optimizer(self, model: Module, configs: dict):
                 weight_decay=weight_decay,
                 momentum=momentum,
             )
-        elif optimizer == "Adam":
+        if optimizer == "Adam":
            return Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
-        elif optimizer == "AdamW":
+        if optimizer == "AdamW":
            return AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
-        else:
-            raise NotImplementedError(
-                f"Optimizer {optimizer} has not been implemented. Please choose from SGD or Adam"
-            )
+        raise NotImplementedError(
+            f"Optimizer {optimizer} has not been implemented. Please choose from SGD, Adam or AdamW"
+        )
+
+    def train_shadow_model(self:Self, shadow_model: Module, shadow_train_loader: DataLoader, configs: dict = None) -> Module:
+        """Train the model based on the train loader.
 
-    def train_shadow_model(self, shadow_model: Module, shadow_train_loader: DataLoader, shadow_test_loader: DataLoader = None, configs: dict = None):
-        """Train the model based on on the train loader
         Args:
-            model(nn.Module): Model for evaluation.
-            train_loader(torch.utils.data.DataLoader): Data loader for training.
+        ----
+            shadow_model(nn.Module): Model for evaluation.
+            shadow_train_loader(torch.utils.data.DataLoader): Data loader for training.
             configs (dict): Configurations for training.
 
         Return:
 
@@ -174,11 +270,11 @@ def train_shadow_model(self, shadow_model: Module, shadow_train_load
         shadow_model.train()
         for data, target in shadow_train_loader:
             # Move data to the device
-            data, target = data.to(device, non_blocking=True), target.to(
+            data, target = data.to(device, non_blocking=True), target.to(  # noqa: PLW2901
                 device, non_blocking=True
             )
             # Cast target to long tensor
-            target = target.long()
+            target = target.long()  # noqa: PLW2901
 
             # Set the gradients to zero
             optimizer.zero_grad(set_to_none=True)
@@ -196,20 +292,14 @@ def train_shadow_model(self, shadow_model: Module, shadow_train_load
 
             # Add the loss to the total loss
             train_loss += loss.item()
 
-        print(f"Epoch: {epoch_idx+1}/{epochs} |", end=" ")
-        print(f"Train Loss: {train_loss/len(shadow_train_loader):.8f} ", end=" ")
-        print(f"Train Acc: {float(train_acc)/len(shadow_train_loader.dataset):.8f} ", end=" ")
+        self.logger.info(f"Epoch: {epoch_idx+1}/{epochs}")
+        self.logger.info(f"Train Loss: {train_loss/len(shadow_train_loader):.8f}")
+        self.logger.info(f"Train Acc: {float(train_acc)/len(shadow_train_loader.dataset):.8f}")
 
-        #test_loss, test_acc = inference(shadow_model, shadow_test_loader, device)
-
-        # print(f"Test Loss: {float(test_loss):.8f} ", end=" ")
-        # print(f"Test Acc: {float(test_acc):.8f} ", end=" ")
-        print(f"One step uses {time.time() - start_time:.2f} seconds")
+        self.logger.info(f"One step uses {time.time() - start_time:.2f} seconds")
 
         # Move the model back to the CPU
         shadow_model.to("cpu")
 
-        # save_model_and_metadata(shadow_model, configs, train_acc, test_acc, train_loss, test_loss)
-
         # Return the model
         return shadow_model
diff --git a/leakpro/mia_attacks/attack_scheduler.py b/leakpro/mia_attacks/attack_scheduler.py
index 4887c15..c871e26 100644
--- a/leakpro/mia_attacks/attack_scheduler.py
+++ b/leakpro/mia_attacks/attack_scheduler.py
@@ -19,7 +19,7 @@ def __init__(
         self.attacks = []
 
         attack_objects = AttackObjects(
-            population, train_test_dataset, target_model, configs
+            population, train_test_dataset, target_model, configs, logger
         )
         attack_utils = AttackUtils(attack_objects)
 
diff --git a/leakpro/mia_attacks/attacks/rmia.py b/leakpro/mia_attacks/attacks/rmia.py
index 6b860fa..92d50df 100644
--- a/leakpro/mia_attacks/attacks/rmia.py
+++ b/leakpro/mia_attacks/attacks/rmia.py
@@ -1,3 +1,6 @@
+"""Implementation of the RMIA attack."""
+from typing import Self
+
 import numpy as np
 
 from leakpro.dataset import get_dataset_subset
@@ -8,7 +11,17 @@
 
 
 class AttackRMIA(AttackAbstract):
-    def __init__(self, attack_utils: AttackUtils, configs: dict):
+    """Implementation of the RMIA attack."""
+
+    def __init__(self:Self, attack_utils: AttackUtils, configs: dict) -> None:
+        """Initialize the RMIA attack.
+
+        Args:
+        ----
+            attack_utils (AttackUtils): Utility class for the attack.
+            configs (dict): Configuration parameters for the attack.
+
+        """
         # Initializes the parent metric
         super().__init__(attack_utils)
 
@@ -29,8 +42,22 @@ def __init__(self, attack_utils: AttackUtils, configs: dict):
 
         self.epsilon = 1e-6
 
-    def softmax(self, all_logits, true_label_indices, return_full_distribution=False):
+    def softmax(self:Self, all_logits:np.ndarray,
+                true_label_indices:np.ndarray,
+                return_full_distribution:bool=False) -> np.ndarray:
+        """Compute the softmax function.
+
+        Args:
+        ----
+            all_logits (np.ndarray): Logits for each class.
+            true_label_indices (np.ndarray): Indices of the true labels.
+            return_full_distribution (bool, optional): Whether to return the full distribution or just the true class probabilities.
+
+        Returns:
+        -------
+            np.ndarray: Softmax output.
+
+        """
         logit_signals = all_logits / self.temperature
         max_logit_signals = np.max(logit_signals,axis=2)
         logit_signals = logit_signals - max_logit_signals.reshape(1,-1,1)
         exp_logit_signals = np.exp(logit_signals)
         exp_logit_sum = np.sum(exp_logit_signals, axis=2)
 
@@ -44,52 +71,39 @@ def softmax(self, all_logits, true_label_indices, return_full_distribution=Fals
         output_signal = exp_logit_signals / exp_logit_sum[:,:,np.newaxis]
         return output_signal
 
-    def prepare_attack(self):
-        """Function to prepare data needed for running the metric on the target model and dataset, using signals computed
-        on the auxiliary model(s) and dataset.
-        """
-        # sample dataset to compute histogram
-        all_index = np.arange(self.population_size)
-        attack_data_size = np.round(
-            self.f_attack_data_size * self.population_size
-        ).astype(int)
-
-        self.attack_data_index = np.random.choice(
-            all_index, attack_data_size, replace=False
-        )
-        attack_data = get_dataset_subset(self.population, self.attack_data_index)
+    def prepare_attack(self:Self) -> None:
+        """Prepare data needed for running the attack on the target model and dataset.
 
+        Signals are computed on the auxiliary model(s) and dataset.
+        """
         # compute the ratio of p(z|theta) (target model) to p(z)=sum_{theta'} p(z|theta') (shadow models) for all points in the attack dataset
         # output from signal: # models x # data points x # classes
 
         # get the true label indices
-        z_label_indices = np.array(attack_data.y)
+        z_label_indices = np.array(self.attack_data.y)
 
         # run points through real model to collect the logits
-        logits_theta = np.array(self.signal([self.target_model], attack_data))
+        logits_theta = np.array(self.signal([self.target_model], self.attack_data))
 
         # collect the softmax output of the correct class
         p_z_given_theta = self.softmax(logits_theta, z_label_indices)
 
         # run points through shadow models and collect the logits
-        logits_shadow_models = self.signal(self.shadow_models, attack_data)
+        logits_shadow_models = self.signal(self.shadow_models, self.attack_data)
 
         # collect the softmax output of the correct class for each shadow model
         p_z_given_shadow_models = [self.softmax(np.array(x).reshape(1,*x.shape), z_label_indices) for x in logits_shadow_models]
 
         # stack the softmax output of the correct class for each shadow model to dimension # models x # data points
         p_z_given_shadow_models = np.array(p_z_given_shadow_models).squeeze()
 
         # evaluate the marginal p(z)
-        if len(self.shadow_models) > 1:
-            p_z = np.mean(p_z_given_shadow_models, axis=0)
-        else:
-            p_z = p_z_given_shadow_models.squeeze()
+        p_z = np.mean(p_z_given_shadow_models, axis=0) if len(self.shadow_models) > 1 else p_z_given_shadow_models.squeeze()
         p_z = 0.5*((self.offline_a + 1) * p_z + (1-self.offline_a))  #TODO: pick the maximum value of the softmax output in p(z)
 
         self.ratio_z = p_z_given_theta / (p_z + self.epsilon)
 
-    def run_attack(self, fpr_tolerance_rate_list=None):
-        """Function to run the attack on the target model and dataset.
+    def run_attack(self:Self) -> CombinedMetricResult:
+        """Run the attack on the target model and dataset.
 
         Args:
         ----
@@ -117,10 +131,8 @@ def run_attack(self, fpr_tolerance_rate_list=None):
         # stack the softmax output of the correct class for each shadow model to dimension # models x # data points
         p_x_given_shadow_models = np.array(p_x_given_shadow_models).squeeze()
 
         # evaluate the marginal p_out(x) by averaging the output of the shadow models
-        if len(self.shadow_models) > 1:
-            p_x_out = np.mean(p_x_given_shadow_models, axis=0)
-        else:
-            p_x_out = p_x_given_shadow_models.squeeze()
+        p_x_out = np.mean(p_x_given_shadow_models, axis=0) if len(self.shadow_models) > 1 else p_x_given_shadow_models.squeeze()
+
         # compute the marginal p(x) from P_out and p_in where p_in = a*p_out+b
         p_x = 0.5*((self.offline_a + 1) * p_x_out + (1-self.offline_a))
 
@@ -128,14 +140,14 @@ def run_attack(self, fpr_tolerance_rate_list=None):
         ratio_x = p_x_given_theta / (p_x + self.epsilon)
 
         # for each x, compare it with the ratio of all z points
-        LRs = ratio_x.T / self.ratio_z
-        score = np.mean(LRs > self.gamma, axis=1)
+        likelihoods = ratio_x.T / self.ratio_z
+        score = np.mean(likelihoods > self.gamma, axis=1)
 
         # pick out the in-members and out-members signals
         self.in_member_signals = score[self.audit_dataset["in_members"]].reshape(-1,1)
         self.out_member_signals = score[self.audit_dataset["out_members"]].reshape(-1,1)
 
-        thresholds = np.linspace(1/LRs.shape[1], 1, 1000)
+        thresholds = np.linspace(1/likelihoods.shape[1], 1, 1000)
 
         member_preds = np.greater(self.in_member_signals, thresholds).T
@@ -155,12 +167,11 @@ def run_attack(self, fpr_tolerance_rate_list=None):
         )
 
         # compute ROC, TP, TN etc
-        metric_result = CombinedMetricResult(
+        return CombinedMetricResult(
             predicted_labels=predictions,
             true_labels=true_labels,
             predictions_proba=None,
             signal_values=signal_values,
         )
-        return metric_result
 
diff --git a/leakpro/model.py b/leakpro/model.py
index 03edb8e..c021085 100644
--- a/leakpro/model.py
+++ b/leakpro/model.py
@@ -1,5 +1,8 @@
+"""Module containing the Model class, an interface to query a model without any assumption on how it is implemented."""
+
 from abc import ABC, abstractmethod
 from copy import deepcopy
+from typing import Callable, List, Self
 
 import numpy as np
 import torch
@@ -10,11 +13,10 @@
 
 
 class Model(ABC):
-    """Interface to query a model without any assumption on how it is implemented.
-    """
+    """Interface to query a model without any assumption on how it is implemented."""
 
-    def __init__(self, model_obj, loss_fn):
-        """Constructor
+    def __init__(self:Self, model_obj: torch.nn.Module, loss_fn: torch.nn.modules.loss._Loss) -> None:
+        """Initialize the Model.
 
         Args:
         ----
             model_obj: Model object.
             loss_fn: Loss function.
 
         """
         self.model_obj = model_obj
         self.loss_fn = loss_fn
 
     @abstractmethod
-    def get_logits(self, batch_samples):
-        """Function to get the model output from a given input.
+    def get_logits(self:Self, batch_samples:np.ndarray) -> np.ndarray:
+        """Get the model output from a given input.
 
         Args:
         ----
             batch_samples: Model input.
 
         Returns:
         -------
             Model output.
 
         """
         pass
 
     @abstractmethod
-    def get_loss(self, batch_samples, batch_labels, per_point=True):
-        """Function to get the model loss on a given input and an expected output.
+    def get_loss(self:Self, batch_samples:np.ndarray, batch_labels: np.ndarray, per_point:bool=True) -> np.ndarray:
+        """Get the model loss on a given input and an expected output.
 
         Args:
         ----
             batch_samples: Model input.
             batch_labels: Model expected output.
             per_point: Boolean indicating if loss should be returned per point or reduced.
 
         Returns:
         -------
             The loss value, as defined by the loss_fn attribute.
 
         """
         pass
 
     @abstractmethod
-    def get_grad(self, batch_samples, batch_labels):
-        """Function to get the gradient of the model loss with respect to the model parameters, on a given input and an
-        expected output.
+    def get_grad(self:Self, batch_samples:np.ndarray, batch_labels:np.ndarray) -> np.ndarray:
+        """Get the gradient of the model loss with respect to the model parameters, given an input and an expected output.
 
         Args:
         ----
             batch_samples: Model input.
             batch_labels: Model expected output.
 
         Returns:
         -------
             The gradient of the model loss.
 
         """
         pass
 
     @abstractmethod
-    def get_intermediate_outputs(self, layers, batch_samples, forward_pass=True):
-        """Function to get the intermediate output of layers (a.k.a. features), on a given input.
+    def get_intermediate_outputs(self:Self,
+                                 layers:List[int],
+                                 batch_samples:np.ndarray,
+                                 forward_pass: bool=True) -> List[np.ndarray]:
+        """Get the intermediate output of layers (a.k.a. features), on a given input.
 
         Args:
         ----
             layers: List of layers whose values should be returned.
             batch_samples: Model input.
             forward_pass: Boolean indicating if a new forward pass should be executed.
 
         Returns:
         -------
             A list of intermediate outputs of layers.
 
         """
         pass
 
-
-########################################################################################################################
-# PYTORCH_MODEL CLASS
-########################################################################################################################
-
-
 class PytorchModel(Model):
     """Inherits from the Model class, an interface to query a model without any assumption on how it is implemented.
+
     This particular class is to be used with pytorch models.
     """
 
-    def __init__(self, model_obj, loss_fn):
-        """Constructor
+    def __init__(self:Self, model_obj:torch.nn.Module, loss_fn:torch.nn.modules.loss._Loss)->None:
+        """Initialize the PytorchModel.
 
         Args:
         ----
             model_obj: Model object.
             loss_fn: Loss function.
 
         """
         super().__init__(model_obj, loss_fn)
 
         # Add hooks to the layers (to access their value during a forward pass)
         self.intermediate_outputs = {}
-        for i, layer in enumerate(list(self.model_obj._modules.keys())):
+        for _, layer in enumerate(list(self.model_obj._modules.keys())):
             getattr(self.model_obj, layer).register_forward_hook(self.__forward_hook(layer))
 
         # Create a second loss function, per point
         self.loss_fn_no_reduction = deepcopy(loss_fn)
         self.loss_fn_no_reduction.reduction = "none"
 
-    def get_logits(self, batch_samples):
-        """Function to get the model output from a given input.
+    def get_logits(self:Self, batch_samples:np.ndarray)->np.ndarray:
+        """Get the model output from a given input.
 
         Args:
         ----
             batch_samples: Model input.
 
         Returns:
         -------
             Model output.
 
         """
         batch_samples_tensor = torch.tensor(
             np.array(batch_samples), dtype=torch.float32
         )
         return self.model_obj(batch_samples_tensor).detach().numpy()
 
-    def get_loss(self, batch_samples, batch_labels, per_point=True):
-        """Function to get the model loss on a given input and an expected output.
+    def get_loss(self:Self, batch_samples:np.ndarray, batch_labels:np.ndarray, per_point:bool=True)->np.ndarray:
+        """Get the model loss on a given input and an expected output.
 
         Args:
         ----
             batch_samples: Model input.
             batch_labels: Model expected output.
             per_point: Boolean indicating if loss should be returned per point or reduced.
 
         Returns:
         -------
             The loss value, as defined by the loss_fn attribute.
 
         """
         batch_samples_tensor = torch.tensor(
             np.array(batch_samples), dtype=torch.float32
         )
         batch_labels_tensor = torch.tensor(batch_labels)
 
         if per_point:
             return (
                 self.loss_fn_no_reduction(
                     self.model_obj(batch_samples_tensor),
                     batch_labels_tensor,
                 )
                 .detach()
                 .numpy()
             )
-        else:
-            return self.loss_fn(
-                self.model_obj(torch.tensor(batch_samples_tensor)),
-                torch.tensor(batch_labels_tensor),
-            ).item()
+        return self.loss_fn(
+            self.model_obj(torch.tensor(batch_samples_tensor)),
+            torch.tensor(batch_labels_tensor),
+        ).item()
+
 
-    def get_grad(self, batch_samples, batch_labels):
-        """Function to get the gradient of the model loss with respect to the model parameters, on a given input and an
-        expected output.
+    def get_grad(self:Self, batch_samples:np.ndarray, batch_labels:np.ndarray)->np.ndarray:
+        """Get the gradient of the model loss with respect to the model parameters, given an input and expected output.
 
         Args:
         ----
@@ -198,8 +196,11 @@ def get_grad(self, batch_samples, batch_labels):
         loss.backward()
         return [p.grad.numpy() for p in self.model_obj.parameters()]
 
-    def get_intermediate_outputs(self, layers, batch_samples, forward_pass=True):
-        """Function to get the intermediate output of layers (a.k.a. features), on a given input.
+    def get_intermediate_outputs(self:Self,
+                                 layers:List[int],
+                                 batch_samples:np.ndarray,
+                                 forward_pass:bool=True) -> List[np.ndarray]:
+        """Get the intermediate output of layers (a.k.a. features), on a given input.
 
         Args:
         ----
@@ -226,7 +227,7 @@ def get_intermediate_outputs(self, layers, batch_samples, forward_pass=True):
             for layer_name in layer_names
         ]
 
-    def __forward_hook(self, layer_name):
+    def __forward_hook(self:Self, layer_name: str) -> Callable:
         """Private helper function to access outputs of intermediate layers.
 
         Args:
@@ -239,7 +240,7 @@ def __forward_hook(self, layer_name):
 
         """
 
-        def hook(module, input, output):
+        def hook(_: torch.Tensor, __: torch.Tensor, output: torch.Tensor) -> None:
             self.intermediate_outputs[layer_name] = output
 
         return hook
 
diff --git a/leakpro/models.py b/leakpro/models.py
index 7ccfdad..a5511ab 100644
--- a/leakpro/models.py
+++ b/leakpro/models.py
@@ -1,27 +1,40 @@
+"""Models for the datasets."""
+from typing import Self
+
 import torch
-import torch.nn.functional as F
+import torch.nn.functional as F  # noqa: N812
 from torch import nn
 
 
 class NN(nn.Module):
-    """NN for Purchase dataset."""
+    """NN for Adult dataset."""
+
+    def __init__(self:Self, in_shape:int, num_classes:int=10) -> None:
+        """Initialize the model.
 
-    def __init__(self, in_shape, num_classes=10):
+        Args:
+        ----
+            in_shape (int): The input shape.
+            num_classes (int, optional): The number of classes. Defaults to 10.
+
+        """
         super().__init__()
         self.fc1 = nn.Linear(in_shape, 100)
         self.fc2 = nn.Linear(100, 50)
         self.fc3 = nn.Linear(50, num_classes)
 
-    def forward(self, inputs):
+    def forward(self:Self, inputs:torch.Tensor) -> torch.Tensor:
         """Forward pass of the model."""
         inputs = inputs.flatten(1)
         outputs = F.relu(self.fc1(inputs))
         outputs = F.relu(self.fc2(outputs))
-        outputs = F.relu(self.fc3(outputs))
-        return outputs
+        return F.relu(self.fc3(outputs))
 
 
 class ConvNet(nn.Module):
-    def __init__(self):
+    """Convolutional Neural Network model."""
+
+    def __init__(self:Self) -> None:
+        """Initialize the ConvNet model."""
         super().__init__()
         self.conv1 = nn.Conv2d(3, 6, 5)
         self.pool = nn.MaxPool2d(2, 2)
@@ -30,14 +43,24 @@ def __init__(self):
         self.fc2 = nn.Linear(120, 84)
         self.fc3 = nn.Linear(84, 10)
 
-    def forward(self, x):
+    def forward(self:Self, x:torch.Tensor) -> torch.Tensor:
+        """Forward pass of the model.
+
+        Args:
+        ----
+            x (torch.Tensor): The input tensor.
+
+        Returns:
+        -------
+            torch.Tensor: The output tensor.
+
+        """
         x = self.pool(F.relu(self.conv1(x)))
         x = self.pool(F.relu(self.conv2(x)))
         x = torch.flatten(x, 1)  # flatten all dimensions except batch
         x = F.relu(self.fc1(x))
         x = F.relu(self.fc2(x))
-        x = self.fc3(x)
-        return x
+        return self.fc3(x)
 
diff --git a/leakpro/reporting/audit_report.py b/leakpro/reporting/audit_report.py
index 2d6c3a6..fd925a0 100644
--- a/leakpro/reporting/audit_report.py
+++ b/leakpro/reporting/audit_report.py
@@ -1,7 +1,8 @@
+"""Module containing classes to generate reports from metric results."""
+import datetime
 import os
 import subprocess
 from abc import ABC, abstractmethod
-from datetime import date
 from typing import Dict, List, Tuple, Union
 
 import jinja2
@@ -11,7 +12,7 @@
 import seaborn as sn
 from scipy import interpolate
 
-from ..metrics.attack_result import AttackResult, CombinedMetricResult
+from leakpro.metrics.attack_result import AttackResult, CombinedMetricResult
 
 ########################################################################################################################
 # GLOBAL SETTINGS
@@ -31,7 +32,7 @@
     line_statement_prefix="%%",
     line_comment_prefix="%#",
     trim_blocks=True,
-    autoescape=False,
+    autoescape=False,  # noqa: S701 (the templates render LaTeX, not HTML)
     loader=jinja2.FileSystemLoader(os.path.abspath(".")),
 )
 
@@ -54,19 +55,19 @@
     "figure": {
         "roc_curve": {
             "name": "ROC curve",
-            "details": "shows the ROC (Receiver Operating Characteristic) curve, a graph illustrating the performance of a classification model at various decision thresholds. The AUC (Area Under the Curve), represented in blue, is a threshold-independant measure of the classifier performance.\n\nA higher AUC is an indicator of a system vulnerable to the chosen metric. For reference, a random classifier yields an AUC of 0.5, while a perfect classifier yields an AUC of 1.0",
+            "details": "shows the ROC (Receiver Operating Characteristic) curve, a graph illustrating the performance of a classification model at various decision thresholds. The AUC (Area Under the Curve), represented in blue, is a threshold-independent measure of the classifier performance.\n\nA higher AUC is an indicator of a system vulnerable to the chosen metric. For reference, a random classifier yields an AUC of 0.5, while a perfect classifier yields an AUC of 1.0",  # noqa: E501
         },
         "confusion_matrix": {
             "name": "Confusion matrix",
-            "details": "shows the confusion matrix, a graph illustrating the performance of a classification model for a specific decision threshold.\n\nHigher values on the top-left to bottom-right diagonal is an indicator of a system vulnerable to the chosen metric, while higher values on the top-right to bottom-left diagonal is an indicator of a system less vulnerable to the chosen metric.",
+            "details": "shows the confusion matrix, a graph illustrating the performance of a classification model for a specific decision threshold.\n\nHigher values on the top-left to bottom-right diagonal is an indicator of a system vulnerable to the chosen metric, while higher values on the top-right to bottom-left diagonal is an indicator of a system less vulnerable to the chosen metric.",  # noqa: E501
         },
         "signal_histogram": {
             "name": "Signal histogram",
-            "details": "shows the histogram of the signal used by the chosen metric, on both members and non-member samples.\n\nA clear separation between the two groups is an indicator of a system vulnerable to the chosen metric.",
+            "details": "shows the histogram of the signal used by the chosen metric, on both members and non-member samples.\n\nA clear separation between the two groups is an indicator of a system vulnerable to the chosen metric.",  # noqa: E501
         },
         "vulnerable_points": {
             "name": "Vulnerable points",
-            "details": "shows points that are most vulnerable to the chosen metric.\n\nThe score depends on the chosen metric, but is always between 0 and 1, with 0 meaning low vulnerability and 1 high vulnerability.",
+            "details": "shows points that are most vulnerable to the chosen metric.\n\nThe score depends on the chosen metric, but is always between 0 and 1, with 0 meaning low vulnerability and 1 high vulnerability.",  # noqa: E501
         },
     },
 }
@@ -77,8 +78,7 @@
 
 
 class AuditReport(ABC):
-    """An abstract class to display and/or save some elements of a metric result object.
-    """
+    """An abstract class to display and/or save some elements of a metric result object."""
 
     @staticmethod
     @abstractmethod
@@ -86,7 +86,7 @@ def generate_report(
         metric_result: Union[
             AttackResult, List[AttackResult], dict, CombinedMetricResult
         ]
-    ):
+    ) -> None:
         """Core function of the AuditReport class that actually generates the report.
 
         Args:
@@ -103,8 +103,9 @@ def generate_report(
 
 
 class ROCCurveReport(AuditReport):
-    """Inherits from the AuditReport class, an interface class to display and/or save some elements of a metric result
-    object. This particular class is used to generate a ROC (Receiver Operating Characteristic) curve.
+    """An interface class to display and/or save some elements of a metric result object.
+
+    This particular class is used to generate a ROC (Receiver Operating Characteristic) curve.
     """
 
     @staticmethod
@@ -143,8 +144,8 @@ def generate_report(
         show: bool = False,
         save: bool = True,
         filename: str = "roc_curve.jpg",
-        configs: dict = {},
-    ):
+        configs: dict = None,  # noqa: ARG004
+    ) -> None:
         """Core function of the AuditReport class that actually generates the report.
 
         Args:
         ----
             metric_result: A list of MetricResult objects, containing attack results.
             show: Boolean specifying if the plot should be displayed on screen.
             save: Boolean specifying if the plot should be saved as a file.
             filename: File name to be used if the plot is saved as a file.
+            configs: Dictionary containing the configuration of the audit.
 
         """
         # Check if it is the combined report:
@@ -195,13 +197,6 @@ def generate_report(
         # Gets metric ID
         # TODO: add metric ID to the CombinedMetricResult class
         metric_id = "population_metric"
-        # if isinstance(metric_result, list):
-        #     if isinstance(metric_result[0], list):
-        #         metric_id = metric_result[0][0].metric_id
-        #     else:
-        #         metric_id = metric_result[0].metric_id
-        # else:
-        #     metric_id = metric_result.metric_id
 
         # Generate plot
         range01 = np.linspace(0, 1)
@@ -222,13 +217,12 @@ def generate_report(
             f"AUC = {roc_auc:.03f}",
             horizontalalignment="center",
             verticalalignment="center",
-            bbox=dict(facecolor="white", alpha=0.5),
+            bbox={"facecolor": "white", "alpha": 0.5},
         )
         if save:
             plt.savefig(fname=filename, dpi=1000)
         if show:
             plt.show()
-        print(f"AUC = {roc_auc:.03f}")
         plt.clf()
 
 
@@ -238,8 +232,9 @@
 
 
 class ConfusionMatrixReport(AuditReport):
-    """Inherits from the AuditReport class, an interface class to display and/or save some elements of a metric result
-    object. This particular class is used to generate a confusion matrix.
+    """An interface class to display and/or save some elements of a metric result object.
+
+    This particular class is used to generate a confusion matrix.
     """
 
     @staticmethod
@@ -248,7 +243,7 @@ def generate_report(
         show: bool = False,
         save: bool = True,
         filename: str = "confusion_matrix.jpg",
-    ):
+    ) -> None:
         """Core function of the AuditReport class that actually generates the report.
 
         Args:
        ----
             metric_result: MetricResult object, containing data for the report.
             show: Boolean specifying if the plot should be displayed on screen.
             save: Boolean specifying if the plot should be saved as a file.
             filename: File name to be used if the plot is saved as a file.
 
         """
-        assert isinstance(metric_result, AttackResult)
+        if not isinstance(metric_result, AttackResult):
+            raise ValueError(
+                "metric_result must be an instance of AttackResult")
         cm = np.array(
             [
                 [metric_result.tn, metric_result.fp],
@@ -288,8 +285,9 @@
 
 
 class SignalHistogramReport(AuditReport):
-    """Inherits from the AuditReport class, an interface class to display and/or save some elements of a metric result
-    object. This particular class is used to generate a histogram of the signal values.
+    """An interface class to display and/or save some elements of a metric result object.
+
+    This particular class is used to generate a histogram of the signal values.
     """
 
     @staticmethod
@@ -298,7 +296,7 @@ def generate_report(
         show: bool = False,
         save: bool = True,
         filename: str = "signal_histogram.jpg",
-    ):
+    ) -> None:
         """Core function of the AuditReport class that actually generates the report.
 
         Args:
@@ -357,19 +355,20 @@
 
 
 class VulnerablePointsReport(AuditReport):
-    """Inherits from the AuditReport class, an interface class to display and/or save some elements of a metric result
-    object. This particular class is used to identify the most vulnerable points.
+    """An interface class to display and/or save some elements of a metric result object.
+
+    This particular class is used to identify the most vulnerable points.
     """
 
     @staticmethod
-    def generate_report(
+    def generate_report(  # noqa: PLR0913
        metric_results: List[AttackResult],
         number_of_points: int = 10,
         save_tex: bool = False,
         filename: str = "vulnerable_points.tex",
         return_raw_values: bool = True,
         point_type: str = "any",
-    ):
+    ) -> Tuple[np.ndarray, np.ndarray]:
         """Core function of the AuditReport class that actually generates the report.
 
         Args:
         ----
@@ -426,22 +425,11 @@ def generate_report(
         # Map indices stored in the metric_result object to indices in the training set
         indices_to_train_indices = []
         counter = 0
-        for k, v in enumerate(metric_results[0].true_labels):
+        for _, v in enumerate(metric_results[0].true_labels):
             indices_to_train_indices.append(counter)
             counter += v
 
         indices = np.array(indices_to_train_indices)[np.array(indices)]
 
-        # If points are images and we are creating a LaTex file, then we read the information source to create image
-        # files from the vulnerable
-        # if save_tex and point_type == "image":
-        #     for k, point in enumerate(indices):
-        #         x = target_info_source.get_signal(
-        #             signal=DatasetSample(),
-        #             model_to_split_mapping=target_model_to_train_split_mapping,
-        #             extra={"model_num": 0, "point_num": point},
-        #         )
-        #         Image.fromarray((x * 255).astype("uint8")).save(f"point{k:03d}.jpg")
-
         # If we are creating a LaTex
         if save_tex:
             # Load template
@@ -469,6 +457,7 @@ def generate_report(
         # If we required the values to be returned
         if return_raw_values:
             return indices, scores
+        return None
 
 
 ########################################################################################################################
 # PDF_REPORT CLASS
 ########################################################################################################################
 
 
 class PDFReport(AuditReport):
-    """Inherits from the AuditReport class, an interface class to display and/or save some elements of a metric result
-    object. This particular class is used to generate a user-friendly report, with multiple plots and some explanations.
+    """An interface class to display and/or save some elements of a metric result object.
+
+    This particular class is used to generate a user-friendly report, with multiple plots and some explanations.
     """
 
     @staticmethod
-    def generate_report(
+    def generate_report(  # noqa: PLR0913, D417
         metric_results: Dict[
             str, Union[AttackResult, List[AttackResult], List[List[AttackResult]]]
         ],
         figures_dict: dict,
         system_name: str,
         call_pdflatex: bool = True,
-        show: bool = False,
-        save: bool = True,
+        show: bool = False,  # noqa: ARG004
+        save: bool = True,  # noqa: ARG004
         filename_no_extension: str = "report",
         point_type: str = "any",
-    ):
+    ) -> None:
         """Core function of the AuditReport class that actually generates the report.
 
         Args:
@@ -572,7 +562,7 @@ def generate_report(
             image_folder=os.path.abspath("."),
             name=system_name,
             tool_version="1.0",
-            report_date=date.today().strftime("%b-%d-%Y"),
+            report_date=datetime.datetime.now().date().strftime("%b-%d-%Y"),  # noqa: DTZ005
             explanations=EXPLANATIONS,
             figures_dict=figures_dict,
             files_dict=files_dict,
@@ -582,40 +572,40 @@ def generate_report(
         with open(f"{filename_no_extension}.tex", "w") as f:
             f.write(latex_content)
 
-        print(f'LaTex file created:\t{os.path.abspath(f"{filename_no_extension}.tex")}')
+        print(f'LaTeX file created:\t{os.path.abspath(f"{filename_no_extension}.tex")}')  # noqa: T201
 
         if call_pdflatex:
 
            # Compile the .tex file to a .pdf file. Several rounds are required to get the references (to papers, to
Several rounds are required to get the references (to papers, to # page numbers, and to figure numbers) process = subprocess.Popen( - ["pdflatex", os.path.abspath(f"{filename_no_extension}.tex")], + ["pdflatex", os.path.abspath(f"{filename_no_extension}.tex")], # noqa: S607, S603 stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = process.communicate() process = subprocess.Popen( - ["biber", os.path.abspath(f"{filename_no_extension}")], + ["biber", os.path.abspath(f"{filename_no_extension}")], # noqa: S607, S603 stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = process.communicate() process = subprocess.Popen( - ["pdflatex", os.path.abspath(f"{filename_no_extension}.tex")], + ["pdflatex", os.path.abspath(f"{filename_no_extension}.tex")], # noqa: S607, S603 stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = process.communicate() process = subprocess.Popen( - ["pdflatex", os.path.abspath(f"{filename_no_extension}.tex")], + ["pdflatex", os.path.abspath(f"{filename_no_extension}.tex")], # noqa: S607, S603 stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = process.communicate() - print( + print( # noqa: T201 f'PDF file created:\t{os.path.abspath(f"{filename_no_extension}.pdf")}' ) diff --git a/leakpro/reporting/utils.py b/leakpro/reporting/utils.py index 3a51e88..22a5c58 100644 --- a/leakpro/reporting/utils.py +++ b/leakpro/reporting/utils.py @@ -1,3 +1,4 @@ +"""Utility functions for generating privacy risk report.""" from typing import List from leakpro.reporting import audit_report @@ -12,8 +13,8 @@ def prepare_priavcy_risk_report( audit_results: List, configs: dict, save_path: str = None, -): - """Generate privacy risk report based on the auditing report +) -> None: + """Generate privacy risk report based on the auditing report. Args: ---- @@ -48,13 +49,6 @@ def prepare_priavcy_risk_report( save=True, filename=f"{save_path}/Histogram.png", ) - # VulnerablePointsReport.generate_report( - # metric_results=audit_results[0], - # inference_game_type=InferenceGame.PRIVACY_LOSS_MODEL, - # target_info_source=target_info_source, - # target_model_to_train_split_mapping=target_model_to_train_split_mapping, - # filename = f"{save_path}/VulnerablePoints.png", - # ) else: raise ValueError( f"{len(audit_results)} results are not enough for {configs['privacy_game']})" diff --git a/leakpro/signals/signal.py b/leakpro/signals/signal.py index cbed01d..5a8aece 100644 --- a/leakpro/signals/signal.py +++ b/leakpro/signals/signal.py @@ -1,11 +1,13 @@ +"""Signal class, which is an abstract class representing any type of signal that can be obtained.""" + from abc import ABC, abstractmethod -from typing import List, Tuple +from typing import List, Self, Tuple import numpy as np from torch.utils.data import DataLoader -from ..dataset import Dataset -from ..model import Model +from leakpro.dataset import Dataset +from leakpro.model import Model ######################################################################################################################## # SIGNAL CLASS @@ -13,12 +15,11 @@ class Signal(ABC): - """Abstract class, representing any type of signal that can be obtained from a Model and/or a Dataset. 
-    """
+    """Abstract class, representing any type of signal that can be obtained from a Model and/or a Dataset."""
 
     @abstractmethod
-    def __call__(
-        self,
+    def __call__(  # noqa: ANN204
+        self: Self,
         models: List[Model],
         datasets: List[Dataset],
         extra: dict,
@@ -47,55 +48,6 @@ def __call__(
 
         pass
 
-########################################################################################################################
-# DATASET_SAMPLE CLASS
-########################################################################################################################
-class DatasetSample(Signal):
-    """Inherits from the Signal class, used to represent any type of signal that can be obtained from a Model and/or a
-    Dataset.
-    This particular class is used to get a given point from the Dataset.
-    """
-
-    def __call__(
-        self,
-        models: List[Model],
-        datasets: List[Dataset],
-        model_to_split_mapping: List[Tuple[int, str, str, str]],
-        extra: dict,
-    ):
-        """Built-in call method.
-
-        Args:
-        ----
-            models: List of models that can be queried.
-            datasets: List of datasets that can be queried.
-            model_to_split_mapping: List of tuples, indicating how each model should query the dataset.
-                More specifically, for model #i:
-                model_to_split_mapping[i][0] contains the index of the dataset in the list,
-                model_to_split_mapping[i][1] contains the name of the split,
-                model_to_split_mapping[i][2] contains the name of the input feature,
-                model_to_split_mapping[i][3] contains the name of the output feature.
-                This can also be provided once and for all at the instantiation of InformationSource, through the
-                default_model_to_split_mapping argument.
-            extra: Dictionary containing any additional parameter that should be passed to the signal object.
-
-        Returns:
-        -------
-            The sample point from the dataset.
-
-        """
-        (
-            dataset_index,
-            split_name,
-            input_feature,
-            output_feature,
-        ) = model_to_split_mapping[extra["model_num"]]
-        x = datasets[dataset_index].get_feature(split_name, input_feature)[
-            extra["point_num"]
-        ]
-        return x
-
-
 ########################################################################################################################
 # MODEL_LOGIT CLASS
 ########################################################################################################################
@@ -103,15 +55,15 @@ def __call__(
 class ModelLogits(Signal):
     """Inherits from the Signal class, used to represent any type of signal that can be obtained from a Model and/or a
     Dataset.
+
     This particular class is used to get the output of a model.
     """
 
     def __call__(
-        self,
+        self: Self,
         models: List[Model],
         datasets: Dataset,
-        extra: dict=None,
-    ):
+    ) -> List[np.ndarray]:
         """Built-in call method.
 
         Args:
@@ -150,16 +102,16 @@ def __call__(
 class ModelNegativeRescaledLogits(Signal):
     """Inherits from the Signal class, used to represent any type of signal that can be obtained from a Model and/or a
     Dataset.
+
     This particular class is used to get the output of a model.
     """
 
     def __call__(
-        self,
+        self:Self,
         models: List[Model],
         datasets: List[Dataset],
         model_to_split_mapping: List[Tuple[int, str, str, str]],
-        extra: dict,
-    ):
+    ) -> List[np.ndarray]:
         """Built-in call method.
 
         Args:
         ----
@@ -193,10 +145,7 @@ def __call__(
             ) = model_to_split_mapping[k]
             x = datasets[dataset_index].get_feature(split_name, input_feature)
             # Check if output feature has been provided, else pass None
-            if output_feature is not None:
-                y = datasets[dataset_index].get_feature(split_name, output_feature)
-            else:
-                y = None
+            y = datasets[dataset_index].get_feature(split_name, output_feature) if output_feature is not None else None
             results.append(-model.get_rescaled_logits(x, y))
 
         return results
@@ -208,18 +157,18 @@
 
 
 class ModelIntermediateOutput(Signal):
-    """Inherits from the Signal class, used to represent any type of signal that can be obtained from a Model and/or a
-    Dataset.
+    """Used to represent any type of signal that can be obtained from a Model and/or a Dataset.
+
     This particular class is used to get the value of an intermediate layer of model.
     """
 
     def __call__(
-        self,
+        self:Self,
         models: List[Model],
         datasets: List[Dataset],
         model_to_split_mapping: List[Tuple[int, str, str, str]],
         extra: dict,
-    ):
+    ) -> List[np.ndarray]:
         """Built-in call method.
 
         Args:
@@ -266,17 +215,16 @@
 
 
 class ModelLoss(Signal):
-    """Inherits from the Signal class, used to represent any type of signal that can be obtained from a Model and/or a
-    Dataset.
+    """Used to represent any type of signal that can be obtained from a Model and/or a Dataset.
+
     This particular class is used to get the loss of a model.
     """
 
     def __call__(
-        self,
+        self:Self,
         models: List[Model],
         datasets: List[Dataset],
-        extra: dict = None,
-    ):
+    ) -> List[np.ndarray]:
         """Built-in call method.
 
         Args:
@@ -313,18 +261,17 @@
 # MODEL_GRADIENT CLASS
 ########################################################################################################################
 class ModelGradient(Signal):
-    """Inherits from the Signal class, used to represent any type of signal that can be obtained from a Model and/or a
-    Dataset.
+    """Used to represent any type of signal that can be obtained from a Model and/or a Dataset.
+
     This particular class is used to get the gradient of a model.
     """
 
     def __call__(
-        self,
+        self:Self,
         models: List[Model],
         datasets: List[Dataset],
         model_to_split_mapping: List[Tuple[int, str, str, str]],
-        extra: dict,
-    ):
+    ) -> List[np.ndarray]:
         """Built-in call method.
 
         Args:
@@ -368,18 +315,17 @@
 
 
 class GroupInfo(Signal):
-    """Inherits from the Signal class, used to represent any type of signal that can be obtained from a Model and/or a
-    Dataset.
+    """Used to represent any type of signal that can be obtained from a Model and/or a Dataset.
+
     This particular class is used to get the group membership of data records.
     """
 
     def __call__(
-        self,
+        self:Self,
         models: List[Model],
         datasets: List[Dataset],
         model_to_split_mapping: List[Tuple[int, str, str, str]],
-        extra: dict,
-    ):
+    ) -> List[np.ndarray]:
         """Built-in call method.
 
         Args:
@@ -410,18 +356,17 @@
 
 
 class ModelGradientNorm(Signal):
-    """Inherits from the Signal class, used to represent any type of signal that can be obtained from a Model and/or a
-    Dataset.
+    """Used to represent any type of signal that can be obtained from a Model and/or a Dataset.
+
     This particular class is used to get the gradient norm of a model.
     """
 
     def __call__(
-        self,
+        self:Self,
         models: List[Model],
         datasets: List[Dataset],
         model_to_split_mapping: List[Tuple[int, str, str, str]],
-        extra: dict,
-    ):
+    ) -> List[np.ndarray]:
         """Built-in call method.
 
         Args:
diff --git a/leakpro/train.py b/leakpro/train.py
index 1a51e7c..a9deaa5 100644
--- a/leakpro/train.py
+++ b/leakpro/train.py
@@ -153,7 +153,7 @@ def train(  # noqa: PLR0913
             target = target.long()  # noqa: PLW2901
 
             # Move data to the device
-            data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
+            data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)  # noqa: PLW2901
 
             # Set the gradients to zero
             optimizer.zero_grad()
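
leakpro.py above swaps pickle.load for joblib.load when reading the stored population and the target-model metadata. joblib mirrors pickle's dump/load round-trip while handling objects that contain large numpy arrays more efficiently. A minimal sketch of the swap, using a hypothetical path:

import joblib
import numpy as np

population = {"x": np.random.rand(100, 8), "y": np.random.randint(0, 2, size=100)}
joblib.dump(population, "population.pkl")  # counterpart of pickle.dump
restored = joblib.load("population.pkl")   # the call used in leakpro.py above
assert np.allclose(population["x"], restored["x"])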
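create_shadow_dataset in attack_objects.py builds the pool of candidate shadow indices by removing the target model's training indices from the population range with np.setdiff1d. A rough sketch of that index arithmetic, with hypothetical sizes and an explicit sampling step implied by the f_shadow_data fraction:

import numpy as np

population_size = 1000
f_shadow_data = 0.3
train_indices = np.arange(200)  # hypothetical indices used to train the target model

shadow_data_size = int(f_shadow_data * population_size)
allowed = np.setdiff1d(np.arange(population_size), train_indices, assume_unique=True)
if shadow_data_size > len(allowed):
    raise ValueError("Not enough remaining data points.")
shadow_indices = np.random.default_rng(0).choice(allowed, shadow_data_size, replace=False)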
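The softmax helper in rmia.py subtracts the per-point maximum logit before exponentiating, the standard trick to avoid overflow when logits are large. A minimal sketch of the same idea on a (models, points, classes) array, written with keepdims rather than the reshape used above:

import numpy as np

logits = np.array([[[1000.0, 1001.0, 999.0], [3.0, 1.0, 0.5]]])  # shape (1, 2, 3)
shifted = logits - logits.max(axis=2, keepdims=True)             # stable shift
probs = np.exp(shifted) / np.exp(shifted).sum(axis=2, keepdims=True)
assert np.allclose(probs.sum(axis=2), 1.0)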
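The scoring rule in run_attack compares, for every audit point x, the ratio p(x|theta)/p(x) against the corresponding ratios p(z|theta)/p(z) computed for the population points z, and scores x by the fraction of z-ratios it dominates by the factor gamma. A self-contained toy version of that pairwise test (shapes and values are illustrative, not the library's API):

import numpy as np

rng = np.random.default_rng(1)
gamma = 2.0
ratio_x = rng.uniform(0.5, 3.0, size=(1, 50))   # one target model, 50 audit points
ratio_z = rng.uniform(0.5, 3.0, size=(1, 200))  # ratios for 200 population points

likelihoods = ratio_x.T / ratio_z               # (50, 200) pairwise likelihood ratios
score = np.mean(likelihoods > gamma, axis=1)    # fraction of z each x dominates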