Skip to content

Commit

Permalink
Merge pull request #59 from BIMSBbioinfo/testgpu
Browse files Browse the repository at this point in the history
Provide support for GPU
  • Loading branch information
borauyar authored Mar 2, 2024
2 parents 6e9b7cb + e8a5c92 commit adf4dcc
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 57 deletions.
53 changes: 39 additions & 14 deletions flexynesis/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import flexynesis
from flexynesis.models import *
import warnings
import time

def main():
parser = argparse.ArgumentParser(description="Flexynesis - Your PyTorch model training interface",
Expand All @@ -15,7 +16,8 @@ def main():
parser.add_argument("--data_path", help="(Required) Path to the folder with train/test data files", type=str, required = True)
parser.add_argument("--model_class", help="(Required) The kind of model class to instantiate", type=str, choices=["DirectPred", "DirectPredGCNN", "supervised_vae", "MultiTripletNetwork"], required = True)
parser.add_argument("--target_variables",
help="(Optional if survival variables are not set to None) Which variables in 'clin.csv' to use for predictions, comma-separated if multiple",
help="(Optional if survival variables are not set to None)."
"Which variables in 'clin.csv' to use for predictions, comma-separated if multiple",
type = str, default = None)
parser.add_argument("--batch_variables",
help="(Optional) Which variables in 'clin.csv' to use for data integration / batch correction, comma-separated if multiple",
Expand All @@ -33,11 +35,13 @@ def main():
parser.add_argument("--outdir", help="Path to the output folder to save the model outputs", type=str, default = os.getcwd())
parser.add_argument("--prefix", help="Job prefix to use for output files", type=str, default = 'job')
parser.add_argument("--log_transform", help="whether to apply log-transformation to input data matrices", type=str, choices=['True', 'False'], default = 'False')
parser.add_argument("--threads", help="Number of threads to use", type=int, default = 4)
parser.add_argument("--early_stop_patience", help="How many epochs to wait when no improvements in validation loss is observed (default: -1; no early stopping)", type=int, default = -1)
parser.add_argument("--early_stop_patience", help="How many epochs to wait when no improvements in validation loss is observed (default: 10; set to -1 to disable early stopping)", type=int, default = 10)
parser.add_argument("--use_loss_weighting", help="whether to apply loss-balancing using uncertainty weights method", type=str, choices=['True', 'False'], default = 'True')
parser.add_argument("--evaluate_baseline_performance", help="whether to run Random Forest + SVMs to see the performance of off-the-shelf tools on the same dataset", type=str, choices=['True', 'False'], default = 'True')

parser.add_argument("--threads", help="(Optional) How many threads to use when using CPU (default: 4)", type=int, default = 4)
parser.add_argument("--use_gpu", action="store_true",
help="(Optional) If set, the system will attempt to use CUDA/GPU if available.")

warnings.filterwarnings("ignore", ".*does not have many workers.*")
warnings.filterwarnings("ignore", "has been removed as a dependency of the")
warnings.filterwarnings("ignore", "The `srun` command is available on your system but is not used")
Expand All @@ -52,14 +56,33 @@ def main():
# 2. Check for required variables for model classes
if args.model_class != "supervised_vae":
if not any([args.target_variables, args.surv_event_var, args.batch_variables]):
parser.error("When selecting a model other than 'supervised_vae', you must provide at least one of --target_variables, survival variables (--surv_event_var and --surv_time_var), or --batch_variables.")
parser.error(''.join(["When selecting a model other than 'supervised_vae',"
"you must provide at least one of --target_variables, ",
"survival variables (--surv_event_var and --surv_time_var)",
"or --batch_variables."]))

# 3. Check for compatibility of fusion_type with DirectPredGCNN
if args.fusion_type == "early" and args.model_class == "DirectPredGCNN":
parser.error("The 'DirectPredGCNN' model cannot be used with early fusion type. Use --fusion_type intermediate instead.")
parser.error("The 'DirectPredGCNN' model cannot be used with early fusion type. "
"Use --fusion_type intermediate instead.")

# 4. Check for device availability if --use_gpu is set.
if args.use_gpu:
if not torch.cuda.is_available():
warnings.warn(''.join(["\n\n!!! WARNING: GPU REQUESTED BUT NOT AVAILABLE. FALLING BACK TO CPU.\n",
"PERFORMANCE MAY BE DEGRADED, PARTICULARLY FOR DirectPredGCNN.\n",
"OTHER MODELS SHOULD HAVE REASONABLE PERFORMANCE ON CPU. \n",
"IF USING A SLURM SCHEDULER, ENSURE YOU REQUEST A GPU WITH: ",
"`srun --gpus=1 --pty flexynesis <rest of your_command>` !!!\n\n"]))
time.sleep(3) #wait a bit to capture user's attention to the warning
device_type = 'cpu'
torch.set_num_threads(args.threads)
else:
device_type = 'gpu'
else:
device_type = 'cpu'
torch.set_num_threads(args.threads)

torch.set_num_threads(args.threads)

# Validate paths
if not os.path.exists(args.data_path):
raise FileNotFoundError(f"Input --data_path doesn't exist at:", {args.data_path})
Expand Down Expand Up @@ -120,13 +143,14 @@ class AvailableModels(NamedTuple):
config_path = args.config_path,
n_iter=int(args.hpo_iter),
use_loss_weighting = args.use_loss_weighting == 'True',
early_stop_patience = int(args.early_stop_patience))
early_stop_patience = int(args.early_stop_patience),
device_type = device_type)

