Commit

ruff 100%
johanos1 committed Mar 26, 2024
1 parent 81df82a commit 17075c8
Showing 15 changed files with 324 additions and 265 deletions.
Binary file modified data/adult.pkl
Binary file not shown.
17 changes: 9 additions & 8 deletions leakpro.py
@@ -1,18 +1,19 @@
"""Main script to run LEAKPRO on a target model."""

import joblib
import logging
import numpy as np
from pathlib import Path
import random
import time
from pathlib import Path

import joblib
import numpy as np
import torch
import yaml

import leakpro.train as util
from leakpro import dataset, models
from leakpro.mia_attacks.attack_scheduler import AttackScheduler
from leakpro.reporting.utils import prepare_priavcy_risk_report
import leakpro.train as util


def setup_log(name: str, save_file: bool) -> logging.Logger:
@@ -51,8 +52,8 @@ def setup_log(name: str, save_file: bool) -> logging.Logger:
if __name__ == "__main__":

RETRAIN = True
#args = "./config/adult.yaml" # noqa: ERA001
args = "./config/cifar10.yaml"
args = "./config/adult.yaml" # noqa: ERA001
#args = "./config/cifar10.yaml" # noqa: ERA001
with open(args, "rb") as f:
configs = yaml.safe_load(f)

@@ -73,7 +74,7 @@ def setup_log(name: str, save_file: bool) -> logging.Logger:

# ------------------------------------------------
# Create the population dataset
population = dataset.get_dataset(configs["data"]["dataset"], configs["data"]["data_dir"])
population = dataset.get_dataset(configs["data"]["dataset"], configs["data"]["data_dir"], logger)
N_population = len(population)

