diff --git a/pcntoolkit/model/hbr.py b/pcntoolkit/model/hbr.py
index ea7252c0..77c60f1d 100644
--- a/pcntoolkit/model/hbr.py
+++ b/pcntoolkit/model/hbr.py
@@ -580,7 +580,7 @@ def predict_on_new_site(self, X, batch_effects):
         modeler = self.get_modeler()
         with modeler(X, y, batch_effects, self.configs, idata=self.idata):
             self.idata = pm.sample_posterior_predictive(
-                self.idata, extend_inferencedata=True, progressbar=True
+                self.idata, extend_inferencedata=True, progressbar=True, var_names=self.vars_to_sample
             )
         pred_mean = self.idata.posterior_predictive["y_like"].mean(axis=(0, 1))
         pred_var = self.idata.posterior_predictive["y_like"].var(axis=(0, 1))
diff --git a/pcntoolkit/normative.py b/pcntoolkit/normative.py
index 8db40074..d8ef7b97 100755
--- a/pcntoolkit/normative.py
+++ b/pcntoolkit/normative.py
@@ -819,7 +819,6 @@ def predict(covfile, respfile, maskfile=None, **kwargs):
         Xz = X

     # estimate the models for all variabels
-    # TODO Z-scores adaptation for SHASH HBR
     for i, m in enumerate(models):
         print("Prediction by model ", i+1, "of", feature_num)
         nm = norm_init(Xz)
@@ -842,6 +841,19 @@ def predict(covfile, respfile, maskfile=None, **kwargs):
         else:
             Yhat[:, i] = yhat.squeeze()
             S2[:, i] = s2.squeeze()
+        if respfile is not None:
+            Y, maskvol = load_response_vars(respfile, maskfile)
+            Y = Y[:, i:i+1]
+            if alg == 'hbr':
+                if outscaler in ['standardize', 'minmax', 'robminmax']:
+                    Yz = scaler_resp[fold].transform(Y)
+                else:
+                    Yz = Y
+                Z[:,i] = nm.get_mcmc_zscores(Xz, Yz, **kwargs)
+            else:
+                Z[:,i] = (Y - Yhat[:, i]) / np.sqrt(S2[:, i])
+
+

     if respfile is None:
         save_results(None, Yhat, S2, None, outputsuffix=outputsuffix)
@@ -881,8 +893,6 @@ def predict(covfile, respfile, maskfile=None, **kwargs):
         else:
             warp = False

-        Z = (Y - Yhat) / np.sqrt(S2)
-
         print("Evaluating the model ...")

         if meta_data and not warp:
diff --git a/pcntoolkit/normative_model/norm_hbr.py b/pcntoolkit/normative_model/norm_hbr.py
index a09b6a2b..f972bfd0 100644
--- a/pcntoolkit/normative_model/norm_hbr.py
+++ b/pcntoolkit/normative_model/norm_hbr.py
@@ -545,14 +545,21 @@ def get_mcmc_quantiles(self, X, batch_effects=None, z_scores=None):
         if 'posterior_predictive' in self.hbr.idata.groups():
             del self.hbr.idata.posterior_predictive

+        if self.configs["transferred"] == True:
+            self.predict_on_new_sites(
+                X=X,
+                batch_effects=batch_effects
+            )
+            #var_names = ["y_like"]
+        else:
+            self.hbr.predict(
         # Do a forward to get the posterior predictive in the idata
-        self.hbr.predict(
-            X=X,
-            batch_effects=batch_effects,
-            batch_effects_maps=self.batch_effects_maps,
-            pred="single",
-            var_names=var_names+["y_like"],
-        )
+                X=X,
+                batch_effects=batch_effects,
+                batch_effects_maps=self.batch_effects_maps,
+                pred="single",
+                var_names=var_names+["y_like"],
+            )

         # Extract the relevant samples from the idata
         post_pred = az.extract(
diff --git a/pcntoolkit/normative_parallel.py b/pcntoolkit/normative_parallel.py
index 521a6251..4733bf77 100755
--- a/pcntoolkit/normative_parallel.py
+++ b/pcntoolkit/normative_parallel.py
@@ -30,7 +30,8 @@
 import time
 import numpy as np
 import pandas as pd
-from subprocess import call, check_output
+from datetime import datetime
+from subprocess import run, check_output


 try:
@@ -87,6 +88,7 @@ def execute_nm(processing_dir,
     :param testrespfile_path: Full path to a .txt file that contains all test features
     :param log_path: Path for saving log files
     :param binary: If True uses binary format for response file otherwise it is text
+    :param cluster_spec: 'torque' for PBS Torque and 'slurm' for Slurm clusters.
     :param interactive: If False (default) the user should manually
                         rerun the failed jobs or collect the results.
                         If 'auto' the job status are checked until all
@@ -106,7 +108,7 @@ def execute_nm(processing_dir,
     cv_folds = kwargs.get('cv_folds', None)
     testcovfile_path = kwargs.get('testcovfile_path', None)
     testrespfile_path = kwargs.get('testrespfile_path', None)
-    outputsuffix = kwargs.get('outputsuffix', 'estimate')
+    outputsuffix = kwargs.get('outputsuffix', '_estimate')
     cluster_spec = kwargs.pop('cluster_spec', 'torque')
     log_path = kwargs.get('log_path', None)
     binary = kwargs.pop('binary', False)
@@ -129,6 +131,8 @@ def execute_nm(processing_dir,
     kwargs.update({'batch_size': str(batch_size)})

     job_ids = []
+    start_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
+
     for n in range(1, number_of_batches+1):
         kwargs.update({'job_id': str(n)})
         if testrespfile_path is not None:
@@ -163,7 +167,7 @@ def execute_nm(processing_dir,
                                  memory=memory,
                                  duration=duration)
                 job_ids.append(job_id)
-            elif cluster_spec == 'sbatch':
+            elif cluster_spec == 'slurm':

                 # update the response file
                 kwargs.update({'testrespfile_path': batch_testrespfile_path})
@@ -177,8 +181,11 @@ def execute_nm(processing_dir,
                               memory=memory,
                               duration=duration,
                               **kwargs)
-                sbatch_nm(job_path=batch_job_path,
-                          log_path=log_path)
+
+                job_id = sbatch_nm(job_path=batch_job_path)
+                job_ids.append(job_id)
+
+
             elif cluster_spec == 'new': # this part requires addition in different envioronment [
                 sbatchwrap_nm(processing_dir=batch_processing_dir,
@@ -207,7 +214,7 @@ def execute_nm(processing_dir,
                                  memory=memory,
                                  duration=duration)
                 job_ids.append(job_id)
-            elif cluster_spec == 'sbatch':
+            elif cluster_spec == 'slurm':
                 sbatchwrap_nm(batch_processing_dir,
                               python_path,
                               normative_path,
@@ -218,8 +225,9 @@ def execute_nm(processing_dir,
                               memory=memory,
                               duration=duration,
                               **kwargs)
-                sbatch_nm(job_path=batch_job_path,
-                          log_path=log_path)
+
+                job_id = sbatch_nm(job_path=batch_job_path)
+                job_ids.append(job_id)
             elif cluster_spec == 'new': # this part requires addition in different envioronment [
                 bashwrap_nm(processing_dir=batch_processing_dir, func=func,
@@ -249,7 +257,7 @@ def execute_nm(processing_dir,
                                  memory=memory,
                                  duration=duration)
                 job_ids.append(job_id)
-            elif cluster_spec == 'sbatch':
+            elif cluster_spec == 'slurm':
                 sbatchwrap_nm(batch_processing_dir,
                               python_path,
                               normative_path,
@@ -260,8 +268,11 @@ def execute_nm(processing_dir,
                               memory=memory,
                               duration=duration,
                               **kwargs)
-                sbatch_nm(job_path=batch_job_path,
-                          log_path=log_path)
+
+
+                job_id = sbatch_nm(job_path=batch_job_path)
+                job_ids.append(job_id)
+
             elif cluster_spec == 'new': # this part requires addition in different envioronment [
                 bashwrap_nm(processing_dir=batch_processing_dir, func=func,
@@ -271,7 +282,7 @@ def execute_nm(processing_dir,

     if interactive:

-        check_jobs(job_ids, delay=60)
+        check_jobs(job_ids, cluster_spec, start_time, delay=60)

         success = False
         while (not success):
@@ -288,16 +299,33 @@ def execute_nm(processing_dir,
                 if interactive == 'query':
                     response = yes_or_no('Rerun the failed jobs?')
                     if response:
-                        rerun_nm(processing_dir, log_path=log_path, memory=memory,
+                        if cluster_spec == 'torque':
+                            rerun_nm(processing_dir, log_path=log_path, memory=memory,
                                  duration=duration, binary=binary,
                                  interactive=interactive)
+                        elif cluster_spec == 'slurm':
+                            sbatchrerun_nm(processing_dir,
+                                           memory=memory,
+                                           duration=duration,
+                                           binary=binary,
+                                           log_path=log_path,
+                                           interactive=interactive)
+
                     else:
                         success = True
                 else:
                     print('Reruning the failed jobs ...')
-                    rerun_nm(processing_dir, log_path=log_path, memory=memory,
-                             duration=duration, binary=binary,
-                             interactive=interactive)
+                    if cluster_spec == 'torque':
+                        rerun_nm(processing_dir, log_path=log_path, memory=memory,
+                                 duration=duration, binary=binary,
+                                 interactive=interactive)
+                    elif cluster_spec == 'slurm':
+                        sbatchrerun_nm(processing_dir,
+                                       memory=memory,
+                                       duration=duration,
+                                       binary=binary,
+                                       log_path=log_path,
+                                       interactive=interactive)

         if interactive == 'query':
             response = yes_or_no('Collect the results?')
@@ -476,19 +504,15 @@ def collect_nm(processing_dir,
     batch_fail = []

     if (func != 'fit' and func != 'extend' and func != 'merge' and func != 'tune'):
-        file_example = []
         # TODO: Collect_nm only depends on yhat, thus does not work when no
         # prediction is made (when test cov is not specified).
-        for batch in batches:
-            if file_example == []:
-                file_example = glob.glob(batch + 'yhat' + outputsuffix
+        files = glob.glob(processing_dir + 'batch_*/' + 'yhat' + outputsuffix
                                          + file_extentions)
-            else:
-                break
-        if binary is False:
-            file_example = fileio.load(file_example[0])
+        if len(files)>0:
+            file_example = fileio.load(files[0])
         else:
-            file_example = pd.read_pickle(file_example[0])
+            raise ValueError(f"Missing output files (yhats at: {processing_dir + 'batch_*/' + 'yhat' + outputsuffix + file_extentions})")
+
         numsubjects = file_example.shape[0]

         try: # doesn't exist if size=1, and txt file
@@ -931,7 +955,7 @@ def bashwrap_nm(processing_dir,
         job_call = [python_path + ' ' + normative_path + ' -c ' +
                     covfile_path + ' -f ' + func]
     else:
-        raise ValueError("""For 'estimate' function either testcov or cvfold
+        raise ValueError("""For 'estimate' function either testrespfile_path or cvfold
                          must be specified.""")

     # add algorithm-specific parameters
@@ -1001,6 +1025,7 @@ def rerun_nm(processing_dir,
              log_path,
              memory,
              duration,
+             cluster_spec,
              binary=False,
              interactive=False):
     '''This function reruns all failed batched in processing_dir after collect_nm has identified the failed batches.
@@ -1045,7 +1070,7 @@ def rerun_nm(processing_dir,
         job_ids.append(job_id)

     if interactive:
-        check_jobs(job_ids, delay=60)
+        check_jobs(job_ids, cluster_spec, delay=60)


# COPY the rotines above here and aadapt those to your cluster
@@ -1059,6 +1084,7 @@ def sbatchwrap_nm(processing_dir,
                   respfile_path,
                   memory,
                   duration,
+                  log_path,
                   func='estimate',
                   **kwargs):
     '''This function wraps normative modelling into a bash script to run it
@@ -1098,13 +1124,14 @@ def sbatchwrap_nm(processing_dir,

     sbatch_init = '#!/bin/bash\n'
     sbatch_jobname = '#SBATCH --job-name=' + processing_dir + '\n'
-    sbatch_account = '#SBATCH --account=p33_norment\n'
     sbatch_nodes = '#SBATCH --nodes=1\n'
     sbatch_tasks = '#SBATCH --ntasks=1\n'
     sbatch_time = '#SBATCH --time=' + str(duration) + '\n'
     sbatch_memory = '#SBATCH --mem-per-cpu=' + str(memory) + '\n'
-    sbatch_module = 'module purge\n'
-    sbatch_anaconda = 'module load anaconda3\n'
+    sbatch_log_out = '#SBATCH -o ' + log_path + '%j.out' + '\n'
+    sbatch_log_error = '#SBATCH -e ' + log_path + '%j.err' + '\n'
+    #sbatch_module = 'module purge\n'
+    #sbatch_anaconda = 'module load anaconda3\n'
     sbatch_exit = 'set -o errexit\n'

     # echo -n "This script is running on "
@@ -1112,12 +1139,13 @@ def sbatchwrap_nm(processing_dir,

     bash_environment = [sbatch_init +
                         sbatch_jobname +
-                        sbatch_account +
                         sbatch_nodes +
                         sbatch_tasks +
                         sbatch_time +
-                        sbatch_module +
-                        sbatch_anaconda]
+                        sbatch_memory+
+                        sbatch_log_out+
+                        sbatch_log_error
+                        ]

     # creates call of function for normative modelling
     if (testrespfile_path is not None) and (testcovfile_path is not None):
@@ -1134,7 +1162,7 @@ def sbatchwrap_nm(processing_dir,
         job_call = [python_path + ' ' + normative_path + ' -c ' +
                     covfile_path + ' -f ' + func]
     else:
-        raise ValueError("""For 'estimate' function either testcov or cvfold
+        raise ValueError("""For 'estimate' function either testrespfile_path or cv_folds
                          must be specified.""")

     # add algorithm-specific parameters
@@ -1163,19 +1191,17 @@ def sbatchwrap_nm(processing_dir,
     os.chmod(processing_dir + job_name, 0o770)


-def sbatch_nm(job_path,
-              log_path):
+def sbatch_nm(job_path):
     '''This function submits a job.sh scipt to the torque custer using the
     qsub command.

     Basic usage::

-        sbatch_nm(job_path, log_path)
+        sbatch_nm(job_path)

     :param job_path: Full path to the job.sh file
-    :param log_path: The logs are currently stored in the working dir

-    :outputs: Submission of the job to the (torque) cluster.
+    :outputs: Submission of the job to the slurm cluster.

     written by (primarily) T Wolfers, (adapted) S Rutherford.
     '''
@@ -1184,7 +1210,10 @@ def sbatch_nm(job_path,
     sbatch_call = ['sbatch ' + job_path]

     # submits job to cluster
-    call(sbatch_call, shell=True)
+    job_id = check_output(sbatch_call, shell=True).decode(
+        sys.stdout.encoding).replace("\n", "")
+
+    return job_id


 def sbatchrerun_nm(processing_dir,
@@ -1193,6 +1222,7 @@ def sbatchrerun_nm(processing_dir,
                    new_memory=False,
                    new_duration=False,
                    binary=False,
+                   interactive=False,
                    **kwargs):
     '''This function reruns all failed batched in processing_dir after
     collect_nm has identified he failed batches.
@@ -1210,7 +1240,12 @@ def sbatchrerun_nm(processing_dir,

     written by (primarily) T Wolfers, (adapted) S Rutherford.
     '''
-    log_path = kwargs.pop('log_path', None)
+
+    #log_path = kwargs.pop('log_path', None)
+
+    job_ids = []
+
+    start_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

     if binary:
         file_extentions = '.pkl'
@@ -1224,11 +1259,12 @@ def sbatchrerun_nm(processing_dir,
                 with fileinput.FileInput(jobpath, inplace=True) as file:
                     for line in file:
                         print(line.replace(duration, new_duration), end='')
-                if new_memory != False:
-                    with fileinput.FileInput(jobpath, inplace=True) as file:
-                        for line in file:
-                            print(line.replace(memory, new_memory), end='')
-            sbatch_nm(jobpath, log_path)
+            if new_memory != False:
+                with fileinput.FileInput(jobpath, inplace=True) as file:
+                    for line in file:
+                        print(line.replace(memory, new_memory), end='')
+            job_id = sbatch_nm(jobpath)
+            job_ids.append(job_id)

     else:
         file_extentions = '.txt'
@@ -1242,73 +1278,114 @@ def sbatchrerun_nm(processing_dir,
                 with fileinput.FileInput(jobpath, inplace=True) as file:
                     for line in file:
                         print(line.replace(duration, new_duration), end='')
-                if new_memory != False:
-                    with fileinput.FileInput(jobpath, inplace=True) as file:
-                        for line in file:
-                            print(line.replace(memory, new_memory), end='')
-                sbatch_nm(jobpath,
-                          log_path)
+            if new_memory != False:
+                with fileinput.FileInput(jobpath, inplace=True) as file:
+                    for line in file:
+                        print(line.replace(memory, new_memory), end='')
+            job_id = sbatch_nm(jobpath)
+            job_ids.append(job_id)
+
+    if interactive:
+        check_jobs(job_ids, cluster_spec='slurm', start_time=start_time, delay=60)


-def retrieve_jobs():
+def retrieve_jobs(cluster_spec, start_time=None):
     """
     A utility function to retrieve task status from the outputs of qstat.
+
+    :param cluster_spec: type of cluster, either 'torque' or 'slurm'.

     :return: a dictionary of jobs.

     """
-    output = check_output('qstat', shell=True).decode(sys.stdout.encoding)
-    output = output.split('\n')
-    jobs = dict()
-    for line in output[2:-1]:
-        (Job_ID, Job_Name, User, Wall_Time, Status, Queue) = line.split()
-        jobs[Job_ID] = dict()
-        jobs[Job_ID]['name'] = Job_Name
-        jobs[Job_ID]['walltime'] = Wall_Time
-        jobs[Job_ID]['status'] = Status
+    if cluster_spec == 'torque':
+
+        output = check_output('qstat', shell=True).decode(sys.stdout.encoding)
+        output = output.split('\n')
+        jobs = dict()
+        for line in output[2:-1]:
+            (Job_ID, Job_Name, User, Wall_Time, Status, Queue) = line.split()
+            jobs[Job_ID] = dict()
+            jobs[Job_ID]['name'] = Job_Name
+            jobs[Job_ID]['walltime'] = Wall_Time
+            jobs[Job_ID]['status'] = Status
+
+    elif cluster_spec == 'slurm':
+
+        end_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
+        cmd = ['sacct', '-n', '-X', '--parsable2', '--noheader',
+               '-S', start_time, '-E', end_time, '--format=JobName,State']
+        jobs = run(cmd, capture_output=True, text=True)

     return jobs


-def check_job_status(jobs):
+def check_job_status(jobs, cluster_spec, start_time=None):
     """
     A utility function to count the tasks with different status.

     :param jobs: List of job ids.
-    :return: returns the number of taks athat are queued, running, completed etc
+    :param cluster_spec: type of cluster, either 'torque' or 'slurm'.
+    :return: returns the number of tasks that are queued, running, completed etc

     """
-    running_jobs = retrieve_jobs()
+    running_jobs = retrieve_jobs(cluster_spec, start_time)

     r = 0
     c = 0
     q = 0
     u = 0
-    for job in jobs:
-        try:
-            if running_jobs[job]['status'] == 'C':
+
+    if cluster_spec == 'torque':
+
+        for job in jobs:
+            try:
+                if running_jobs[job]['status'] == 'C':
+                    c += 1
+                elif running_jobs[job]['status'] == 'Q':
+                    q += 1
+                elif running_jobs[job]['status'] == 'R':
+                    r += 1
+                else:
+                    u += 1
+            except: # probably meanwhile the job is finished.
                 c += 1
-            elif running_jobs[job]['status'] == 'Q':
-                q += 1
-            elif running_jobs[job]['status'] == 'R':
-                r += 1
-            else:
-                u += 1
-        except: # probably meanwhile the job is finished.
-            c += 1
-            continue
-
-    print('Total Jobs:%d, Queued:%d, Running:%d, Completed:%d, Unknown:%d'
+                continue
+
+        print('Total Jobs:%d, Queued:%d, Running:%d, Completed:%d, Unknown:%d'
           % (len(jobs), q, r, c, u))
+
+    elif cluster_spec == 'slurm':
+
+        lines = running_jobs.stdout.strip().split('\n')
+
+        for line in lines:
+            if line:
+                parts = line.split('|')
+                if len(parts) >= 2:
+                    job_name, state = parts[0], parts[1]
+                    if state == 'PENDING':
+                        q += 1
+                    elif state == 'RUNNING':
+                        r += 1
+                    elif state == 'COMPLETED':
+                        c += 1
+                    elif state == 'FAILED':
+                        u += 1
+
+        print('Total Jobs:%d, Pending:%d, Running:%d, Completed:%d, Failed:%d'
+              % (len(jobs), q, r, c, u))
+
     return q, r, c, u


-def check_jobs(jobs, delay=60):
+def check_jobs(jobs, cluster_spec, start_time=None, delay=60):
     """
     A utility function for chacking the status of submitted jobs.

     :param jobs: list of job ids.
+    :param cluster_spec: type of cluster, either 'torque' or 'slurm'.
     :param delay: the delay (in sec) between two consequative checks, defaults to 60.

     """
@@ -1316,7 +1393,7 @@ def check_jobs(jobs, delay=60):
     n = len(jobs)

     while (True):
-        q, r, c, u = check_job_status(jobs)
+        q, r, c, u = check_job_status(jobs, cluster_spec, start_time)
         if c == n:
             print('All jobs are completed!')
             break
diff --git a/setup.py b/setup.py
index bf0cc6a8..f2f96a79 100644
--- a/setup.py
+++ b/setup.py
@@ -9,8 +9,12 @@ def parse_requirements(filename):

 requirements = parse_requirements('requirements.txt')

+# Note: to force PyPI to overwrite a version without bumping the version number
+# use e.g.:
+# version = '0.29-1'
+
 setup(name='pcntoolkit',
-      version='0.29-1',
+      version='0.30',
       description='Predictive Clinical Neuroscience toolkit',
       url='http://github.com/amarquand/PCNtoolkit',
       author='Andre Marquand',
diff --git a/tests/test_normative_parallel.py b/tests/test_normative_parallel.py
index 691eca40..89644c59 100644
--- a/tests/test_normative_parallel.py
+++ b/tests/test_normative_parallel.py
@@ -4,32 +4,46 @@

 @author: andmar
 """
-# import pcntoolkit
+
 import os
-import time
-from pcntoolkit.normative_parallel import execute_nm, collect_nm, delete_nm
+import numpy as np
+import pandas as pd
+from pcntoolkit.normative_parallel import execute_nm
+
+# configs
+# specify your python path. Make sure you are using the Python in the right environment.
+python_path = '/path/to/my/python'
+
+# specify the working directory to save the results.
+processing_dir = '/path/to/my/test/directory/'
+sample_num = 50
+resp_num = 10
+cov_num = 1
+
+# simulating data
+pd.DataFrame(np.random.random([sample_num, resp_num])).to_pickle(os.path.join(processing_dir,'train_resp.pkl'))
+pd.DataFrame(np.random.random([sample_num, cov_num])).to_pickle(os.path.join(processing_dir,'train_cov.pkl'))
+pd.DataFrame(np.random.random([sample_num, resp_num])).to_pickle(os.path.join(processing_dir,'test_resp.pkl'))
+pd.DataFrame(np.random.random([sample_num, cov_num])).to_pickle(os.path.join(processing_dir,'test_cov.pkl'))
+

-data_dir = '/home/preclineu/andmar/py.sandbox/normative_oslo/'
-respfile = os.path.join(data_dir, 'ICA100_oslo15_v2_spaces.txt')
-covfile = os.path.join(data_dir, 'cov_oslo15_spaces.txt')
+respfile = os.path.join(processing_dir,'train_resp.pkl')
+covfile = os.path.join(processing_dir,'train_cov.pkl')

-cvfolds = 2
+testresp = os.path.join(processing_dir,'test_resp.pkl')
+testcov = os.path.join(processing_dir,'test_cov.pkl')

-python_path = '/home/preclineu/andmar/sfw/anaconda3/envs/py36/bin/python'
-normative_path = '/home/preclineu/andmar/sfw/PCNtoolkit/pcntoolkit/normative.py'
-processing_dir = '/home/preclineu/andmar/py.sandbox/demo/'
 job_name = 'nmp_test'
-batch_size = 10
+batch_size = 1
 memory = '4gb'
 duration = '01:00:00'
-cluster = 'torque'
+cluster = 'slurm'
+binary='True'

-execute_nm(processing_dir, python_path, normative_path, job_name, covfile, respfile,
-           batch_size, memory, duration, cluster_spec=cluster,
-           cv_folds=cvfolds, log_path=processing_dir) # , alg='rfa')#, configparam=4)
+execute_nm(processing_dir, python_path, job_name, covfile, respfile,
+           testcovfile_path=testcov, testrespfile_path=testresp, batch_size=batch_size,
+           memory=memory, duration=duration, cluster_spec=cluster,
+           log_path=processing_dir, interactive='auto', binary=binary,
+           savemodel='True', saveoutput='True')

-print("waiting for jobs to finish ...")
-time.sleep(60)
-collect_nm(processing_dir, job_name, collect=True)
-# delete_nm(procedssing_dir)
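Note on the Slurm path introduced above: execute_nm now records a start_time, keeps the job ids returned by sbatch_nm, and check_jobs polls sacct between start_time and the current time to tally job states. A minimal sketch of that flow, assuming sbatch and sacct are available on the cluster (the job-script path below is a placeholder, not a file this patch creates)::

    import sys
    from datetime import datetime
    from subprocess import run, check_output

    start_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    # submit one wrapped batch script and keep the sbatch output as the job id
    # (placeholder path; in practice the script is written by sbatchwrap_nm)
    job_id = check_output(['sbatch', '/path/to/my/test/directory/batch_1/job.sh']).decode(
        sys.stdout.encoding).strip()

    # ask sacct for everything submitted since start_time (one "JobName|State" per line)
    end_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    sacct = run(['sacct', '-n', '-X', '--parsable2', '--noheader',
                 '-S', start_time, '-E', end_time, '--format=JobName,State'],
                capture_output=True, text=True)

    # tally the states the same way check_job_status does for cluster_spec='slurm'
    q = r = c = u = 0
    for line in sacct.stdout.strip().split('\n'):
        if not line:
            continue
        parts = line.split('|')
        if len(parts) >= 2:
            state = parts[1]
            if state == 'PENDING':
                q += 1
            elif state == 'RUNNING':
                r += 1
            elif state == 'COMPLETED':
                c += 1
            elif state == 'FAILED':
                u += 1
    print('Pending:%d, Running:%d, Completed:%d, Failed:%d' % (q, r, c, u))

Using --parsable2 keeps the sacct output machine-readable, which is what the 'slurm' branch of check_job_status relies on when counting pending, running, completed, and failed jobs.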