# do a hyperparameter search training multiple models and get the best_configuration
model, best_params = tuner.perform_tuning()

# evaluate predictions
print("Computing model evaluation metrics")
print("[INFO] Computing model evaluation metrics")
metrics_df = flexynesis.evaluate_wrapper(model.predict(test_dataset), test_dataset,
surv_event_var=model.surv_event_var,
surv_time_var=model.surv_time_var)
Expand All @@ -138,23 +162,23 @@ class AvailableModels(NamedTuple):
ignore_index=True)
predicted_labels.to_csv(os.path.join(args.outdir, '.'.join([args.prefix, 'predicted_labels.csv'])), header=True, index=False)
# compute feature importance values
print("Computing variable importance scores")
print("[INFO] Computing variable importance scores")
for var in model.target_variables:
model.compute_feature_importance(var, steps = 30)
df_imp = pd.concat([model.feature_importances[x] for x in model.target_variables],
ignore_index = True)
df_imp.to_csv(os.path.join(args.outdir, '.'.join([args.prefix, 'feature_importance.csv'])), header=True, index=False)

# get sample embeddings and save
print("Extracting sample embeddings")
print("[INFO] Extracting sample embeddings")
embeddings_train = model.transform(train_dataset)
embeddings_test = model.transform(test_dataset)

embeddings_train.to_csv(os.path.join(args.outdir, '.'.join([args.prefix, 'embeddings_train.csv'])), header=True)
embeddings_test.to_csv(os.path.join(args.outdir, '.'.join([args.prefix, 'embeddings_test.csv'])), header=True)

