diff --git a/flexynesis/__init__.py b/flexynesis/__init__.py index a08068d..eb92695 100644 --- a/flexynesis/__init__.py +++ b/flexynesis/__init__.py @@ -50,14 +50,11 @@ Bora Uyar, bora.uyar@mdc-berlin.de """ -from .models_shared import * +from .modules import * from .data import * from .main import * -from .model_DirectPred import * -from .model_SVAE import * -from .model_TripletEncoder import * +from .models import * from .feature_selection import * from .data_augmentation import * from .utils import * from .config import * -from . import models diff --git a/flexynesis/model_SVAE.py b/flexynesis/model_SVAE.py deleted file mode 100644 index 3ef9160..0000000 --- a/flexynesis/model_SVAE.py +++ /dev/null @@ -1,410 +0,0 @@ -# Supervised VAE-MMD architecture -import torch -from torch import nn -from torch.nn import functional as F -from torch.utils.data import Dataset, DataLoader, random_split - -import pandas as pd -import numpy as np - -import pytorch_lightning as pl -from scipy import stats - -from captum.attr import IntegratedGradients - -from .models_shared import * - -# Supervised Variational Auto-encoder that can train one or more layers of omics datasets -# num_layers: number of omics layers in the input -# each layer is encoded separately, encodings are concatenated, and decoded separately -# depends on MLP, Encoder, Decoder classes in models_shared -class supervised_vae(pl.LightningModule): - """ - Supervised Variational Auto-encoder for multi-omics data fusion and prediction. - - This class implements a deep learning model for fusing and predicting from multiple omics layers/matrices. - Each omics layer is encoded separately using an Encoder. The resulting latent representations are then - concatenated and passed through a fully connected network (fusion layer) to make predictions. The model - also includes a supervisor head for supervised learning. - - Args: - num_layers (int): Number of omics layers/matrices. - input_dims (list of int): A list of input dimensions for each omics layer. - hidden_dims (list of int): A list of hidden dimensions for the Encoder and Decoder. - latent_dim (int): The dimension of the latent space for each encoder. - num_class (int): Number of output classes for the prediction task. - **kwargs: Additional keyword arguments to be passed to the MLP encoders. - - Example: - - # Instantiate a supervised_vae model with 2 omics layers and input dimensions of 100 and 200 - model = supervised_vae(num_layers=2, input_dims=[100, 200], hidden_dims=[64, 32], latent_dim=16, num_class=1) - - """ - def __init__(self, config, dataset, target_variables, batch_variables = None, val_size = 0.2): - super(supervised_vae, self).__init__() - self.config = config - self.dataset = dataset - self.target_variables = target_variables - self.batch_variables = batch_variables - self.variables = target_variables + batch_variables if batch_variables else target_variables - self.val_size = val_size - - self.dat_train, self.dat_val = self.prepare_data() - self.feature_importances = {} - - # sometimes the model may have exploding/vanishing gradients leading to NaN values - self.nan_detected = False - - layers = list(dataset.dat.keys()) - input_dims = [len(dataset.features[layers[i]]) for i in range(len(layers))] - # create a list of Encoder instances for separately encoding each omics layer - self.encoders = nn.ModuleList([Encoder(input_dims[i], [config['hidden_dim']], config['latent_dim']) for i in range(len(layers))]) - # Fully connected layers for concatenated means and log_vars - self.FC_mean = nn.Linear(len(layers) * config['latent_dim'], config['latent_dim']) - self.FC_log_var = nn.Linear(len(layers) * config['latent_dim'], config['latent_dim']) - # list of decoders to decode each omics layer separately - self.decoders = nn.ModuleList([Decoder(config['latent_dim'], [config['hidden_dim']], input_dims[i]) for i in range(len(layers))]) - - # define supervisor heads - # using ModuleDict to store multiple MLPs - self.MLPs = nn.ModuleDict() - for var in self.target_variables: - if self.dataset.variable_types[var] == 'numerical': - num_class = 1 - else: - num_class = len(np.unique(self.dataset.ann[var])) - self.MLPs[var] = MLP(input_dim = config['latent_dim'], - hidden_dim = config['supervisor_hidden_dim'], - output_dim = num_class) - - def multi_encoder(self, x_list): - """ - Encode each input matrix separately using the corresponding Encoder. - - Args: - x_list (list of torch.Tensor): List of input matrices for each omics layer. - - Returns: - tuple: Tuple containing: - - mean (torch.Tensor): Concatenated mean values from each encoder. - - log_var (torch.Tensor): Concatenated log variance values from each encoder. - """ - means, log_vars = [], [] - # Process each input matrix with its corresponding Encoder - for i, x in enumerate(x_list): - mean, log_var = self.encoders[i](x) - means.append(mean) - log_vars.append(log_var) - - # Concatenate means and log_vars - # Push concatenated means and log_vars through the fully connected layers - mean = self.FC_mean(torch.cat(means, dim=1)) - log_var = self.FC_log_var(torch.cat(log_vars, dim=1)) - return mean, log_var - - def forward(self, x_list): - """ - Forward pass through the model. - - Args: - x_list (list of torch.Tensor): List of input matrices for each omics layer. - - Returns: - tuple: Tuple containing: - - x_hat_list (list of torch.Tensor): List of reconstructed matrices for each omics layer. - - z (torch.Tensor): Latent representation. - - mean (torch.Tensor): Concatenated mean values from each encoder. - - log_var (torch.Tensor): Concatenated log variance values from each encoder. - - y_pred (torch.Tensor): Predicted output. - """ - mean, log_var = self.multi_encoder(x_list) - - # generate latent layer - z = self.reparameterization(mean, log_var) - - # Decode each latent variable with its corresponding Decoder - x_hat_list = [self.decoders[i](z) for i in range(len(x_list))] - - #run the supervisor heads using the latent layer as input - outputs = {} - for var, mlp in self.MLPs.items(): - outputs[var] = mlp(z) - - return x_hat_list, z, mean, log_var, outputs - - def reparameterization(self, mean, var): - """ - Reparameterize the mean and variance values. - - Args: - mean (torch.Tensor): Mean values from the encoders. - var (torch.Tensor): Variance values from the encoders. - - Returns: - torch.Tensor: Latent representation. - """ - epsilon = torch.randn_like(var) - z = mean + var*epsilon - return z - - def configure_optimizers(self): - """ - Configure the optimizer for the model. - - Returns: - torch.optim.Adam: Adam optimizer with learning rate 1e-3. - """ - optimizer = torch.optim.Adam(self.parameters(), lr=self.config['lr']) - return optimizer - - def compute_loss(self, var, y, y_hat): - if self.dataset.variable_types[var] == 'numerical': - # Ignore instances with missing labels for numerical variables - valid_indices = ~torch.isnan(y) - if valid_indices.sum() > 0: # only calculate loss if there are valid targets - y_hat = y_hat[valid_indices] - y = y[valid_indices] - loss = F.mse_loss(torch.flatten(y_hat), y.float()) - else: - loss = 0 # if no valid labels, set loss to 0 - else: - # Ignore instances with missing labels for categorical variables - # Assuming that missing values were encoded as -1 - valid_indices = (y != -1) & (~torch.isnan(y)) - if valid_indices.sum() > 0: # only calculate loss if there are valid targets - y_hat = y_hat[valid_indices] - y = y[valid_indices] - loss = F.cross_entropy(y_hat, y.long()) - else: - loss = 0 - return loss - - def training_step(self, train_batch, batch_idx): - dat, y_dict = train_batch - layers = dat.keys() - x_list = [dat[x] for x in layers] - - x_hat_list, z, mean, log_var, outputs = self.forward(x_list) - - # compute mmd loss for each layer and take average - mmd_loss_list = [self.MMD_loss(z.shape[1], z, x_hat_list[i], x_list[i]) for i in range(len(layers))] - mmd_loss = torch.mean(torch.stack(mmd_loss_list)) - - # compute loss values for the supervisor heads - losses = {'mmd_loss': mmd_loss} - - for var in self.target_variables: - y_hat = outputs[var] - y = y_dict[var] - loss = self.compute_loss(var, y, y_hat) - losses[var] = loss - total_loss = sum(losses.values()) - losses['train_loss'] = total_loss - self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) - return total_loss - - def validation_step(self, val_batch, batch_idx): - dat, y_dict = val_batch - layers = dat.keys() - x_list = [dat[x] for x in layers] - - x_hat_list, z, mean, log_var, outputs = self.forward(x_list) - - # compute mmd loss for each layer and take average - mmd_loss_list = [self.MMD_loss(z.shape[1], z, x_hat_list[i], x_list[i]) for i in range(len(layers))] - mmd_loss = torch.mean(torch.stack(mmd_loss_list)) - - # compute loss values for the supervisor heads - losses = {'mmd_loss': mmd_loss} - for var in self.target_variables: - y_hat = outputs[var] - y = y_dict[var] - loss = self.compute_loss(var, y, y_hat) - losses[var] = loss - - total_loss = sum(losses.values()) - losses['val_loss'] = total_loss - self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) - return total_loss - - def prepare_data(self): - lt = int(len(self.dataset)*(1-self.val_size)) - lv = len(self.dataset)-lt - dat_train, dat_val = random_split(self.dataset, [lt, lv], - generator=torch.Generator().manual_seed(42)) - return dat_train, dat_val - - def train_dataloader(self): - return DataLoader(self.dat_train, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=True, drop_last=True) - - def val_dataloader(self): - return DataLoader(self.dat_val, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=False) - - def transform(self, dataset): - """ - Transform the input dataset to latent representation. - - Args: - dataset (MultiOmicDataset): MultiOmicDataset containing input matrices for each omics layer. - - Returns: - pd.DataFrame: Transformed dataset as a pandas DataFrame. - """ - self.eval() - layers = list(dataset.dat.keys()) - x_list = [dataset.dat[x] for x in layers] - M = self.forward(x_list)[1].detach().numpy() - z = pd.DataFrame(M) - z.columns = [''.join(['E', str(x)]) for x in z.columns] - z.index = dataset.samples - return z - - def predict(self, dataset): - """ - Evaluate the model on a dataset. - - Args: - dataset (CustomDataset): Custom dataset containing input matrices for each omics layer. - - Returns: - predicted values. - """ - self.eval() - layers = list(dataset.dat.keys()) - x_list = [dataset.dat[x] for x in layers] - X_hat, z, mean, log_var, outputs = self.forward(x_list) - - predictions = {} - for var in self.target_variables: - y_pred = outputs[var].detach().numpy() - if self.dataset.variable_types[var] == 'categorical': - predictions[var] = np.argmax(y_pred, axis=1) - else: - predictions[var] = y_pred - - return predictions - - def compute_kernel(self, x, y): - """ - Compute the Gaussian kernel matrix between two sets of vectors. - - Args: - x (torch.Tensor): A tensor of shape (x_size, dim) representing the first set of vectors. - y (torch.Tensor): A tensor of shape (y_size, dim) representing the second set of vectors. - - Returns: - torch.Tensor: The Gaussian kernel matrix of shape (x_size, y_size) computed between x and y. - """ - x_size = x.size(0) - y_size = y.size(0) - dim = x.size(1) - x = x.unsqueeze(1) # (x_size, 1, dim) - y = y.unsqueeze(0) # (1, y_size, dim) - tiled_x = x.expand(x_size, y_size, dim) - tiled_y = y.expand(x_size, y_size, dim) - kernel_input = (tiled_x - tiled_y).pow(2).mean(2)/float(dim) - return torch.exp(-kernel_input) # (x_size, y_size) - - def compute_mmd(self, x, y): - """ - Compute the maximum mean discrepancy (MMD) between two sets of vectors. - - Args: - x (torch.Tensor): A tensor of shape (x_size, dim) representing the first set of vectors. - y (torch.Tensor): A tensor of shape (y_size, dim) representing the second set of vectors. - - Returns: - torch.Tensor: A scalar tensor representing the MMD between x and y. - """ - x_kernel = self.compute_kernel(x, x) - y_kernel = self.compute_kernel(y, y) - xy_kernel = self.compute_kernel(x, y) - mmd = x_kernel.mean() + y_kernel.mean() - 2*xy_kernel.mean() - return mmd - - def MMD_loss(self, latent_dim, z, xhat, x): - """ - Compute the loss function based on maximum mean discrepancy (MMD) and negative log likelihood (NLL). - - Args: - latent_dim (int): The dimensionality of the latent space. - z (torch.Tensor): A tensor of shape (batch_size, latent_dim) representing the latent codes. - xhat (torch.Tensor): A tensor of shape (batch_size, dim) representing the reconstructed data. - x (torch.Tensor): A tensor of shape (batch_size, dim) representing the original data. - - Returns: - torch.Tensor: A scalar tensor representing the MMD loss. - """ - true_samples = torch.randn(200, latent_dim) - mmd = self.compute_mmd(true_samples, z) # compute maximum mean discrepancy (MMD) - nll = (xhat - x).pow(2).mean() #negative log likelihood - return mmd+nll - - # Adaptor forward function for captum integrated gradients. - def forward_target(self, *args): - input_data = list(args[:-2]) # one or more tensors (one per omics layer) - target_var = args[-2] # target variable of interest - steps = args[-1] # number of steps for IntegratedGradients().attribute - outputs_list = [] - for i in range(steps): - # get list of tensors for each step into a list of tensors - x_step = [input_data[j][i] for j in range(len(input_data))] - x_hat_list, z, mean, log_var, outputs = self.forward(x_step) - outputs_list.append(outputs[target_var]) - return torch.cat(outputs_list, dim = 0) - - def compute_feature_importance(self, target_var, steps = 5): - """ - Compute the feature importance. - - Args: - input_data (torch.Tensor): The input data to compute the feature importance for. - target_var (str): The target variable to compute the feature importance for. - Returns: - attributions (list of torch.Tensor): The feature importances for each class. - """ - x_list = [self.dataset.dat[x] for x in self.dataset.dat.keys()] - - # Initialize the Integrated Gradients method - ig = IntegratedGradients(self.forward_target) - - input_data = tuple([data.unsqueeze(0).requires_grad_() for data in x_list]) - - # Define a baseline (you might need to adjust this depending on your actual data) - baseline = tuple([torch.zeros_like(data) for data in input_data]) - - # Get the number of classes for the target variable - if self.dataset.variable_types[target_var] == 'numerical': - num_class = 1 - else: - num_class = len(np.unique(self.dataset.ann[target_var])) - - # Compute the feature importance for each class - attributions = [] - if num_class > 1: - for target_class in range(num_class): - attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(target_var, steps), target=target_class, n_steps=steps)) - else: - attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(target_var, steps), n_steps=steps)) - - # summarize feature importances - # Compute absolute attributions - abs_attr = [[torch.abs(a) for a in attr_class] for attr_class in attributions] - # average over samples - imp = [[a.mean(dim=1) for a in attr_class] for attr_class in abs_attr] - - # combine into a single data frame - df_list = [] - layers = list(self.dataset.dat.keys()) - for i in range(num_class): - for j in range(len(layers)): - features = self.dataset.features[layers[j]] - importances = imp[i][j][0].detach().numpy() - df_list.append(pd.DataFrame({'target_variable': target_var, 'target_class': i, 'layer': layers[j], 'name': features, 'importance': importances})) - df_imp = pd.concat(df_list, ignore_index = True) - - # save scores in model - self.feature_importances[target_var] = df_imp - - diff --git a/flexynesis/model_TripletEncoder.py b/flexynesis/model_TripletEncoder.py deleted file mode 100644 index 7a6bb77..0000000 --- a/flexynesis/model_TripletEncoder.py +++ /dev/null @@ -1,368 +0,0 @@ -# Generating encodings of multi-omic data using triplet loss -import torch -from torch import nn -from torch.nn import functional as F -from torch.utils.data import Dataset, DataLoader, random_split - -import pandas as pd -import numpy as np - -import pytorch_lightning as pl - -from .models_shared import * -from .data import TripletMultiOmicDataset - -from captum.attr import IntegratedGradients - - -class MultiEmbeddingNetwork(nn.Module): - """ - A neural network module that computes multiple embeddings and fuses them into a single representation. - - Attributes: - embedding_networks (nn.ModuleList): A list of EmbeddingNetwork instances for each input feature. - """ - def __init__(self, input_sizes, hidden_sizes, output_size): - """ - Initialize the MultiEmbeddingNetwork with the given input sizes, hidden sizes, and output size. - - Args: - input_sizes (list of int): A list of input sizes for each EmbeddingNetwork instance. - hidden_sizes (list of int): A list of hidden sizes for each EmbeddingNetwork instance. - output_size (int): The size of the fused embedding output. - """ - super(MultiEmbeddingNetwork, self).__init__() - self.embedding_networks = nn.ModuleList([ - EmbeddingNetwork(input_size, hidden_size, output_size) - for input_size, hidden_size in zip(input_sizes, hidden_sizes) - ]) - - def forward(self, x): - """ - Compute the forward pass of the MultiEmbeddingNetwork and return the fused embedding. - - Args: - x (dict): A dictionary containing the input tensors for each EmbeddingNetwork. - Keys should correspond to the feature names and values to the input tensors. - - Returns: - torch.Tensor: The fused embedding tensor resulting from the concatenation of individual embeddings. - """ - embeddings = [ - embedding_network(x[key]) - for key, embedding_network in zip(x.keys(), self.embedding_networks) - ] - fused_embedding = torch.cat((embeddings), dim=-1) - return fused_embedding - -class MultiTripletNetwork(pl.LightningModule): - """ - """ - def __init__(self, config, dataset, target_variables, batch_variables = None, val_size = 0.2): - """ - Initialize the MultiTripletNetwork with the given parameters. - - Args: - TODO - """ - super(MultiTripletNetwork, self).__init__() - - self.config = config - self.target_variables = target_variables - self.batch_variables = batch_variables - self.variables = target_variables + batch_variables if batch_variables else target_variables - self.val_size = val_size - self.dataset = dataset - self.ann = self.dataset.ann - self.variable_types = self.dataset.variable_types - self.feature_importances = {} - - layers = list(dataset.dat.keys()) - input_sizes = [len(dataset.features[layers[i]]) for i in range(len(layers))] - hidden_sizes = [config['hidden_dim'] for x in range(len(layers))] - - - # The first target variable is the main variable that dictates the triplets - # it has to be a categorical variable - main_var = self.target_variables[0] - if self.dataset.variable_types[main_var] == 'numerical': - raise ValueError("The first target variable",main_var," must be a categorical variable") - - # create train/validation splits and convert TripletMultiOmicDataset format - self.dataset = TripletMultiOmicDataset(self.dataset, main_var) - self.dat_train, self.dat_val = self.prepare_data() - - # define embedding network for data matrices - self.multi_embedding_network = MultiEmbeddingNetwork(input_sizes, hidden_sizes, config['latent_dim']) - - # define supervisor heads for both target and batch variables - self.MLPs = nn.ModuleDict() # using ModuleDict to store multiple MLPs - for var in self.target_variables: - if self.variable_types[var] == 'numerical': - num_class = 1 - else: - num_class = len(np.unique(self.ann[var])) - self.MLPs[var] = MLP(input_dim=self.config['latent_dim'] * len(layers), - hidden_dim=self.config['supervisor_hidden_dim'], - output_dim=num_class) - - def forward(self, anchor, positive, negative): - """ - Compute the forward pass of the MultiTripletNetwork and return the embeddings and predictions. - - Args: - anchor (dict): A dictionary containing the anchor input tensors for each EmbeddingNetwork. - positive (dict): A dictionary containing the positive input tensors for each EmbeddingNetwork. - negative (dict): A dictionary containing the negative input tensors for each EmbeddingNetwork. - - Returns: - tuple: A tuple containing the anchor, positive, and negative embeddings and the predicted class labels. - """ - # triplet encoding - anchor_embedding = self.multi_embedding_network(anchor) - positive_embedding = self.multi_embedding_network(positive) - negative_embedding = self.multi_embedding_network(negative) - - #run the supervisor heads using the anchor embeddings as input - outputs = {} - for var, mlp in self.MLPs.items(): - outputs[var] = mlp(anchor_embedding) - return anchor_embedding, positive_embedding, negative_embedding, outputs - - def configure_optimizers(self): - """ - Configure the optimizer for the MultiTripletNetwork. - - Returns: - torch.optim.Optimizer: The configured optimizer. - """ - optimizer = torch.optim.Adam(self.parameters(), lr=self.config['lr']) - return optimizer - - def triplet_loss(self, anchor, positive, negative, margin=1.0): - """ - Compute the triplet loss for the given anchor, positive, and negative embeddings. - - Args: - anchor (torch.Tensor): The anchor embedding tensor. - positive (torch.Tensor): The positive embedding tensor. - negative (torch.Tensor): The negative embedding tensor. - margin (float, optional): The margin for the triplet loss. Default is 1.0. - - Returns: - torch.Tensor: The computed triplet loss. - """ - distance_positive = (anchor - positive).pow(2).sum(1) - distance_negative = (anchor - negative).pow(2).sum(1) - losses = torch.relu(distance_positive - distance_negative + margin) - return losses.mean() - - def compute_loss(self, var, y, y_hat): - if self.variable_types[var] == 'numerical': - # Ignore instances with missing labels for numerical variables - valid_indices = ~torch.isnan(y) - if valid_indices.sum() > 0: # only calculate loss if there are valid targets - y_hat = y_hat[valid_indices] - y = y[valid_indices] - - loss = F.mse_loss(torch.flatten(y_hat), y.float()) - else: - loss = 0 # if no valid labels, set loss to 0 - else: - # Ignore instances with missing labels for categorical variables - # Assuming that missing values were encoded as -1 - valid_indices = (y != -1) & (~torch.isnan(y)) - if valid_indices.sum() > 0: # only calculate loss if there are valid targets - y_hat = y_hat[valid_indices] - y = y[valid_indices] - loss = F.cross_entropy(y_hat, y.long()) - else: - loss = 0 - return loss - def training_step(self, train_batch, batch_idx): - anchor, positive, negative, y_dict = train_batch[0], train_batch[1], train_batch[2], train_batch[3] - anchor_embedding, positive_embedding, negative_embedding, outputs = self.forward(anchor, positive, negative) - triplet_loss = self.triplet_loss(anchor_embedding, positive_embedding, negative_embedding) - - # compute loss values for the supervisor heads - losses = {'triplet_loss': triplet_loss} - for var in self.target_variables: - y_hat = outputs[var] - y = y_dict[var] - loss = self.compute_loss(var, y, y_hat) - losses[var] = loss - - total_loss = sum(losses.values()) - losses['train_loss'] = total_loss - self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) - return total_loss - - def validation_step(self, val_batch, batch_idx): - anchor, positive, negative, y_dict = val_batch[0], val_batch[1], val_batch[2], val_batch[3] - anchor_embedding, positive_embedding, negative_embedding, outputs = self.forward(anchor, positive, negative) - triplet_loss = self.triplet_loss(anchor_embedding, positive_embedding, negative_embedding) - - # compute loss values for the supervisor heads - losses = {'triplet_loss': triplet_loss} - for var in self.target_variables: - y_hat = outputs[var] - y = y_dict[var] - loss = self.compute_loss(var, y, y_hat) - losses[var] = loss - - total_loss = sum(losses.values()) - losses['val_loss'] = total_loss - self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) - return total_loss - - def prepare_data(self): - lt = int(len(self.dataset)*(1-self.val_size)) - lv = len(self.dataset)-lt - dat_train, dat_val = random_split(self.dataset, [lt, lv], - generator=torch.Generator().manual_seed(42)) - return dat_train, dat_val - - def train_dataloader(self): - return DataLoader(self.dat_train, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=True, drop_last=True) - - def val_dataloader(self): - return DataLoader(self.dat_val, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=False) - - # dataset: MultiOmicDataset - def transform(self, dataset): - """ - Transforms the input dataset by generating embeddings and predictions. - - Args: - dataset (MultiOmicDataset): An instance of the MultiOmicDataset class. - - Returns: - z (pd.DataFrame): A dataframe containing the computed embeddings. - y_pred (np.ndarray): A numpy array containing the predicted labels. - """ - self.eval() - # get anchor embeddings - z = pd.DataFrame(self.multi_embedding_network(dataset.dat).detach().numpy()) - z.columns = [''.join(['E', str(x)]) for x in z.columns] - z.index = dataset.samples - return z - - def predict(self, dataset): - """ - Evaluate the model on a given dataset. - - Args: - dataset: The dataset to evaluate the model on. - - Returns: - A dictionary where each key is a target variable and the corresponding value is the predicted output for that variable. - """ - self.eval() - # get anchor embedding - anchor_embedding = self.multi_embedding_network(dataset.dat) - # get MLP outputs for each var - outputs = {} - for var, mlp in self.MLPs.items(): - outputs[var] = mlp(anchor_embedding) - - # get predictions from the mlp outputs for each var - predictions = {} - for var in self.target_variables: - y_pred = outputs[var].detach().numpy() - if self.variable_types[var] == 'categorical': - predictions[var] = np.argmax(y_pred, axis=1) - else: - predictions[var] = y_pred - return predictions - - - # Adaptor forward function for captum integrated gradients. - # layer_sizes: number of features in each omic layer - def forward_target(self, input_data, layer_sizes, target_var, steps): - outputs_list = [] - for i in range(steps): - # for each step, get anchor/positive/negative tensors - # (split the concatenated omics layers) - anchor = input_data[i][0].split(layer_sizes, dim = 1) - positive = input_data[i][1].split(layer_sizes, dim = 1) - negative = input_data[i][2].split(layer_sizes, dim = 1) - - # convert to dict - anchor = {k: anchor[k] for k in range(len(anchor))} - positive = {k: anchor[k] for k in range(len(positive))} - negative = {k: anchor[k] for k in range(len(negative))} - anchor_embedding, positive_embedding, negative_embedding, outputs = self.forward(anchor, positive, negative) - outputs_list.append(outputs[target_var]) - return torch.cat(outputs_list, dim = 0) - - def compute_feature_importance(self, target_var, steps = 5): - """ - Compute the feature importance. - - Args: - input_data (torch.Tensor): The input data to compute the feature importance for. - target_var (str): The target variable to compute the feature importance for. - Returns: - attributions (list of torch.Tensor): The feature importances for each class. - """ - - # self.dataset is a TripletMultiomicDataset, which has a different - # structure than the MultiomicDataset. We use data loader to - # read the triplets and get anchor/positive/negative tensors - # read the whole dataset - dl = DataLoader(self.dataset, batch_size=len(self.dataset)) - it = iter(dl) - anchor, positive, negative, y_dict = next(it) - - # Initialize the Integrated Gradients method - ig = IntegratedGradients(self.forward_target) - - anchor = [data.requires_grad_() for data in list(anchor.values())] - positive = [data.requires_grad_() for data in list(positive.values())] - negative = [data.requires_grad_() for data in list(negative.values())] - - # concatenate multiomic layers of each list element - # then stack the anchor/positive/negative - # the purpose is to get a single tensor - input_data = torch.stack([torch.cat(sublist, dim = 1) for sublist in [anchor, positive, negative]]).unsqueeze(0) - - # layer sizes will be needed to revert the concatenated tensor - # anchor/positive/negative have the same shape - layer_sizes = [anchor[i].shape[1] for i in range(len(anchor))] - - # Define a baseline - baseline = torch.zeros_like(input_data) - - # Get the number of classes for the target variable - if self.variable_types[target_var] == 'numerical': - num_class = 1 - else: - num_class = len(np.unique(self.ann[target_var])) - - # Compute the feature importance for each class - attributions = [] - if num_class > 1: - for target_class in range(num_class): - attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(layer_sizes, target_var, steps), target=target_class, n_steps=steps)) - else: - attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(layer_sizes, target_var, steps), n_steps=steps)) - - # summarize feature importances - # Compute absolute attributions - abs_attr = [[torch.abs(a) for a in attr_class] for attr_class in attributions] - # average over samples - imp = [[a.mean(dim=1) for a in attr_class] for attr_class in abs_attr] - - # combine into a single data frame - df_list = [] - layers = list(self.dataset.dataset.dat.keys()) # accessing multiomicdataset within tripletmultiomic dataset here - for i in range(num_class): - imp_layerwise = imp[i][0].split(layer_sizes, dim = 1) - for j in range(len(layers)): - features = self.dataset.dataset.features[layers[j]] # accessing multiomicdataset within tripletmultiomic dataset here - importances = imp_layerwise[j][0].detach().numpy() # 0 => extract importances only for the anchor - df_list.append(pd.DataFrame({'target_variable': target_var, 'target_class': i, 'layer': layers[j], 'name': features, 'importance': importances})) - df_imp = pd.concat(df_list, ignore_index = True) - - # save scores in model - self.feature_importances[target_var] = df_imp diff --git a/flexynesis/models/__init__.py b/flexynesis/models/__init__.py index 2eda149..a4d3dfb 100644 --- a/flexynesis/models/__init__.py +++ b/flexynesis/models/__init__.py @@ -1,6 +1,6 @@ from .direct_pred import DirectPred from .direct_pred_cnn import DirectPredCNN -from .supervised_vae import SupervisedVAE +from .supervised_vae import supervised_vae from .triplet_encoder import MultiTripletNetwork -__all__ = ["DirectPred", "DirectPredCNN", "SupervisedVAE", "MultiTripletNetwork"] +__all__ = ["DirectPred", "DirectPredCNN", "supervised_vae", "MultiTripletNetwork"] diff --git a/flexynesis/model_DirectPred.py b/flexynesis/models/base_direct_pred.py similarity index 52% rename from flexynesis/model_DirectPred.py rename to flexynesis/models/base_direct_pred.py index 88926fd..dd72a0c 100644 --- a/flexynesis/model_DirectPred.py +++ b/flexynesis/models/base_direct_pred.py @@ -1,24 +1,18 @@ import torch -from torch import nn from torch.nn import functional as F -import pytorch_lightning as pl -from torch.utils.data import Dataset, DataLoader, random_split +from torch.utils.data import DataLoader, random_split -import pandas as pd import numpy as np -import os, argparse -from scipy import stats -from functools import reduce - -from captum.attr import IntegratedGradients +import pandas as pd -from .models_shared import * +import pytorch_lightning as pl +from captum.attr import IntegratedGradients -class DirectPred(pl.LightningModule): - def __init__(self, config, dataset, target_variables, batch_variables = None, val_size = 0.2): - super(DirectPred, self).__init__() +class BaseDirectPred(pl.LightningModule): + def __init__(self, config, dataset, target_variables, batch_variables=None, val_size=0.2): + super().__init__() self.config = config self.dataset = dataset self.target_variables = target_variables @@ -26,43 +20,18 @@ def __init__(self, config, dataset, target_variables, batch_variables = None, va self.variables = target_variables + batch_variables if batch_variables else target_variables self.val_size = val_size self.dat_train, self.dat_val = self.prepare_data() - self.feature_importances = {} + self.feature_importances = {} # Instantiate layers. self._init_encoders() self._init_output_layers() def _init_encoders(self): - layers = list(self.dataset.dat.keys()) - input_dims = [len(self.dataset.features[layers[i]]) for i in range(len(layers))] - self.encoders = nn.ModuleList([ - MLP(input_dim=input_dims[i], hidden_dim=self.config["hidden_dim"], output_dim=self.config["latent_dim"]) - for i in range(len(layers)) - ]) + raise NotImplementedError def _init_output_layers(self): - layers = list(self.dataset.dat.keys()) - self.MLPs = nn.ModuleDict() # using ModuleDict to store multiple MLPs - for var in self.target_variables: - if self.dataset.variable_types[var] == "numerical": - num_class = 1 - else: - num_class = len(np.unique(self.dataset.ann[var])) - self.MLPs[var] = MLP( - input_dim=self.config["latent_dim"] * len(layers), - hidden_dim=self.config["hidden_dim"], - output_dim=num_class, - ) + raise NotImplementedError def forward(self, x_list): - """ - Forward pass of the DirectPred model. - - Args: - x_list (list of torch.Tensor): A list of input matrices (omics layers), one for each layer. - - Returns: - dict: A dictionary where each key-value pair corresponds to the target variable name and its predicted output respectively. - """ embeddings_list = [] # Process each input matrix with its corresponding Encoder for i, x in enumerate(x_list): @@ -72,22 +41,14 @@ def forward(self, x_list): outputs = {} for var, mlp in self.MLPs.items(): outputs[var] = mlp(embeddings_concat) - return outputs - - - def configure_optimizers(self): - """ - Configure the optimizer for the DirectPred model. - - Returns: - torch.optim.Optimizer: The configured optimizer. - """ + return outputs - optimizer = torch.optim.Adam(self.parameters(), lr=self.config['lr']) + def configure_optimizers(self): + optimizer = torch.optim.Adam(self.parameters(), lr=self.config["lr"]) return optimizer - + def compute_loss(self, var, y, y_hat): - if self.dataset.variable_types[var] == 'numerical': + if self.dataset.variable_types[var] == "numerical": # Ignore instances with missing labels for numerical variables valid_indices = ~torch.isnan(y) if valid_indices.sum() > 0: # only calculate loss if there are valid targets @@ -95,7 +56,7 @@ def compute_loss(self, var, y, y_hat): y = y[valid_indices] loss = F.mse_loss(torch.flatten(y_hat), y.float()) else: - loss = 0 # if no valid labels, set loss to 0 + loss = 0 # if no valid labels, set loss to 0 else: # Ignore instances with missing labels for categorical variables # Assuming that missing values were encoded as -1 @@ -104,21 +65,12 @@ def compute_loss(self, var, y, y_hat): y_hat = y_hat[valid_indices] y = y[valid_indices] loss = F.cross_entropy(y_hat, y.long()) - else: + else: loss = 0 return loss def training_step(self, train_batch, batch_idx): - """ - Perform a single training step. - Args: - train_batch (tuple): A tuple containing the input data and labels for the current batch. - batch_idx (int): The index of the current batch. - Returns: - torch.Tensor: The total loss for the current training step. - """ - - dat, y_dict = train_batch + dat, y_dict = train_batch layers = dat.keys() x_list = [dat[x] for x in layers] outputs = self.forward(x_list) @@ -129,23 +81,12 @@ def training_step(self, train_batch, batch_idx): loss = self.compute_loss(var, y, y_hat) losses[var] = loss total_loss = sum(losses.values()) - losses['train_loss'] = total_loss + losses["train_loss"] = total_loss self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) return total_loss - def validation_step(self, val_batch, batch_idx): - """ - Perform a single validation step. - - Args: - val_batch (tuple): A tuple containing the input data and labels for the current batch. - batch_idx (int): The index of the current batch. - - Returns: - torch.Tensor: The total loss for the current validation step. - """ - dat, y_dict = val_batch + dat, y_dict = val_batch layers = dat.keys() x_list = [dat[x] for x in layers] outputs = self.forward(x_list) @@ -156,34 +97,32 @@ def validation_step(self, val_batch, batch_idx): loss = self.compute_loss(var, y, y_hat) losses[var] = loss total_loss = sum(losses.values()) - losses['val_loss'] = total_loss + losses["val_loss"] = total_loss self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) return total_loss - def prepare_data(self): - lt = int(len(self.dataset)*(1-self.val_size)) - lv = len(self.dataset)-lt - dat_train, dat_val = random_split(self.dataset, [lt, lv], - generator=torch.Generator().manual_seed(42)) + lt = int(len(self.dataset) * (1 - self.val_size)) + lv = len(self.dataset) - lt + dat_train, dat_val = random_split(self.dataset, [lt, lv], generator=torch.Generator().manual_seed(42)) return dat_train, dat_val - + def train_dataloader(self): - return DataLoader(self.dat_train, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=True, drop_last=True) + return DataLoader( + self.dat_train, + batch_size=int(self.config["batch_size"]), + num_workers=0, + pin_memory=True, + shuffle=True, + drop_last=True, + ) def val_dataloader(self): - return DataLoader(self.dat_val, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=False) - - def predict(self, dataset): - """ - Evaluate the DirectPred model on a given dataset. - - Args: - dataset: The dataset to evaluate the model on. + return DataLoader( + self.dat_val, batch_size=int(self.config["batch_size"]), num_workers=0, pin_memory=True, shuffle=False + ) - Returns: - A dictionary where each key is a target variable and the corresponding value is the predicted output for that variable. - """ + def predict(self, dataset): self.eval() layers = dataset.dat.keys() x_list = [dataset.dat[x] for x in layers] @@ -192,24 +131,13 @@ def predict(self, dataset): predictions = {} for var in self.target_variables: y_pred = outputs[var].detach().numpy() - if self.dataset.variable_types[var] == 'categorical': + if self.dataset.variable_types[var] == "categorical": predictions[var] = np.argmax(y_pred, axis=1) else: predictions[var] = y_pred return predictions def transform(self, dataset): - """ - Transform the input data into a lower-dimensional space using the trained encoders. - - Args: - dataset: The input dataset containing the omics data. - - Returns: - pd.DataFrame: A dataframe of embeddings where the row indices are - dataset.samples and the column names are created by appending - the substring "E" to each dimension index. - """ self.eval() embeddings_list = [] # Process each input matrix with its corresponding Encoder @@ -218,36 +146,29 @@ def transform(self, dataset): embeddings_concat = torch.cat(embeddings_list, dim=1) # Converting tensor to numpy array and then to DataFrame - embeddings_df = pd.DataFrame(embeddings_concat.detach().numpy(), - index=dataset.samples, - columns=[f"E{dim}" for dim in range(embeddings_concat.shape[1])]) + embeddings_df = pd.DataFrame( + embeddings_concat.detach().numpy(), + index=dataset.samples, + columns=[f"E{dim}" for dim in range(embeddings_concat.shape[1])], + ) return embeddings_df - - # Adaptor forward function for captum integrated gradients. + + # Adaptor forward function for captum integrated gradients. def forward_target(self, *args): input_data = list(args[:-2]) # one or more tensors (one per omics layer) target_var = args[-2] # target variable of interest - steps = args[-1] # number of steps for IntegratedGradients().attribute + steps = args[-1] # number of steps for IntegratedGradients().attribute outputs_list = [] for i in range(steps): # get list of tensors for each step into a list of tensors x_step = [input_data[j][i] for j in range(len(input_data))] out = self.forward(x_step) outputs_list.append(out[target_var]) - return torch.cat(outputs_list, dim = 0) - - def compute_feature_importance(self, target_var, steps = 5): - """ - Compute the feature importance. + return torch.cat(outputs_list, dim=0) - Args: - input_data (torch.Tensor): The input data to compute the feature importance for. - target_var (str): The target variable to compute the feature importance for. - Returns: - attributions (list of torch.Tensor): The feature importances for each class. - """ + def compute_feature_importance(self, target_var, steps=5): x_list = [self.dataset.dat[x] for x in self.dataset.dat.keys()] - + # Initialize the Integrated Gradients method ig = IntegratedGradients(self.forward_target) @@ -257,7 +178,7 @@ def compute_feature_importance(self, target_var, steps = 5): baseline = tuple([torch.zeros_like(data) for data in input_data]) # Get the number of classes for the target variable - if self.dataset.variable_types[target_var] == 'numerical': + if self.dataset.variable_types[target_var] == "numerical": num_class = 1 else: num_class = len(np.unique(self.dataset.ann[target_var])) @@ -266,27 +187,45 @@ def compute_feature_importance(self, target_var, steps = 5): attributions = [] if num_class > 1: for target_class in range(num_class): - attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(target_var, steps), target=target_class, n_steps=steps)) + attributions.append( + ig.attribute( + input_data, + baseline, + additional_forward_args=(target_var, steps), + target=target_class, + n_steps=steps, + ) + ) else: - attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(target_var, steps), n_steps=steps)) + attributions.append( + ig.attribute(input_data, baseline, additional_forward_args=(target_var, steps), n_steps=steps) + ) # summarize feature importances # Compute absolute attributions abs_attr = [[torch.abs(a) for a in attr_class] for attr_class in attributions] - # average over samples + # average over samples imp = [[a.mean(dim=1) for a in attr_class] for attr_class in abs_attr] - # combine into a single data frame + # combine into a single data frame df_list = [] layers = list(self.dataset.dat.keys()) for i in range(num_class): for j in range(len(layers)): features = self.dataset.features[layers[j]] importances = imp[i][j][0].detach().numpy() - df_list.append(pd.DataFrame({'target_variable': target_var, 'target_class': i, 'layer': layers[j], 'name': features, 'importance': importances})) - df_imp = pd.concat(df_list, ignore_index = True) - + df_list.append( + pd.DataFrame( + { + "target_variable": target_var, + "target_class": i, + "layer": layers[j], + "name": features, + "importance": importances, + } + ) + ) + df_imp = pd.concat(df_list, ignore_index=True) + # save the computed scores in the model self.feature_importances[target_var] = df_imp - - diff --git a/flexynesis/models/direct_pred.py b/flexynesis/models/direct_pred.py index 79ad19b..431c58b 100644 --- a/flexynesis/models/direct_pred.py +++ b/flexynesis/models/direct_pred.py @@ -1 +1,292 @@ -from ..model_DirectPred import DirectPred +import torch +from torch import nn +from torch.nn import functional as F +import pytorch_lightning as pl +from torch.utils.data import Dataset, DataLoader, random_split + +import pandas as pd +import numpy as np +import os, argparse +from scipy import stats +from functools import reduce + +from captum.attr import IntegratedGradients + +from ..modules import * + + + +class DirectPred(pl.LightningModule): + def __init__(self, config, dataset, target_variables, batch_variables = None, val_size = 0.2): + super(DirectPred, self).__init__() + self.config = config + self.dataset = dataset + self.target_variables = target_variables + self.batch_variables = batch_variables + self.variables = target_variables + batch_variables if batch_variables else target_variables + self.val_size = val_size + self.dat_train, self.dat_val = self.prepare_data() + self.feature_importances = {} + # Instantiate layers. + self._init_encoders() + self._init_output_layers() + + def _init_encoders(self): + layers = list(self.dataset.dat.keys()) + input_dims = [len(self.dataset.features[layers[i]]) for i in range(len(layers))] + self.encoders = nn.ModuleList([ + MLP(input_dim=input_dims[i], hidden_dim=self.config["hidden_dim"], output_dim=self.config["latent_dim"]) + for i in range(len(layers)) + ]) + + def _init_output_layers(self): + layers = list(self.dataset.dat.keys()) + self.MLPs = nn.ModuleDict() # using ModuleDict to store multiple MLPs + for var in self.target_variables: + if self.dataset.variable_types[var] == "numerical": + num_class = 1 + else: + num_class = len(np.unique(self.dataset.ann[var])) + self.MLPs[var] = MLP( + input_dim=self.config["latent_dim"] * len(layers), + hidden_dim=self.config["hidden_dim"], + output_dim=num_class, + ) + + def forward(self, x_list): + """ + Forward pass of the DirectPred model. + + Args: + x_list (list of torch.Tensor): A list of input matrices (omics layers), one for each layer. + + Returns: + dict: A dictionary where each key-value pair corresponds to the target variable name and its predicted output respectively. + """ + embeddings_list = [] + # Process each input matrix with its corresponding Encoder + for i, x in enumerate(x_list): + embeddings_list.append(self.encoders[i](x)) + embeddings_concat = torch.cat(embeddings_list, dim=1) + + outputs = {} + for var, mlp in self.MLPs.items(): + outputs[var] = mlp(embeddings_concat) + return outputs + + + def configure_optimizers(self): + """ + Configure the optimizer for the DirectPred model. + + Returns: + torch.optim.Optimizer: The configured optimizer. + """ + + optimizer = torch.optim.Adam(self.parameters(), lr=self.config['lr']) + return optimizer + + def compute_loss(self, var, y, y_hat): + if self.dataset.variable_types[var] == 'numerical': + # Ignore instances with missing labels for numerical variables + valid_indices = ~torch.isnan(y) + if valid_indices.sum() > 0: # only calculate loss if there are valid targets + y_hat = y_hat[valid_indices] + y = y[valid_indices] + loss = F.mse_loss(torch.flatten(y_hat), y.float()) + else: + loss = 0 # if no valid labels, set loss to 0 + else: + # Ignore instances with missing labels for categorical variables + # Assuming that missing values were encoded as -1 + valid_indices = (y != -1) & (~torch.isnan(y)) + if valid_indices.sum() > 0: # only calculate loss if there are valid targets + y_hat = y_hat[valid_indices] + y = y[valid_indices] + loss = F.cross_entropy(y_hat, y.long()) + else: + loss = 0 + return loss + + def training_step(self, train_batch, batch_idx): + """ + Perform a single training step. + Args: + train_batch (tuple): A tuple containing the input data and labels for the current batch. + batch_idx (int): The index of the current batch. + Returns: + torch.Tensor: The total loss for the current training step. + """ + + dat, y_dict = train_batch + layers = dat.keys() + x_list = [dat[x] for x in layers] + outputs = self.forward(x_list) + losses = {} + for var in self.target_variables: + y_hat = outputs[var] + y = y_dict[var] + loss = self.compute_loss(var, y, y_hat) + losses[var] = loss + total_loss = sum(losses.values()) + losses['train_loss'] = total_loss + self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) + return total_loss + + + def validation_step(self, val_batch, batch_idx): + """ + Perform a single validation step. + + Args: + val_batch (tuple): A tuple containing the input data and labels for the current batch. + batch_idx (int): The index of the current batch. + + Returns: + torch.Tensor: The total loss for the current validation step. + """ + dat, y_dict = val_batch + layers = dat.keys() + x_list = [dat[x] for x in layers] + outputs = self.forward(x_list) + losses = {} + for var in self.target_variables: + y_hat = outputs[var] + y = y_dict[var] + loss = self.compute_loss(var, y, y_hat) + losses[var] = loss + total_loss = sum(losses.values()) + losses['val_loss'] = total_loss + self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) + return total_loss + + + def prepare_data(self): + lt = int(len(self.dataset)*(1-self.val_size)) + lv = len(self.dataset)-lt + dat_train, dat_val = random_split(self.dataset, [lt, lv], + generator=torch.Generator().manual_seed(42)) + return dat_train, dat_val + + def train_dataloader(self): + return DataLoader(self.dat_train, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=True, drop_last=True) + + def val_dataloader(self): + return DataLoader(self.dat_val, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=False) + + def predict(self, dataset): + """ + Evaluate the DirectPred model on a given dataset. + + Args: + dataset: The dataset to evaluate the model on. + + Returns: + A dictionary where each key is a target variable and the corresponding value is the predicted output for that variable. + """ + self.eval() + layers = dataset.dat.keys() + x_list = [dataset.dat[x] for x in layers] + outputs = self.forward(x_list) + + predictions = {} + for var in self.target_variables: + y_pred = outputs[var].detach().numpy() + if self.dataset.variable_types[var] == 'categorical': + predictions[var] = np.argmax(y_pred, axis=1) + else: + predictions[var] = y_pred + return predictions + + def transform(self, dataset): + """ + Transform the input data into a lower-dimensional space using the trained encoders. + + Args: + dataset: The input dataset containing the omics data. + + Returns: + pd.DataFrame: A dataframe of embeddings where the row indices are + dataset.samples and the column names are created by appending + the substring "E" to each dimension index. + """ + self.eval() + embeddings_list = [] + # Process each input matrix with its corresponding Encoder + for i, x in enumerate(dataset.dat.values()): + embeddings_list.append(self.encoders[i](x)) + embeddings_concat = torch.cat(embeddings_list, dim=1) + + # Converting tensor to numpy array and then to DataFrame + embeddings_df = pd.DataFrame(embeddings_concat.detach().numpy(), + index=dataset.samples, + columns=[f"E{dim}" for dim in range(embeddings_concat.shape[1])]) + return embeddings_df + + # Adaptor forward function for captum integrated gradients. + def forward_target(self, *args): + input_data = list(args[:-2]) # one or more tensors (one per omics layer) + target_var = args[-2] # target variable of interest + steps = args[-1] # number of steps for IntegratedGradients().attribute + outputs_list = [] + for i in range(steps): + # get list of tensors for each step into a list of tensors + x_step = [input_data[j][i] for j in range(len(input_data))] + out = self.forward(x_step) + outputs_list.append(out[target_var]) + return torch.cat(outputs_list, dim = 0) + + def compute_feature_importance(self, target_var, steps = 5): + """ + Compute the feature importance. + + Args: + input_data (torch.Tensor): The input data to compute the feature importance for. + target_var (str): The target variable to compute the feature importance for. + Returns: + attributions (list of torch.Tensor): The feature importances for each class. + """ + x_list = [self.dataset.dat[x] for x in self.dataset.dat.keys()] + + # Initialize the Integrated Gradients method + ig = IntegratedGradients(self.forward_target) + + input_data = tuple([data.unsqueeze(0).requires_grad_() for data in x_list]) + + # Define a baseline (you might need to adjust this depending on your actual data) + baseline = tuple([torch.zeros_like(data) for data in input_data]) + + # Get the number of classes for the target variable + if self.dataset.variable_types[target_var] == 'numerical': + num_class = 1 + else: + num_class = len(np.unique(self.dataset.ann[target_var])) + + # Compute the feature importance for each class + attributions = [] + if num_class > 1: + for target_class in range(num_class): + attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(target_var, steps), target=target_class, n_steps=steps)) + else: + attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(target_var, steps), n_steps=steps)) + + # summarize feature importances + # Compute absolute attributions + abs_attr = [[torch.abs(a) for a in attr_class] for attr_class in attributions] + # average over samples + imp = [[a.mean(dim=1) for a in attr_class] for attr_class in abs_attr] + + # combine into a single data frame + df_list = [] + layers = list(self.dataset.dat.keys()) + for i in range(num_class): + for j in range(len(layers)): + features = self.dataset.features[layers[j]] + importances = imp[i][j][0].detach().numpy() + df_list.append(pd.DataFrame({'target_variable': target_var, 'target_class': i, 'layer': layers[j], 'name': features, 'importance': importances})) + df_imp = pd.concat(df_list, ignore_index = True) + + # save the computed scores in the model + self.feature_importances[target_var] = df_imp + + diff --git a/flexynesis/models/direct_pred_cnn.py b/flexynesis/models/direct_pred_cnn.py index 6aeb9f5..8ae9846 100644 --- a/flexynesis/models/direct_pred_cnn.py +++ b/flexynesis/models/direct_pred_cnn.py @@ -1,18 +1,20 @@ import numpy as np from torch import nn -from .direct_pred import DirectPred from ..modules import CNN +from .base_direct_pred import BaseDirectPred -class DirectPredCNN(DirectPred): +class DirectPredCNN(BaseDirectPred): def _init_encoders(self): layers = list(self.dataset.dat.keys()) input_dims = [len(self.dataset.features[layers[i]]) for i in range(len(layers))] - self.encoders = nn.ModuleList([ - CNN(input_dim=input_dims[i], hidden_dim=self.config["hidden_dim"], output_dim=self.config["latent_dim"]) - for i in range(len(layers)) - ]) + self.encoders = nn.ModuleList( + [ + CNN(input_dim=input_dims[i], hidden_dim=self.config["hidden_dim"], output_dim=self.config["latent_dim"]) + for i in range(len(layers)) + ] + ) def _init_output_layers(self): layers = list(self.dataset.dat.keys()) diff --git a/flexynesis/models/supervised_vae.py b/flexynesis/models/supervised_vae.py index dd93f28..e7f9441 100644 --- a/flexynesis/models/supervised_vae.py +++ b/flexynesis/models/supervised_vae.py @@ -1 +1,410 @@ -from ..model_SVAE import supervised_vae as SupervisedVAE +# Supervised VAE-MMD architecture +import torch +from torch import nn +from torch.nn import functional as F +from torch.utils.data import Dataset, DataLoader, random_split + +import pandas as pd +import numpy as np + +import pytorch_lightning as pl +from scipy import stats + +from captum.attr import IntegratedGradients + +from ..modules import * + +# Supervised Variational Auto-encoder that can train one or more layers of omics datasets +# num_layers: number of omics layers in the input +# each layer is encoded separately, encodings are concatenated, and decoded separately +# depends on MLP, Encoder, Decoder classes in models_shared +class supervised_vae(pl.LightningModule): + """ + Supervised Variational Auto-encoder for multi-omics data fusion and prediction. + + This class implements a deep learning model for fusing and predicting from multiple omics layers/matrices. + Each omics layer is encoded separately using an Encoder. The resulting latent representations are then + concatenated and passed through a fully connected network (fusion layer) to make predictions. The model + also includes a supervisor head for supervised learning. + + Args: + num_layers (int): Number of omics layers/matrices. + input_dims (list of int): A list of input dimensions for each omics layer. + hidden_dims (list of int): A list of hidden dimensions for the Encoder and Decoder. + latent_dim (int): The dimension of the latent space for each encoder. + num_class (int): Number of output classes for the prediction task. + **kwargs: Additional keyword arguments to be passed to the MLP encoders. + + Example: + + # Instantiate a supervised_vae model with 2 omics layers and input dimensions of 100 and 200 + model = supervised_vae(num_layers=2, input_dims=[100, 200], hidden_dims=[64, 32], latent_dim=16, num_class=1) + + """ + def __init__(self, config, dataset, target_variables, batch_variables = None, val_size = 0.2): + super(supervised_vae, self).__init__() + self.config = config + self.dataset = dataset + self.target_variables = target_variables + self.batch_variables = batch_variables + self.variables = target_variables + batch_variables if batch_variables else target_variables + self.val_size = val_size + + self.dat_train, self.dat_val = self.prepare_data() + self.feature_importances = {} + + # sometimes the model may have exploding/vanishing gradients leading to NaN values + self.nan_detected = False + + layers = list(dataset.dat.keys()) + input_dims = [len(dataset.features[layers[i]]) for i in range(len(layers))] + # create a list of Encoder instances for separately encoding each omics layer + self.encoders = nn.ModuleList([Encoder(input_dims[i], [config['hidden_dim']], config['latent_dim']) for i in range(len(layers))]) + # Fully connected layers for concatenated means and log_vars + self.FC_mean = nn.Linear(len(layers) * config['latent_dim'], config['latent_dim']) + self.FC_log_var = nn.Linear(len(layers) * config['latent_dim'], config['latent_dim']) + # list of decoders to decode each omics layer separately + self.decoders = nn.ModuleList([Decoder(config['latent_dim'], [config['hidden_dim']], input_dims[i]) for i in range(len(layers))]) + + # define supervisor heads + # using ModuleDict to store multiple MLPs + self.MLPs = nn.ModuleDict() + for var in self.target_variables: + if self.dataset.variable_types[var] == 'numerical': + num_class = 1 + else: + num_class = len(np.unique(self.dataset.ann[var])) + self.MLPs[var] = MLP(input_dim = config['latent_dim'], + hidden_dim = config['supervisor_hidden_dim'], + output_dim = num_class) + + def multi_encoder(self, x_list): + """ + Encode each input matrix separately using the corresponding Encoder. + + Args: + x_list (list of torch.Tensor): List of input matrices for each omics layer. + + Returns: + tuple: Tuple containing: + - mean (torch.Tensor): Concatenated mean values from each encoder. + - log_var (torch.Tensor): Concatenated log variance values from each encoder. + """ + means, log_vars = [], [] + # Process each input matrix with its corresponding Encoder + for i, x in enumerate(x_list): + mean, log_var = self.encoders[i](x) + means.append(mean) + log_vars.append(log_var) + + # Concatenate means and log_vars + # Push concatenated means and log_vars through the fully connected layers + mean = self.FC_mean(torch.cat(means, dim=1)) + log_var = self.FC_log_var(torch.cat(log_vars, dim=1)) + return mean, log_var + + def forward(self, x_list): + """ + Forward pass through the model. + + Args: + x_list (list of torch.Tensor): List of input matrices for each omics layer. + + Returns: + tuple: Tuple containing: + - x_hat_list (list of torch.Tensor): List of reconstructed matrices for each omics layer. + - z (torch.Tensor): Latent representation. + - mean (torch.Tensor): Concatenated mean values from each encoder. + - log_var (torch.Tensor): Concatenated log variance values from each encoder. + - y_pred (torch.Tensor): Predicted output. + """ + mean, log_var = self.multi_encoder(x_list) + + # generate latent layer + z = self.reparameterization(mean, log_var) + + # Decode each latent variable with its corresponding Decoder + x_hat_list = [self.decoders[i](z) for i in range(len(x_list))] + + #run the supervisor heads using the latent layer as input + outputs = {} + for var, mlp in self.MLPs.items(): + outputs[var] = mlp(z) + + return x_hat_list, z, mean, log_var, outputs + + def reparameterization(self, mean, var): + """ + Reparameterize the mean and variance values. + + Args: + mean (torch.Tensor): Mean values from the encoders. + var (torch.Tensor): Variance values from the encoders. + + Returns: + torch.Tensor: Latent representation. + """ + epsilon = torch.randn_like(var) + z = mean + var*epsilon + return z + + def configure_optimizers(self): + """ + Configure the optimizer for the model. + + Returns: + torch.optim.Adam: Adam optimizer with learning rate 1e-3. + """ + optimizer = torch.optim.Adam(self.parameters(), lr=self.config['lr']) + return optimizer + + def compute_loss(self, var, y, y_hat): + if self.dataset.variable_types[var] == 'numerical': + # Ignore instances with missing labels for numerical variables + valid_indices = ~torch.isnan(y) + if valid_indices.sum() > 0: # only calculate loss if there are valid targets + y_hat = y_hat[valid_indices] + y = y[valid_indices] + loss = F.mse_loss(torch.flatten(y_hat), y.float()) + else: + loss = 0 # if no valid labels, set loss to 0 + else: + # Ignore instances with missing labels for categorical variables + # Assuming that missing values were encoded as -1 + valid_indices = (y != -1) & (~torch.isnan(y)) + if valid_indices.sum() > 0: # only calculate loss if there are valid targets + y_hat = y_hat[valid_indices] + y = y[valid_indices] + loss = F.cross_entropy(y_hat, y.long()) + else: + loss = 0 + return loss + + def training_step(self, train_batch, batch_idx): + dat, y_dict = train_batch + layers = dat.keys() + x_list = [dat[x] for x in layers] + + x_hat_list, z, mean, log_var, outputs = self.forward(x_list) + + # compute mmd loss for each layer and take average + mmd_loss_list = [self.MMD_loss(z.shape[1], z, x_hat_list[i], x_list[i]) for i in range(len(layers))] + mmd_loss = torch.mean(torch.stack(mmd_loss_list)) + + # compute loss values for the supervisor heads + losses = {'mmd_loss': mmd_loss} + + for var in self.target_variables: + y_hat = outputs[var] + y = y_dict[var] + loss = self.compute_loss(var, y, y_hat) + losses[var] = loss + total_loss = sum(losses.values()) + losses['train_loss'] = total_loss + self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) + return total_loss + + def validation_step(self, val_batch, batch_idx): + dat, y_dict = val_batch + layers = dat.keys() + x_list = [dat[x] for x in layers] + + x_hat_list, z, mean, log_var, outputs = self.forward(x_list) + + # compute mmd loss for each layer and take average + mmd_loss_list = [self.MMD_loss(z.shape[1], z, x_hat_list[i], x_list[i]) for i in range(len(layers))] + mmd_loss = torch.mean(torch.stack(mmd_loss_list)) + + # compute loss values for the supervisor heads + losses = {'mmd_loss': mmd_loss} + for var in self.target_variables: + y_hat = outputs[var] + y = y_dict[var] + loss = self.compute_loss(var, y, y_hat) + losses[var] = loss + + total_loss = sum(losses.values()) + losses['val_loss'] = total_loss + self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) + return total_loss + + def prepare_data(self): + lt = int(len(self.dataset)*(1-self.val_size)) + lv = len(self.dataset)-lt + dat_train, dat_val = random_split(self.dataset, [lt, lv], + generator=torch.Generator().manual_seed(42)) + return dat_train, dat_val + + def train_dataloader(self): + return DataLoader(self.dat_train, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=True, drop_last=True) + + def val_dataloader(self): + return DataLoader(self.dat_val, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=False) + + def transform(self, dataset): + """ + Transform the input dataset to latent representation. + + Args: + dataset (MultiOmicDataset): MultiOmicDataset containing input matrices for each omics layer. + + Returns: + pd.DataFrame: Transformed dataset as a pandas DataFrame. + """ + self.eval() + layers = list(dataset.dat.keys()) + x_list = [dataset.dat[x] for x in layers] + M = self.forward(x_list)[1].detach().numpy() + z = pd.DataFrame(M) + z.columns = [''.join(['E', str(x)]) for x in z.columns] + z.index = dataset.samples + return z + + def predict(self, dataset): + """ + Evaluate the model on a dataset. + + Args: + dataset (CustomDataset): Custom dataset containing input matrices for each omics layer. + + Returns: + predicted values. + """ + self.eval() + layers = list(dataset.dat.keys()) + x_list = [dataset.dat[x] for x in layers] + X_hat, z, mean, log_var, outputs = self.forward(x_list) + + predictions = {} + for var in self.target_variables: + y_pred = outputs[var].detach().numpy() + if self.dataset.variable_types[var] == 'categorical': + predictions[var] = np.argmax(y_pred, axis=1) + else: + predictions[var] = y_pred + + return predictions + + def compute_kernel(self, x, y): + """ + Compute the Gaussian kernel matrix between two sets of vectors. + + Args: + x (torch.Tensor): A tensor of shape (x_size, dim) representing the first set of vectors. + y (torch.Tensor): A tensor of shape (y_size, dim) representing the second set of vectors. + + Returns: + torch.Tensor: The Gaussian kernel matrix of shape (x_size, y_size) computed between x and y. + """ + x_size = x.size(0) + y_size = y.size(0) + dim = x.size(1) + x = x.unsqueeze(1) # (x_size, 1, dim) + y = y.unsqueeze(0) # (1, y_size, dim) + tiled_x = x.expand(x_size, y_size, dim) + tiled_y = y.expand(x_size, y_size, dim) + kernel_input = (tiled_x - tiled_y).pow(2).mean(2)/float(dim) + return torch.exp(-kernel_input) # (x_size, y_size) + + def compute_mmd(self, x, y): + """ + Compute the maximum mean discrepancy (MMD) between two sets of vectors. + + Args: + x (torch.Tensor): A tensor of shape (x_size, dim) representing the first set of vectors. + y (torch.Tensor): A tensor of shape (y_size, dim) representing the second set of vectors. + + Returns: + torch.Tensor: A scalar tensor representing the MMD between x and y. + """ + x_kernel = self.compute_kernel(x, x) + y_kernel = self.compute_kernel(y, y) + xy_kernel = self.compute_kernel(x, y) + mmd = x_kernel.mean() + y_kernel.mean() - 2*xy_kernel.mean() + return mmd + + def MMD_loss(self, latent_dim, z, xhat, x): + """ + Compute the loss function based on maximum mean discrepancy (MMD) and negative log likelihood (NLL). + + Args: + latent_dim (int): The dimensionality of the latent space. + z (torch.Tensor): A tensor of shape (batch_size, latent_dim) representing the latent codes. + xhat (torch.Tensor): A tensor of shape (batch_size, dim) representing the reconstructed data. + x (torch.Tensor): A tensor of shape (batch_size, dim) representing the original data. + + Returns: + torch.Tensor: A scalar tensor representing the MMD loss. + """ + true_samples = torch.randn(200, latent_dim) + mmd = self.compute_mmd(true_samples, z) # compute maximum mean discrepancy (MMD) + nll = (xhat - x).pow(2).mean() #negative log likelihood + return mmd+nll + + # Adaptor forward function for captum integrated gradients. + def forward_target(self, *args): + input_data = list(args[:-2]) # one or more tensors (one per omics layer) + target_var = args[-2] # target variable of interest + steps = args[-1] # number of steps for IntegratedGradients().attribute + outputs_list = [] + for i in range(steps): + # get list of tensors for each step into a list of tensors + x_step = [input_data[j][i] for j in range(len(input_data))] + x_hat_list, z, mean, log_var, outputs = self.forward(x_step) + outputs_list.append(outputs[target_var]) + return torch.cat(outputs_list, dim = 0) + + def compute_feature_importance(self, target_var, steps = 5): + """ + Compute the feature importance. + + Args: + input_data (torch.Tensor): The input data to compute the feature importance for. + target_var (str): The target variable to compute the feature importance for. + Returns: + attributions (list of torch.Tensor): The feature importances for each class. + """ + x_list = [self.dataset.dat[x] for x in self.dataset.dat.keys()] + + # Initialize the Integrated Gradients method + ig = IntegratedGradients(self.forward_target) + + input_data = tuple([data.unsqueeze(0).requires_grad_() for data in x_list]) + + # Define a baseline (you might need to adjust this depending on your actual data) + baseline = tuple([torch.zeros_like(data) for data in input_data]) + + # Get the number of classes for the target variable + if self.dataset.variable_types[target_var] == 'numerical': + num_class = 1 + else: + num_class = len(np.unique(self.dataset.ann[target_var])) + + # Compute the feature importance for each class + attributions = [] + if num_class > 1: + for target_class in range(num_class): + attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(target_var, steps), target=target_class, n_steps=steps)) + else: + attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(target_var, steps), n_steps=steps)) + + # summarize feature importances + # Compute absolute attributions + abs_attr = [[torch.abs(a) for a in attr_class] for attr_class in attributions] + # average over samples + imp = [[a.mean(dim=1) for a in attr_class] for attr_class in abs_attr] + + # combine into a single data frame + df_list = [] + layers = list(self.dataset.dat.keys()) + for i in range(num_class): + for j in range(len(layers)): + features = self.dataset.features[layers[j]] + importances = imp[i][j][0].detach().numpy() + df_list.append(pd.DataFrame({'target_variable': target_var, 'target_class': i, 'layer': layers[j], 'name': features, 'importance': importances})) + df_imp = pd.concat(df_list, ignore_index = True) + + # save scores in model + self.feature_importances[target_var] = df_imp + + diff --git a/flexynesis/models/triplet_encoder.py b/flexynesis/models/triplet_encoder.py index af5dc8e..f342ef9 100644 --- a/flexynesis/models/triplet_encoder.py +++ b/flexynesis/models/triplet_encoder.py @@ -1 +1,368 @@ -from ..model_TripletEncoder import MultiTripletNetwork +# Generating encodings of multi-omic data using triplet loss +import torch +from torch import nn +from torch.nn import functional as F +from torch.utils.data import Dataset, DataLoader, random_split + +import pandas as pd +import numpy as np + +import pytorch_lightning as pl + +from ..modules import * +from ..data import TripletMultiOmicDataset + +from captum.attr import IntegratedGradients + + +class MultiEmbeddingNetwork(nn.Module): + """ + A neural network module that computes multiple embeddings and fuses them into a single representation. + + Attributes: + embedding_networks (nn.ModuleList): A list of EmbeddingNetwork instances for each input feature. + """ + def __init__(self, input_sizes, hidden_sizes, output_size): + """ + Initialize the MultiEmbeddingNetwork with the given input sizes, hidden sizes, and output size. + + Args: + input_sizes (list of int): A list of input sizes for each EmbeddingNetwork instance. + hidden_sizes (list of int): A list of hidden sizes for each EmbeddingNetwork instance. + output_size (int): The size of the fused embedding output. + """ + super(MultiEmbeddingNetwork, self).__init__() + self.embedding_networks = nn.ModuleList([ + EmbeddingNetwork(input_size, hidden_size, output_size) + for input_size, hidden_size in zip(input_sizes, hidden_sizes) + ]) + + def forward(self, x): + """ + Compute the forward pass of the MultiEmbeddingNetwork and return the fused embedding. + + Args: + x (dict): A dictionary containing the input tensors for each EmbeddingNetwork. + Keys should correspond to the feature names and values to the input tensors. + + Returns: + torch.Tensor: The fused embedding tensor resulting from the concatenation of individual embeddings. + """ + embeddings = [ + embedding_network(x[key]) + for key, embedding_network in zip(x.keys(), self.embedding_networks) + ] + fused_embedding = torch.cat((embeddings), dim=-1) + return fused_embedding + +class MultiTripletNetwork(pl.LightningModule): + """ + """ + def __init__(self, config, dataset, target_variables, batch_variables = None, val_size = 0.2): + """ + Initialize the MultiTripletNetwork with the given parameters. + + Args: + TODO + """ + super(MultiTripletNetwork, self).__init__() + + self.config = config + self.target_variables = target_variables + self.batch_variables = batch_variables + self.variables = target_variables + batch_variables if batch_variables else target_variables + self.val_size = val_size + self.dataset = dataset + self.ann = self.dataset.ann + self.variable_types = self.dataset.variable_types + self.feature_importances = {} + + layers = list(dataset.dat.keys()) + input_sizes = [len(dataset.features[layers[i]]) for i in range(len(layers))] + hidden_sizes = [config['hidden_dim'] for x in range(len(layers))] + + + # The first target variable is the main variable that dictates the triplets + # it has to be a categorical variable + main_var = self.target_variables[0] + if self.dataset.variable_types[main_var] == 'numerical': + raise ValueError("The first target variable",main_var," must be a categorical variable") + + # create train/validation splits and convert TripletMultiOmicDataset format + self.dataset = TripletMultiOmicDataset(self.dataset, main_var) + self.dat_train, self.dat_val = self.prepare_data() + + # define embedding network for data matrices + self.multi_embedding_network = MultiEmbeddingNetwork(input_sizes, hidden_sizes, config['latent_dim']) + + # define supervisor heads for both target and batch variables + self.MLPs = nn.ModuleDict() # using ModuleDict to store multiple MLPs + for var in self.target_variables: + if self.variable_types[var] == 'numerical': + num_class = 1 + else: + num_class = len(np.unique(self.ann[var])) + self.MLPs[var] = MLP(input_dim=self.config['latent_dim'] * len(layers), + hidden_dim=self.config['supervisor_hidden_dim'], + output_dim=num_class) + + def forward(self, anchor, positive, negative): + """ + Compute the forward pass of the MultiTripletNetwork and return the embeddings and predictions. + + Args: + anchor (dict): A dictionary containing the anchor input tensors for each EmbeddingNetwork. + positive (dict): A dictionary containing the positive input tensors for each EmbeddingNetwork. + negative (dict): A dictionary containing the negative input tensors for each EmbeddingNetwork. + + Returns: + tuple: A tuple containing the anchor, positive, and negative embeddings and the predicted class labels. + """ + # triplet encoding + anchor_embedding = self.multi_embedding_network(anchor) + positive_embedding = self.multi_embedding_network(positive) + negative_embedding = self.multi_embedding_network(negative) + + #run the supervisor heads using the anchor embeddings as input + outputs = {} + for var, mlp in self.MLPs.items(): + outputs[var] = mlp(anchor_embedding) + return anchor_embedding, positive_embedding, negative_embedding, outputs + + def configure_optimizers(self): + """ + Configure the optimizer for the MultiTripletNetwork. + + Returns: + torch.optim.Optimizer: The configured optimizer. + """ + optimizer = torch.optim.Adam(self.parameters(), lr=self.config['lr']) + return optimizer + + def triplet_loss(self, anchor, positive, negative, margin=1.0): + """ + Compute the triplet loss for the given anchor, positive, and negative embeddings. + + Args: + anchor (torch.Tensor): The anchor embedding tensor. + positive (torch.Tensor): The positive embedding tensor. + negative (torch.Tensor): The negative embedding tensor. + margin (float, optional): The margin for the triplet loss. Default is 1.0. + + Returns: + torch.Tensor: The computed triplet loss. + """ + distance_positive = (anchor - positive).pow(2).sum(1) + distance_negative = (anchor - negative).pow(2).sum(1) + losses = torch.relu(distance_positive - distance_negative + margin) + return losses.mean() + + def compute_loss(self, var, y, y_hat): + if self.variable_types[var] == 'numerical': + # Ignore instances with missing labels for numerical variables + valid_indices = ~torch.isnan(y) + if valid_indices.sum() > 0: # only calculate loss if there are valid targets + y_hat = y_hat[valid_indices] + y = y[valid_indices] + + loss = F.mse_loss(torch.flatten(y_hat), y.float()) + else: + loss = 0 # if no valid labels, set loss to 0 + else: + # Ignore instances with missing labels for categorical variables + # Assuming that missing values were encoded as -1 + valid_indices = (y != -1) & (~torch.isnan(y)) + if valid_indices.sum() > 0: # only calculate loss if there are valid targets + y_hat = y_hat[valid_indices] + y = y[valid_indices] + loss = F.cross_entropy(y_hat, y.long()) + else: + loss = 0 + return loss + def training_step(self, train_batch, batch_idx): + anchor, positive, negative, y_dict = train_batch[0], train_batch[1], train_batch[2], train_batch[3] + anchor_embedding, positive_embedding, negative_embedding, outputs = self.forward(anchor, positive, negative) + triplet_loss = self.triplet_loss(anchor_embedding, positive_embedding, negative_embedding) + + # compute loss values for the supervisor heads + losses = {'triplet_loss': triplet_loss} + for var in self.target_variables: + y_hat = outputs[var] + y = y_dict[var] + loss = self.compute_loss(var, y, y_hat) + losses[var] = loss + + total_loss = sum(losses.values()) + losses['train_loss'] = total_loss + self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) + return total_loss + + def validation_step(self, val_batch, batch_idx): + anchor, positive, negative, y_dict = val_batch[0], val_batch[1], val_batch[2], val_batch[3] + anchor_embedding, positive_embedding, negative_embedding, outputs = self.forward(anchor, positive, negative) + triplet_loss = self.triplet_loss(anchor_embedding, positive_embedding, negative_embedding) + + # compute loss values for the supervisor heads + losses = {'triplet_loss': triplet_loss} + for var in self.target_variables: + y_hat = outputs[var] + y = y_dict[var] + loss = self.compute_loss(var, y, y_hat) + losses[var] = loss + + total_loss = sum(losses.values()) + losses['val_loss'] = total_loss + self.log_dict(losses, on_step=False, on_epoch=True, prog_bar=True) + return total_loss + + def prepare_data(self): + lt = int(len(self.dataset)*(1-self.val_size)) + lv = len(self.dataset)-lt + dat_train, dat_val = random_split(self.dataset, [lt, lv], + generator=torch.Generator().manual_seed(42)) + return dat_train, dat_val + + def train_dataloader(self): + return DataLoader(self.dat_train, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=True, drop_last=True) + + def val_dataloader(self): + return DataLoader(self.dat_val, batch_size=int(self.config['batch_size']), num_workers=0, pin_memory=True, shuffle=False) + + # dataset: MultiOmicDataset + def transform(self, dataset): + """ + Transforms the input dataset by generating embeddings and predictions. + + Args: + dataset (MultiOmicDataset): An instance of the MultiOmicDataset class. + + Returns: + z (pd.DataFrame): A dataframe containing the computed embeddings. + y_pred (np.ndarray): A numpy array containing the predicted labels. + """ + self.eval() + # get anchor embeddings + z = pd.DataFrame(self.multi_embedding_network(dataset.dat).detach().numpy()) + z.columns = [''.join(['E', str(x)]) for x in z.columns] + z.index = dataset.samples + return z + + def predict(self, dataset): + """ + Evaluate the model on a given dataset. + + Args: + dataset: The dataset to evaluate the model on. + + Returns: + A dictionary where each key is a target variable and the corresponding value is the predicted output for that variable. + """ + self.eval() + # get anchor embedding + anchor_embedding = self.multi_embedding_network(dataset.dat) + # get MLP outputs for each var + outputs = {} + for var, mlp in self.MLPs.items(): + outputs[var] = mlp(anchor_embedding) + + # get predictions from the mlp outputs for each var + predictions = {} + for var in self.target_variables: + y_pred = outputs[var].detach().numpy() + if self.variable_types[var] == 'categorical': + predictions[var] = np.argmax(y_pred, axis=1) + else: + predictions[var] = y_pred + return predictions + + + # Adaptor forward function for captum integrated gradients. + # layer_sizes: number of features in each omic layer + def forward_target(self, input_data, layer_sizes, target_var, steps): + outputs_list = [] + for i in range(steps): + # for each step, get anchor/positive/negative tensors + # (split the concatenated omics layers) + anchor = input_data[i][0].split(layer_sizes, dim = 1) + positive = input_data[i][1].split(layer_sizes, dim = 1) + negative = input_data[i][2].split(layer_sizes, dim = 1) + + # convert to dict + anchor = {k: anchor[k] for k in range(len(anchor))} + positive = {k: anchor[k] for k in range(len(positive))} + negative = {k: anchor[k] for k in range(len(negative))} + anchor_embedding, positive_embedding, negative_embedding, outputs = self.forward(anchor, positive, negative) + outputs_list.append(outputs[target_var]) + return torch.cat(outputs_list, dim = 0) + + def compute_feature_importance(self, target_var, steps = 5): + """ + Compute the feature importance. + + Args: + input_data (torch.Tensor): The input data to compute the feature importance for. + target_var (str): The target variable to compute the feature importance for. + Returns: + attributions (list of torch.Tensor): The feature importances for each class. + """ + + # self.dataset is a TripletMultiomicDataset, which has a different + # structure than the MultiomicDataset. We use data loader to + # read the triplets and get anchor/positive/negative tensors + # read the whole dataset + dl = DataLoader(self.dataset, batch_size=len(self.dataset)) + it = iter(dl) + anchor, positive, negative, y_dict = next(it) + + # Initialize the Integrated Gradients method + ig = IntegratedGradients(self.forward_target) + + anchor = [data.requires_grad_() for data in list(anchor.values())] + positive = [data.requires_grad_() for data in list(positive.values())] + negative = [data.requires_grad_() for data in list(negative.values())] + + # concatenate multiomic layers of each list element + # then stack the anchor/positive/negative + # the purpose is to get a single tensor + input_data = torch.stack([torch.cat(sublist, dim = 1) for sublist in [anchor, positive, negative]]).unsqueeze(0) + + # layer sizes will be needed to revert the concatenated tensor + # anchor/positive/negative have the same shape + layer_sizes = [anchor[i].shape[1] for i in range(len(anchor))] + + # Define a baseline + baseline = torch.zeros_like(input_data) + + # Get the number of classes for the target variable + if self.variable_types[target_var] == 'numerical': + num_class = 1 + else: + num_class = len(np.unique(self.ann[target_var])) + + # Compute the feature importance for each class + attributions = [] + if num_class > 1: + for target_class in range(num_class): + attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(layer_sizes, target_var, steps), target=target_class, n_steps=steps)) + else: + attributions.append(ig.attribute(input_data, baseline, additional_forward_args=(layer_sizes, target_var, steps), n_steps=steps)) + + # summarize feature importances + # Compute absolute attributions + abs_attr = [[torch.abs(a) for a in attr_class] for attr_class in attributions] + # average over samples + imp = [[a.mean(dim=1) for a in attr_class] for attr_class in abs_attr] + + # combine into a single data frame + df_list = [] + layers = list(self.dataset.dataset.dat.keys()) # accessing multiomicdataset within tripletmultiomic dataset here + for i in range(num_class): + imp_layerwise = imp[i][0].split(layer_sizes, dim = 1) + for j in range(len(layers)): + features = self.dataset.dataset.features[layers[j]] # accessing multiomicdataset within tripletmultiomic dataset here + importances = imp_layerwise[j][0].detach().numpy() # 0 => extract importances only for the anchor + df_list.append(pd.DataFrame({'target_variable': target_var, 'target_class': i, 'layer': layers[j], 'name': features, 'importance': importances})) + df_imp = pd.concat(df_list, ignore_index = True) + + # save scores in model + self.feature_importances[target_var] = df_imp diff --git a/flexynesis/models_shared.py b/flexynesis/models_shared.py deleted file mode 100644 index 900887e..0000000 --- a/flexynesis/models_shared.py +++ /dev/null @@ -1,223 +0,0 @@ -# Networks that can be reused across different architectures - -import torch -from torch import nn - - -class Encoder(nn.Module): - """ - Encoder class for a Variational Autoencoder (VAE). - - The Encoder class is responsible for taking input data and generating the mean and - log variance for the latent space representation. - """ - def __init__(self, input_dim, hidden_dims, latent_dim): - super(Encoder, self).__init__() - - self.LeakyReLU = nn.LeakyReLU(0.2) - - hidden_layers = [] - - hidden_layers.append(nn.Linear(input_dim, hidden_dims[0])) - nn.init.xavier_uniform_(hidden_layers[-1].weight) - hidden_layers.append(self.LeakyReLU) - hidden_layers.append(nn.BatchNorm1d(hidden_dims[0])) - - for i in range(len(hidden_dims)-1): - hidden_layers.append(nn.Linear(hidden_dims[i], hidden_dims[i+1])) - nn.init.xavier_uniform_(hidden_layers[-1].weight) - hidden_layers.append(self.LeakyReLU) - hidden_layers.append(nn.BatchNorm1d(hidden_dims[i+1])) - - self.hidden_layers = nn.Sequential(*hidden_layers) - - self.FC_mean = nn.Linear(hidden_dims[-1], latent_dim) - nn.init.xavier_uniform_(self.FC_mean.weight) - self.FC_var = nn.Linear(hidden_dims[-1], latent_dim) - nn.init.xavier_uniform_(self.FC_var.weight) - - def forward(self, x): - """ - Performs a forward pass through the Encoder network. - - Args: - x (torch.Tensor): The input data tensor. - - Returns: - mean (torch.Tensor): The mean of the latent space representation. - log_var (torch.Tensor): The log variance of the latent space representation. - """ - h_ = self.hidden_layers(x) - mean = self.FC_mean(h_) - log_var = self.FC_var(h_) - return mean, log_var - - -class Decoder(nn.Module): - """ - Decoder class for a Variational Autoencoder (VAE). - - The Decoder class is responsible for taking the latent space representation and - generating the reconstructed output data. - """ - def __init__(self, latent_dim, hidden_dims, output_dim): - super(Decoder, self).__init__() - - self.LeakyReLU = nn.LeakyReLU(0.2) - - hidden_layers = [] - - hidden_layers.append(nn.Linear(latent_dim, hidden_dims[0])) - nn.init.xavier_uniform_(hidden_layers[-1].weight) - hidden_layers.append(self.LeakyReLU) - hidden_layers.append(nn.BatchNorm1d(hidden_dims[0])) - - for i in range(len(hidden_dims) - 1): - hidden_layers.append(nn.Linear(hidden_dims[i], hidden_dims[i + 1])) - nn.init.xavier_uniform_(hidden_layers[-1].weight) - hidden_layers.append(self.LeakyReLU) - hidden_layers.append(nn.BatchNorm1d(hidden_dims[i+1])) - - self.hidden_layers = nn.Sequential(*hidden_layers) - - self.FC_output = nn.Linear(hidden_dims[-1], output_dim) - nn.init.xavier_uniform_(self.FC_output.weight) - - def forward(self, x): - """ - Performs a forward pass through the Decoder network. - - Args: - x (torch.Tensor): The input tensor representing the latent space. - - Returns: - x_hat (torch.Tensor): The reconstructed output tensor. - """ - h = self.hidden_layers(x) - x_hat = torch.sigmoid(self.FC_output(h)) - return x_hat - - -class MLP(nn.Module): - """ - A Multi-Layer Perceptron (MLP) model for regression or classification tasks. - - The MLP class is a simple feed-forward neural network that can be used for regression - when `output_dim` is set to 1 or for classification when `output_dim` is greater than 1. - """ - def __init__(self, input_dim, hidden_dim, output_dim): - """ - Initializes the MLP class with the given input dimension, output dimension, and hidden layer size. - - Args: - input_dim (int): The input dimension. - hidden_dim (int, optional): The size of the hidden layer. Default is 32. - output_dim (int): The output dimension. Set to 1 for regression tasks, and > 1 for classification tasks. - """ - super(MLP, self).__init__() - self.layer_1 = nn.Linear(input_dim, hidden_dim) - self.layer_out = nn.Linear(hidden_dim, output_dim) if output_dim > 1 else nn.Linear(hidden_dim, 1, bias=False) - self.relu = nn.ReLU() - self.dropout = nn.Dropout(p=0.1) - self.batchnorm = nn.BatchNorm1d(hidden_dim) - - def forward(self, x): - """ - Performs a forward pass through the MLP network. - - Args: - x (torch.Tensor): The input data tensor. - - Returns: - x (torch.Tensor): The output tensor after passing through the MLP network. - """ - x = self.layer_1(x) - # x = self.dropout(x) - if (x.size(0) != 1) and self.training: # Skip BatchNorm if batch size is 1 - x = self.batchnorm(x) - x = self.relu(x) - x = self.dropout(x) - x = self.layer_out(x) - return x - - -class EmbeddingNetwork(nn.Module): - """ - A simple feed-forward neural network for generating embeddings. - - The EmbeddingNetwork class is a straightforward feed-forward network - that can be used to generate embeddings from input data. - """ - def __init__(self, input_size, hidden_size, output_size): - """ - Initializes the EmbeddingNetwork class with the given input size, hidden layer size, and output size. - - Args: - input_size (int): The size of the input data. - hidden_size (int): The size of the hidden layer. - output_size (int): The size of the output layer, representing the dimensionality of the embeddings. - """ - super(EmbeddingNetwork, self).__init__() - self.fc1 = nn.Linear(input_size, hidden_size) - self.relu = nn.ReLU() - self.fc2 = nn.Linear(hidden_size, output_size) - - def forward(self, x): - """ - Performs a forward pass through the EmbeddingNetwork. - - Args: - x (torch.Tensor): The input data tensor. - - Returns: - x (torch.Tensor): The output tensor representing the generated embeddings. - """ - x = self.fc1(x) - x = self.relu(x) - x = self.fc2(x) - return x - -# Simple feed-forward multi-class classifier -class Classifier(nn.Module): - """ - A simple feed-forward neural network for multi-class classification tasks. - - The Classifier class is a straightforward feed-forward network that can be used - to perform multi-class classification on input data. - """ - def __init__(self, input_size, hidden_dims, num_classes): - """ - Initializes the Classifier class with the given input size, hidden layer dimensions, and number of classes. - - Args: - input_size (int): The size of the input data. - hidden_dims (list): A list of integers representing the dimensions of the hidden layers. - num_classes (int): The number of output classes. - """ - super(Classifier, self).__init__() - self.layers = nn.ModuleList() - - # Input layer - self.layers.append(nn.Linear(input_size, hidden_dims[0])) - - # Hidden layers - for i in range(len(hidden_dims) - 1): - self.layers.append(nn.Linear(hidden_dims[i], hidden_dims[i + 1])) - - # Output layer - self.layers.append(nn.Linear(hidden_dims[-1], num_classes)) - - def forward(self, x): - """ - Performs a forward pass through the Classifier network. - - Args: - x (torch.Tensor): The input data tensor. - - Returns: - x (torch.Tensor): The output tensor after passing through the Classifier network. - """ - for layer in self.layers[:-1]: - x = torch.relu(layer(x)) - x = self.layers[-1](x) - return x \ No newline at end of file diff --git a/flexynesis/modules.py b/flexynesis/modules.py index d5c0169..ee0ff49 100644 --- a/flexynesis/modules.py +++ b/flexynesis/modules.py @@ -1,10 +1,228 @@ +# Networks that can be reused across different architectures + import torch from torch import nn -from .models_shared import Encoder, Decoder, MLP, EmbeddingNetwork, Classifier -from .model_TripletEncoder import MultiEmbeddingNetwork +__all__ = ["Encoder", "Decoder", "MLP", "EmbeddingNetwork", "Classifier", "CNN"] + + +class Encoder(nn.Module): + """ + Encoder class for a Variational Autoencoder (VAE). + + The Encoder class is responsible for taking input data and generating the mean and + log variance for the latent space representation. + """ + def __init__(self, input_dim, hidden_dims, latent_dim): + super(Encoder, self).__init__() + + self.LeakyReLU = nn.LeakyReLU(0.2) + + hidden_layers = [] + + hidden_layers.append(nn.Linear(input_dim, hidden_dims[0])) + nn.init.xavier_uniform_(hidden_layers[-1].weight) + hidden_layers.append(self.LeakyReLU) + hidden_layers.append(nn.BatchNorm1d(hidden_dims[0])) + + for i in range(len(hidden_dims)-1): + hidden_layers.append(nn.Linear(hidden_dims[i], hidden_dims[i+1])) + nn.init.xavier_uniform_(hidden_layers[-1].weight) + hidden_layers.append(self.LeakyReLU) + hidden_layers.append(nn.BatchNorm1d(hidden_dims[i+1])) + + self.hidden_layers = nn.Sequential(*hidden_layers) + + self.FC_mean = nn.Linear(hidden_dims[-1], latent_dim) + nn.init.xavier_uniform_(self.FC_mean.weight) + self.FC_var = nn.Linear(hidden_dims[-1], latent_dim) + nn.init.xavier_uniform_(self.FC_var.weight) + + def forward(self, x): + """ + Performs a forward pass through the Encoder network. + + Args: + x (torch.Tensor): The input data tensor. + + Returns: + mean (torch.Tensor): The mean of the latent space representation. + log_var (torch.Tensor): The log variance of the latent space representation. + """ + h_ = self.hidden_layers(x) + mean = self.FC_mean(h_) + log_var = self.FC_var(h_) + return mean, log_var + + +class Decoder(nn.Module): + """ + Decoder class for a Variational Autoencoder (VAE). + + The Decoder class is responsible for taking the latent space representation and + generating the reconstructed output data. + """ + def __init__(self, latent_dim, hidden_dims, output_dim): + super(Decoder, self).__init__() + + self.LeakyReLU = nn.LeakyReLU(0.2) + + hidden_layers = [] + + hidden_layers.append(nn.Linear(latent_dim, hidden_dims[0])) + nn.init.xavier_uniform_(hidden_layers[-1].weight) + hidden_layers.append(self.LeakyReLU) + hidden_layers.append(nn.BatchNorm1d(hidden_dims[0])) + + for i in range(len(hidden_dims) - 1): + hidden_layers.append(nn.Linear(hidden_dims[i], hidden_dims[i + 1])) + nn.init.xavier_uniform_(hidden_layers[-1].weight) + hidden_layers.append(self.LeakyReLU) + hidden_layers.append(nn.BatchNorm1d(hidden_dims[i+1])) -__all__ = ["Encoder", "Decoder", "MLP", "EmbeddingNetwork", "MultiEmbeddingNetwork", "Classifier", "CNN"] + self.hidden_layers = nn.Sequential(*hidden_layers) + + self.FC_output = nn.Linear(hidden_dims[-1], output_dim) + nn.init.xavier_uniform_(self.FC_output.weight) + + def forward(self, x): + """ + Performs a forward pass through the Decoder network. + + Args: + x (torch.Tensor): The input tensor representing the latent space. + + Returns: + x_hat (torch.Tensor): The reconstructed output tensor. + """ + h = self.hidden_layers(x) + x_hat = torch.sigmoid(self.FC_output(h)) + return x_hat + + +class MLP(nn.Module): + """ + A Multi-Layer Perceptron (MLP) model for regression or classification tasks. + + The MLP class is a simple feed-forward neural network that can be used for regression + when `output_dim` is set to 1 or for classification when `output_dim` is greater than 1. + """ + def __init__(self, input_dim, hidden_dim, output_dim): + """ + Initializes the MLP class with the given input dimension, output dimension, and hidden layer size. + + Args: + input_dim (int): The input dimension. + hidden_dim (int, optional): The size of the hidden layer. Default is 32. + output_dim (int): The output dimension. Set to 1 for regression tasks, and > 1 for classification tasks. + """ + super(MLP, self).__init__() + self.layer_1 = nn.Linear(input_dim, hidden_dim) + self.layer_out = nn.Linear(hidden_dim, output_dim) if output_dim > 1 else nn.Linear(hidden_dim, 1, bias=False) + self.relu = nn.ReLU() + self.dropout = nn.Dropout(p=0.1) + self.batchnorm = nn.BatchNorm1d(hidden_dim) + + def forward(self, x): + """ + Performs a forward pass through the MLP network. + + Args: + x (torch.Tensor): The input data tensor. + + Returns: + x (torch.Tensor): The output tensor after passing through the MLP network. + """ + x = self.layer_1(x) + # x = self.dropout(x) + if (x.size(0) != 1) and self.training: # Skip BatchNorm if batch size is 1 + x = self.batchnorm(x) + x = self.relu(x) + x = self.dropout(x) + x = self.layer_out(x) + return x + + +class EmbeddingNetwork(nn.Module): + """ + A simple feed-forward neural network for generating embeddings. + + The EmbeddingNetwork class is a straightforward feed-forward network + that can be used to generate embeddings from input data. + """ + def __init__(self, input_size, hidden_size, output_size): + """ + Initializes the EmbeddingNetwork class with the given input size, hidden layer size, and output size. + + Args: + input_size (int): The size of the input data. + hidden_size (int): The size of the hidden layer. + output_size (int): The size of the output layer, representing the dimensionality of the embeddings. + """ + super(EmbeddingNetwork, self).__init__() + self.fc1 = nn.Linear(input_size, hidden_size) + self.relu = nn.ReLU() + self.fc2 = nn.Linear(hidden_size, output_size) + + def forward(self, x): + """ + Performs a forward pass through the EmbeddingNetwork. + + Args: + x (torch.Tensor): The input data tensor. + + Returns: + x (torch.Tensor): The output tensor representing the generated embeddings. + """ + x = self.fc1(x) + x = self.relu(x) + x = self.fc2(x) + return x + +# Simple feed-forward multi-class classifier +class Classifier(nn.Module): + """ + A simple feed-forward neural network for multi-class classification tasks. + + The Classifier class is a straightforward feed-forward network that can be used + to perform multi-class classification on input data. + """ + def __init__(self, input_size, hidden_dims, num_classes): + """ + Initializes the Classifier class with the given input size, hidden layer dimensions, and number of classes. + + Args: + input_size (int): The size of the input data. + hidden_dims (list): A list of integers representing the dimensions of the hidden layers. + num_classes (int): The number of output classes. + """ + super(Classifier, self).__init__() + self.layers = nn.ModuleList() + + # Input layer + self.layers.append(nn.Linear(input_size, hidden_dims[0])) + + # Hidden layers + for i in range(len(hidden_dims) - 1): + self.layers.append(nn.Linear(hidden_dims[i], hidden_dims[i + 1])) + + # Output layer + self.layers.append(nn.Linear(hidden_dims[-1], num_classes)) + + def forward(self, x): + """ + Performs a forward pass through the Classifier network. + + Args: + x (torch.Tensor): The input data tensor. + + Returns: + x (torch.Tensor): The output tensor after passing through the Classifier network. + """ + for layer in self.layers[:-1]: + x = torch.relu(layer(x)) + x = self.layers[-1](x) + return x class CNN(nn.Module):