# Create target training dataset and test dataset
@@ -118,7 +119,7 @@ def setup_log(name: str, save_file: bool) -> logging.Logger:
target_model_path = f"{log_dir}/model_0.pkl"
with open(target_model_path, "rb") as f:
if "adult" in configs["data"]["dataset"]:
target_model = models.NN(s
target_model = models.NN(
configs["train"]["inputs"], configs["train"]["outputs"]
) # TODO: read metadata to get the model
elif "cifar10" in configs["data"]["dataset"]:
1 change: 1 addition & 0 deletions leakpro/__init__.py
@@ -0,0 +1 @@
"""Dummy init file for leakpro package."""
156 changes: 78 additions & 78 deletions leakpro/dataset.py
@@ -1,7 +1,11 @@
"""Module that contains the dataset class and functions for preparing the dataset for training the target models."""

import logging
import os
import pickle
from typing import List
from typing import List, Self

import joblib
import numpy as np
import pandas as pd
import torch
@@ -13,80 +17,62 @@


class GeneralDataset(Dataset):
def __init__(self, data:np.ndarray, label:np.ndarray, transforms=None):
"""data_list: A list of GeneralData instances.
"""
self.X = data # Convert to tensor and specify the data type
"""Dataset class for general data."""

def __init__(self:Self, data:np.ndarray, label:np.ndarray, transforms:torch.nn.Module=None) -> None:
"""data_list: A list of GeneralData instances."""
self.x = data # Convert to tensor and specify the data type
self.y = label # Assuming labels are for classification
self.transforms = transforms

def __len__(self):
def __len__(self:Self) -> int:
"""Return the length of the dataset."""
return len(self.y)

def __getitem__(self, idx):
"""Returns the data and label for a single instance indexed by idx.
"""
if self.transforms:
X = self.transforms(self.X[idx])
else:
X = self.X[idx]
def __getitem__(self:Self, idx:int) -> List[torch.Tensor]:
"""Return the data and label for a single instance indexed by idx."""
x = self.transforms(self.x[idx]) if self.transforms else self.x[idx]

# ensure that X is a tensor
if not isinstance(X, torch.Tensor):
X = torch.tensor(X, dtype=torch.float32)
# ensure that x is a tensor
if not isinstance(x, torch.Tensor):
x = torch.tensor(x, dtype=torch.float32)

y = torch.tensor(self.y[idx], dtype=torch.long)
return X, y
return x, y

class TabularDataset(Dataset):
"""Tabular dataset."""
class InfiniteRepeatDataset(GeneralDataset):
"""Dataset class for infinite repeat data."""

def __init__(self, X, y):
"""Initializes instance of class TabularDataset.
def __init__(self:Self, x:np.ndarray, y:np.ndarray, transform:torch.nn.Module=None) -> None:
"""Initialize the InfiniteRepeatDataset class.
Args:
----
X (str): features
y (str): target
x (np.ndarray): The input data.
y (np.ndarray): The target labels.
transform (torch.nn.Module, optional): The data transformation module. Defaults to None.
"""
super().__init__(
data_dict={"X": X, "y": y},
default_input="X",
default_output="y",
)

def __len__(self):
return len(self.data_dict["y"])

def __getitem__(self, idx):
# Convert idx from tensor to list due to pandas bug (that arises when using pytorch's random_split)
if isinstance(idx, torch.Tensor):
idx = idx.tolist()
X = np.float32(self.data_dict["X"][idx])
y = np.float32(self.data_dict["y"][idx])
return [X, y]

super().__init__(x, y, transform)

class InfiniteRepeatDataset(Dataset):
def __init__(self, dataset):
self.dataset = dataset

def __len__(self):
def __len__(self:Self) -> int:
"""Return the length of the dataset."""
return len(self.dataset)

def __getitem__(self, idx):
return self.dataset[idx % len(self.dataset)]
def __getitem__(self:Self, idx:int) -> List[torch.Tensor]:
"""Return the data and label for a single instance indexed by idx."""
return self.x[idx % len(self.dataset)], self.y[idx % len(self.dataset)]



def get_dataset(dataset_name: str, data_dir: str):
def get_dataset(dataset_name: str, data_dir: str, logger:logging.Logger) -> GeneralDataset:
"""Get the dataset."""
path = f"{data_dir}/{dataset_name}"

if os.path.exists(f"{path}.pkl"):
with open(f"{path}.pkl", "rb") as file:
all_data = pickle.load(file)
print(f"Load data from {path}.pkl")
all_data = joblib.load(file)
logger.info(f"Load data from {path}.pkl")
elif "adult" in dataset_name:
column_names = [
"age",
@@ -110,51 +96,51 @@ def get_dataset(dataset_name: str, data_dir: str):
f"{path}/{dataset_name}.test", names=column_names, header=0
)
df_test["income"] = df_test["income"].str.replace(".", "", regex=False)
df = pd.concat([df_train, df_test], axis=0)
df = df.replace(" ?", np.nan)
df = df.dropna()
X, y = df.iloc[:, :-1], df.iloc[:, -1]
df_concatenated = pd.concat([df_train, df_test], axis=0)
df_replaced = df_concatenated.replace(" ?", np.nan)
df_clean = df_replaced.dropna()
x, y = df_clean.iloc[:, :-1], df_clean.iloc[:, -1]

categorical_features = [col for col in X.columns if X[col].dtype == "object"]
categorical_features = [col for col in x.columns if x[col].dtype == "object"]
numerical_features = [
col for col in X.columns if X[col].dtype in ["int64", "float64"]
col for col in x.columns if x[col].dtype in ["int64", "float64"]
]

onehot_encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
X_categorical = onehot_encoder.fit_transform(X[categorical_features])
x_categorical = onehot_encoder.fit_transform(x[categorical_features])

scaler = StandardScaler()
X_numerical = scaler.fit_transform(X[numerical_features])
x_numerical = scaler.fit_transform(x[numerical_features])

X = np.hstack([X_numerical, X_categorical])
x = np.hstack([x_numerical, x_categorical])

# label encode the target variable to have the classes 0 and 1
y = LabelEncoder().fit_transform(y)

all_data = GeneralDataset(X,y)
all_data = GeneralDataset(x,y)
with open(f"{path}.pkl", "wb") as file:
pickle.dump(all_data, file)
print(f"Save data to {path}.pkl")
logger.info(f"Save data to {path}.pkl")
elif "cifar10" in dataset_name:
transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
trainset = torchvision.datasets.CIFAR10(root="./data/cifar10", train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(root="./data/cifar10", train=False,download=True, transform=transform)
X = np.vstack([trainset.data, testset.data])
x = np.vstack([trainset.data, testset.data])
y = np.hstack([trainset.targets, testset.targets])

all_data = GeneralDataset(X, y, transform)
all_data = GeneralDataset(x, y, transform)

with open(f"{path}.pkl", "wb") as file:
pickle.dump(all_data, file)
print(f"Save data to {path}.pkl")
logger.info(f"Save data to {path}.pkl")

return all_data


def get_split(
all_index: List[int], used_index: List[int], size: int, split_method: str
):
"""Select points based on the splitting methods
) -> np.ndarray:
"""Select points based on the splitting methods.
Args:
----
@@ -190,8 +176,8 @@ def get_split(
return selected_index


def prepare_train_test_datasets(dataset_size: int, configs: dict):
"""Prepare the dataset for training the target models when the training data are sampled uniformly from the distribution (pool of all possible data).
def prepare_train_test_datasets(dataset_size: int, configs: dict) -> dict:
"""Prepare the dataset for training the target models when the training data are sampled uniformly from the population.
Args:
----
@@ -201,7 +187,7 @@ def prepare_train_test_datasets(dataset_size: int, configs: dict):
Returns:
-------
dict: Data split information which saves the information of training points index and test points index for all target models.
dict: Data split information which saves the information of training points index and test points index.
"""
# The index_list will save all the information about the train, test and audit for each target model.
@@ -211,38 +197,51 @@ def prepare_train_test_datasets(dataset_size: int, configs: dict):