# also filter embeddings to remove batch-associated dims and only keep target-variable associated dims
print("Printing filtered embeddings")
print("[INFO] Printing filtered embeddings")
embeddings_train_filtered = flexynesis.remove_batch_associated_variables(data = embeddings_train,
batch_dict={x: train_dataset.ann[x] for x in model.batch_variables} if model.batch_variables is not None else None,
target_dict={x: train_dataset.ann[x] for x in model.target_variables},
Expand All @@ -168,14 +192,15 @@ class AvailableModels(NamedTuple):

# evaluate off-the-shelf methods on the main target variable
if args.evaluate_baseline_performance == 'True':
print("Computing off-the-shelf method performance on first target variable:",model.target_variables[0])
print("[INFO] Computing off-the-shelf method performance on first target variable:",model.target_variables[0])
var = model.target_variables[0]
metrics = pd.DataFrame()
if var != model.surv_event_var:
metrics = flexynesis.evaluate_baseline_performance(train_dataset, test_dataset,
variable_name = var,
n_folds=5)
if model.surv_event_var and model.surv_time_var:
print("[INFO] Computing off-the-shelf method performance on survival variable:",model.surv_time_var)
metrics_baseline_survival = flexynesis.evaluate_baseline_survival_performance(train_dataset, test_dataset,
model.surv_time_var,
model.surv_event_var,
Expand Down
42 changes: 30 additions & 12 deletions flexynesis/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from pytorch_lightning import seed_everything
from lightning import seed_everything
# Set the seed for all the possible random number generators.
seed_everything(42, workers=True)
import torch
from torch.utils.data import DataLoader, random_split

import pytorch_lightning as pl
from pytorch_lightning.callbacks import RichProgressBar
from pytorch_lightning.callbacks.progress.rich_progress import RichProgressBarTheme
from pytorch_lightning.callbacks import EarlyStopping
import lightning as pl
from lightning.pytorch.callbacks import RichProgressBar
from lightning.pytorch.callbacks.progress.rich_progress import RichProgressBarTheme
from lightning.pytorch.callbacks import EarlyStopping

from tqdm import tqdm

Expand Down Expand Up @@ -39,19 +39,24 @@ class HyperparameterTuning:
val_size: Validation set size as a fraction of the dataset.
use_loss_weighting: Flag to use loss weighting during training.
early_stop_patience: Number of epochs to wait for improvement before stopping.
device_type: Device type to use, as a string ('cpu' or 'gpu'). If None, 'gpu' is used when CUDA is available, otherwise 'cpu'.
Methods:
objective(params, current_step, total_steps): Evaluates a set of parameters.
perform_tuning(): Executes the hyperparameter tuning process.
init_early_stopping(): Initializes early stopping mechanism.
load_and_convert_config(config_path): Loads and converts a configuration file.
"""
def __init__(self, dataset, model_class, config_name, target_variables,
batch_variables = None, surv_event_var = None, surv_time_var = None, n_iter = 10, config_path = None, plot_losses = False,
val_size = 0.2, use_loss_weighting = True, early_stop_patience = -1):
batch_variables = None, surv_event_var = None, surv_time_var = None,
n_iter = 10, config_path = None, plot_losses = False,
val_size = 0.2, use_loss_weighting = True, early_stop_patience = -1,
device_type = None):
self.dataset = dataset
self.model_class = model_class
self.target_variables = target_variables
self.device_type = device_type
if self.device_type is None:
self.device_type = "gpu" if torch.cuda.is_available() else "cpu"
self.surv_event_var = surv_event_var
self.surv_time_var = surv_time_var
self.batch_variables = batch_variables
Expand Down Expand Up @@ -87,7 +92,8 @@ def objective(self, params, current_step, total_steps):
surv_event_var = self.surv_event_var,
surv_time_var = self.surv_time_var,
val_size = self.val_size,
use_loss_weighting = self.use_loss_weighting)
use_loss_weighting = self.use_loss_weighting,
device_type = self.device_type)
print(params)

mycallbacks = [self.progress_bar]
Expand All @@ -99,12 +105,23 @@ def objective(self, params, current_step, total_steps):
mycallbacks.append(self.init_early_stopping())

trainer = pl.Trainer(max_epochs=int(params['epochs']), log_every_n_steps=5,
callbacks = mycallbacks, default_root_dir="./", logger=False, enable_checkpointing=False)
callbacks = mycallbacks, default_root_dir="./", logger=False,
enable_checkpointing=False,
devices=1, accelerator=self.device_type)

# Create a new Trainer instance for validation, ensuring single-device processing
validation_trainer = pl.Trainer(
logger=False,
enable_checkpointing=False,
devices=1, # make sure to use a single device for validation
accelerator=self.device_type
)

try:
# Train the model
trainer.fit(model)
# Validate the model
val_loss = trainer.validate(model)[0]['val_loss']
val_loss = validation_trainer.validate(model)[0]['val_loss']
except ValueError as e:
print(str(e))
val_loss = float('inf') # or some other value indicating failure
Expand All @@ -119,6 +136,7 @@ def perform_tuning(self):

with tqdm(total=self.n_iter, desc='Tuning Progress') as pbar:
for i in range(self.n_iter):
np.int = int
suggested_params_list = opt.ask()
suggested_params_dict = {param.name: value for param, value in zip(self.space, suggested_params_list)}
loss, model = self.objective(suggested_params_dict, current_step=i+1, total_steps=self.n_iter)
Expand Down Expand Up @@ -177,7 +195,7 @@ def load_and_convert_config(self, config_path):

import matplotlib.pyplot as plt
from IPython.display import clear_output
from pytorch_lightning import Callback
from lightning import Callback

class LiveLossPlot(Callback):
"""
Expand Down
28 changes: 21 additions & 7 deletions flexynesis/models/direct_pred.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import torch
from torch import nn
from torch.nn import functional as F
import pytorch_lightning as pl
import lightning as pl
from torch.utils.data import Dataset, DataLoader, random_split

import pandas as pd
Expand All @@ -16,7 +16,8 @@

class DirectPred(pl.LightningModule):
def __init__(self, config, dataset, target_variables, batch_variables = None,
surv_event_var = None, surv_time_var = None, val_size = 0.2, use_loss_weighting = True):
surv_event_var = None, surv_time_var = None, val_size = 0.2, use_loss_weighting = True,
device_type = None):
super(DirectPred, self).__init__()
self.config = config
self.dataset = dataset
Expand All @@ -34,6 +35,8 @@ def __init__(self, config, dataset, target_variables, batch_variables = None,
self.feature_importances = {}
self.use_loss_weighting = use_loss_weighting

self.device_type = device_type

if self.use_loss_weighting:
# Initialize log variance parameters for uncertainty weighting
self.log_vars = nn.ParameterDict()
Expand Down Expand Up @@ -274,8 +277,14 @@ def compute_feature_importance(self, target_var, steps = 5):
Returns:
attributions (list of torch.Tensor): The feature importances for each class.
"""
x_list = [self.dataset.dat[x] for x in self.dataset.dat.keys()]

device = torch.device("cuda" if self.device_type == 'gpu' and torch.cuda.is_available() else 'cpu')
self.to(device)

print("[INFO] Computing feature importance for variable:",target_var,"on device:",device)

# Assuming self.dataset.dat is a dictionary of tensors
x_list = [self.dataset.dat[x].to(device) for x in self.dataset.dat.keys()]

# Initialize the Integrated Gradients method
ig = IntegratedGradients(self.forward_target)

Expand All @@ -300,19 +309,24 @@ def compute_feature_importance(self, target_var, steps = 5):

# summarize feature importances
# Compute absolute attributions
abs_attr = [[torch.abs(a) for a in attr_class] for attr_class in attributions]
# Move the processed tensors to CPU for further operations that are not supported on GPU
abs_attr = [[torch.abs(a).cpu() for a in attr_class] for attr_class in attributions]
# average over samples
imp = [[a.mean(dim=1) for a in attr_class] for attr_class in abs_attr]

# combine into a single data frame
# move the model also back to cpu (if not already on cpu)
self.to('cpu')

# combine into a single data frame
df_list = []
layers = list(self.dataset.dat.keys())
for i in range(num_class):
for j in range(len(layers)):
features = self.dataset.features[layers[j]]
# Ensure tensors are already on CPU before converting to numpy
importances = imp[i][j][0].detach().numpy()
df_list.append(pd.DataFrame({'target_variable': target_var, 'target_class': i, 'layer': layers[j], 'name': features, 'importance': importances}))
df_imp = pd.concat(df_list, ignore_index = True)
df_imp = pd.concat(df_list, ignore_index=True)

# save the computed scores in the model
self.feature_importances[target_var] = df_imp
Expand Down
2 changes: 1 addition & 1 deletion flexynesis/models/direct_pred_cnn.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, random_split
import pytorch_lightning as pl
import lightning as pl

import numpy as np
import pandas as pd
Expand Down
Loading

0 comments on commit adf4dcc

Please sign in to comment.