selected_index = np.random.choice(all_index, train_size + test_size, replace=False)
train_index, test_index = train_test_split(selected_index, test_size=test_size)
dataset_train_test = {"train_indices": train_index, "test_indices": test_index}
return dataset_train_test
return {"train_indices": train_index, "test_indices": test_index}


def get_dataset_subset(dataset: Dataset, indices: List[int]):
def get_dataset_subset(dataset: Dataset, indices: List[int]) -> Dataset:
"""Get a subset of the dataset.
Args:
----
dataset (torchvision.datasets): Whole dataset.
index (list): List of index.
indices (list): List of indices.
"""
assert max(indices) < len(dataset) and min(indices) >= 0, "Index out of range"
if max(indices) >= len(dataset) or min(indices) < 0:
raise ValueError("Index out of range")

data = dataset.X
targets = dataset.y
transforms = dataset.transforms
subset_data = [data[idx] for idx in indices]
subset_targets = [targets[idx] for idx in indices]

new_dataset = dataset.__class__(subset_data, subset_targets, transforms)
return dataset.__class__(subset_data, subset_targets, transforms)

return new_dataset


def get_dataloader(
dataset: GeneralDataset,
batch_size: int,
loader_type="torch",
loader_type: str = "torch",
shuffle: bool = True,
):
) -> torch.utils.data.DataLoader:
"""Get a data loader for the given dataset.
Args:
----
dataset (GeneralDataset): The dataset to load.
batch_size (int): The batch size.
loader_type (str, optional): The type of data loader. Defaults to "torch".
shuffle (bool, optional): Whether to shuffle the data. Defaults to True.
Returns:
-------
torch.utils.data.DataLoader: The data loader.
"""
if loader_type == "torch":
return torch.utils.data.DataLoader(
dataset,
@@ -253,3 +252,4 @@ def get_dataloader(
persistent_workers=True,
prefetch_factor=16,
)
return None