From 20622942f60c1a7cf114016d40e52fdcd151ca8c Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 26 Sep 2023 11:09:48 -0500 Subject: [PATCH 01/21] Add dsub to conda environment --- environment.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/environment.yml b/environment.yml index 3d3fb949..7b81df8e 100644 --- a/environment.yml +++ b/environment.yml @@ -26,3 +26,4 @@ dependencies: - pip: - graphspace_python==1.3.1 - sphinx-rtd-theme==1.0.0 + - dsub==0.4.9 From 863bdcfe4f185b6cdc0597e88a7e46538852f72e Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 26 Sep 2023 11:10:15 -0500 Subject: [PATCH 02/21] Add functions for running SPRAS with dsub in All of Us --- src/util.py | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/src/util.py b/src/util.py index 05a35a01..33dac551 100644 --- a/src/util.py +++ b/src/util.py @@ -39,6 +39,7 @@ def prepare_path_docker(orig_path: PurePath) -> str: return prepared_path + def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Union[str, PurePath]) -> PurePosixPath: """ Convert a file_path that is in src_path to be in dest_path instead. 
@@ -51,6 +52,61 @@ def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Unio rel_path = file_path.relative_to(src_path) return PurePosixPath(dest_path, rel_path) +def download_gcs(gcs_path, local_path, is_dir, multithread): + # check that output path exists + if not os.path.exists(Path(local_path).parent): + os.makedirs(Path(local_path).parent) + + # build command + cmd = 'gsutil' + if multithread: + cmd = cmd + ' -m' + cmd = cmd + ' cp' + if is_dir: + cmd = cmd + ' -r' + cmd = cmd + ' ' + gcs_path + ' ' + local_path + + print(cmd) + subprocess.run(cmd, shell=True +def upload_gcs(local_path, gcs_path, is_dir, multithread): + # build command + cmd = 'gsutil' + if multithread: + cmd = cmd + ' -m' + cmd = cmd + ' cp' + if is_dir: + cmd = cmd + ' -r' + cmd = cmd + ' ' + str(Path(local_path).resolve()) + ' ' + gcs_path + + print(cmd) + subprocess.run(cmd, shell=True) + +def prepare_dsub_cmd(flags): + # set constant flags + dsub_command = 'dsub' + flags['provider'] = 'google-cls-v2' + flags['regions'] = 'us-central1' + #flags['user-project'] = os.getenv('GOOGLE_PROJECT') + #flags['project'] = os.getenv('GOOGLE_PROJECT') + flags['network'] = 'network' + flags['subnetwork'] = 'subnetwork' + flags['service-account'] = subprocess.run(['gcloud', 'config' ,'get-value' ,'account'], capture_output=True, text=True).stdout.replace('\n', '') + + # order flags according to flag_list + flag_list = ["provider", "regions", "zones", "location", "user-project", "project", "network", "subnetwork", "service-account", "image", "env", "logging", "input", "input-recursive", "mount", "output", "output-recursive", "command", "script"] + ordered_flags = {f:flags[f] for f in flag_list if f in flags.keys()} + + # parse through keyword arguments + for flag in ordered_flags.keys(): + if isinstance(ordered_flags.get(flag), list): + for f in ordered_flags.get(flag): + dsub_command = dsub_command + " --" + flag + " " + f + else: + dsub_command = dsub_command + " --" + flag + " " 
+ ordered_flags.get(flag) + + dsub_command = dsub_command + " --wait" + print(f"Command: {dsub_command}") + return dsub_command # TODO consider a better default environment variable # Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html) @@ -71,6 +127,8 @@ def run_container(framework: str, container: str, command: List[str], volumes: L return run_container_docker(container, command, volumes, working_dir, environment) elif normalized_framework == 'singularity': return run_container_singularity(container, command, volumes, working_dir, environment) + elif normalized_framework == 'dsub': + return run_container_dsub(container, command, volumes, working_dir, environment) else: raise ValueError(f'{framework} is not a recognized container framework. Choose "docker" or "singularity".') @@ -166,6 +224,62 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple # finally: return out +def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): + """ + Runs a command in the container using Docker. + Attempts to automatically correct file owner and group for new files created by the container, setting them to the + current owner and group IDs. + Does not modify the owner or group for existing files modified by the container. 
+ @param container: name of the container in the Google Cloud Container Registry + @param command: command to run in the container + @param volumes: a list of volumes to mount where each item is a (source, destination) tuple + @param working_dir: the working directory in the container + @param environment: environment variables to set in the container + @return: path of output from dsub + """ + # Dictionary of flags for dsub command + flags = dict() + # Add path in the workspace bucket and label for dsub command for each volume + dsub_volumes = [(src, dst, "${WORKSPACE}"+str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)] + + workspace_bucket = os.getenv('WORKSPACE_BUCKET') + # Prepare command that will be run inside the container for dsub + container_command = list() + for item in command: + # Replace each volume with path in workspace + to_replace = ["${"+path[3]+'}' for path in dsub_volumes if str(path[1]) in item] + container_command.append(to_replace[0] if len(to_replace) == 1 else item) + # Add a command to copy the volumes to the workspace buckets + container_command.extend([';', 'cp', '-r']) + container_command.extend(['${'+volume[3]+'}' for volume in dsub_volumes]) + container_command.append('${OUTPUT}') + # Make the command into a string + flags['command'] = ' '.join(container_command) + flags['command'] = '"' + flags['command'] + '"' + + ## Push volumes to WORKSPACE_BUCKET + for src, dst, gcs_path, env in dsub_volumes: + upload_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True, multithread=True) + + ## Prepare flags for dsub command + flags['image'] = container + flags['env'] = environment + flags['input-recursive'] = [vol[3]+'='+vol[2] for vol in dsub_volumes] + flags['output-recursive'] = '${WORKSPACE}'+work_dir + flags['logging'] = '${WORKSPACE}/dsub/'+ datetime.now().isoformat().replace('.', '-').replace(':', '-') + + # Create dsub command + dsub_command = prepare_dsub_cmd(flags) + + # Run dsub as subprocess + 
subprocess.run(dsub_command, shell=True) + + # Pull output volumes from WORKSPACE_BUCKET + for src, dst, gcs_path, env in dsub_volumes: + download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True, multithread=True) + + # return location of dsub logs in WORKSPACE_BUCKET + return 'dsub logs: {logs}'.format(logs = flags['logging']).replace('${WORKSPACE}', str(workspace_bucket)) def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): """ From 92c2a97f1fadbacbae35ee6855944b2da89460e1 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Wed, 27 Sep 2023 16:40:15 +0000 Subject: [PATCH 03/21] Add functions for running with dsub in All of Us --- src/util.py | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/src/util.py b/src/util.py index 66d6fb35..dc9d574c 100644 --- a/src/util.py +++ b/src/util.py @@ -15,6 +15,8 @@ import docker import numpy as np # Required to eval some forms of parameter ranges import pandas as pd +import subprocess +from datetime import datetime # The default length of the truncated hash used to identify parameter combinations DEFAULT_HASH_LENGTH = 7 @@ -64,10 +66,10 @@ def download_gcs(gcs_path, local_path, is_dir, multithread): cmd = cmd + ' cp' if is_dir: cmd = cmd + ' -r' - cmd = cmd + ' ' + gcs_path + ' ' + local_path + cmd = cmd + ' ' + gcs_path + '/ ' + str(Path(local_path).parent) print(cmd) - subprocess.run(cmd, shell=True + subprocess.run(cmd, shell=True) def upload_gcs(local_path, gcs_path, is_dir, multithread): # build command cmd = 'gsutil' @@ -86,8 +88,8 @@ def prepare_dsub_cmd(flags): dsub_command = 'dsub' flags['provider'] = 'google-cls-v2' flags['regions'] = 'us-central1' - #flags['user-project'] = os.getenv('GOOGLE_PROJECT') - #flags['project'] = os.getenv('GOOGLE_PROJECT') + flags['user-project'] = os.getenv('GOOGLE_PROJECT') + flags['project'] = 
os.getenv('GOOGLE_PROJECT') flags['network'] = 'network' flags['subnetwork'] = 'subnetwork' flags['service-account'] = subprocess.run(['gcloud', 'config' ,'get-value' ,'account'], capture_output=True, text=True).stdout.replace('\n', '') @@ -240,23 +242,32 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P """ # Dictionary of flags for dsub command flags = dict() + + workspace_bucket = os.getenv('WORKSPACE_BUCKET') # Add path in the workspace bucket and label for dsub command for each volume - dsub_volumes = [(src, dst, "${WORKSPACE}"+str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)] + dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)] - workspace_bucket = os.getenv('WORKSPACE_BUCKET') # Prepare command that will be run inside the container for dsub container_command = list() for item in command: # Replace each volume with path in workspace to_replace = ["${"+path[3]+'}' for path in dsub_volumes if str(path[1]) in item] - container_command.append(to_replace[0] if len(to_replace) == 1 else item) + if len(to_replace) == 1 and len(PurePath(item).suffix) > 0: + container_command.append(to_replace[0]+'/'+item.split('/')[-1]) + elif len(to_replace) == 1 and len(PurePath(item).suffix) == 0: + container_command.append(to_replace[0]+'/') + else: + container_command.append(item) # Add a command to copy the volumes to the workspace buckets - container_command.extend([';', 'cp', '-r']) - container_command.extend(['${'+volume[3]+'}' for volume in dsub_volumes]) - container_command.append('${OUTPUT}') + #container_command.extend([';', 'cp', '-r']) + #container_command.extend(['${'+volume[3]+'}' for volume in dsub_volumes]) + #container_command.append('${OUTPUT}') +# container_command.extend([('; cp -r ' + f'/mnt/data/input/gs/{workspace_bucket}' +str(volume[1]) + ' $OUTPUT').replace('gs://', '') for volume in dsub_volumes]) + container_command.append(('; cp -rf 
' + f'/mnt/data/input/gs/{workspace_bucket}{working_dir}' + ' $OUTPUT').replace('gs://', '')) + # Make the command into a string flags['command'] = ' '.join(container_command) - flags['command'] = '"' + flags['command'] + '"' + flags['command'] = "'" + flags['command'] + "'" ## Push volumes to WORKSPACE_BUCKET for src, dst, gcs_path, env in dsub_volumes: @@ -266,8 +277,8 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P flags['image'] = container flags['env'] = environment flags['input-recursive'] = [vol[3]+'='+vol[2] for vol in dsub_volumes] - flags['output-recursive'] = '${WORKSPACE}'+work_dir - flags['logging'] = '${WORKSPACE}/dsub/'+ datetime.now().isoformat().replace('.', '-').replace(':', '-') + flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir + flags['logging'] = workspace_bucket + '/dsub/'+ datetime.now().isoformat().replace('.', '-').replace(':', '-') # Create dsub command dsub_command = prepare_dsub_cmd(flags) @@ -277,10 +288,13 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P # Pull output volumes from WORKSPACE_BUCKET for src, dst, gcs_path, env in dsub_volumes: + gcs_path = gcs_path.split('/') + gcs_path.insert(gcs_path.index('spras')+1, 'spras') + gcs_path = '/'.join(gcs_path) download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True, multithread=True) # return location of dsub logs in WORKSPACE_BUCKET - return 'dsub logs: {logs}'.format(logs = flags['logging']).replace('${WORKSPACE}', str(workspace_bucket)) + return 'dsub logs: {logs}'.format(logs = flags['logging']) def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): """ From 61976b720ff4879771e7b737a26079b64616dff3 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Wed, 27 Sep 2023 17:56:50 +0000 Subject: [PATCH 04/21] Resolved volume mapping issues --- src/util.py | 23 ++++++++++------------- 1 
file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/util.py b/src/util.py index dc9d574c..ea662817 100644 --- a/src/util.py +++ b/src/util.py @@ -54,7 +54,7 @@ def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Unio rel_path = file_path.relative_to(src_path) return PurePosixPath(dest_path, rel_path) -def download_gcs(gcs_path, local_path, is_dir, multithread): +def download_gcs(gcs_path: str, local_path: str, is_dir: bool, multithread: bool): # check that output path exists if not os.path.exists(Path(local_path).parent): os.makedirs(Path(local_path).parent) @@ -66,11 +66,13 @@ def download_gcs(gcs_path, local_path, is_dir, multithread): cmd = cmd + ' cp' if is_dir: cmd = cmd + ' -r' - cmd = cmd + ' ' + gcs_path + '/ ' + str(Path(local_path).parent) + cmd = cmd + ' ' + gcs_path + '/* ' + local_path print(cmd) + # run command subprocess.run(cmd, shell=True) -def upload_gcs(local_path, gcs_path, is_dir, multithread): + +def upload_gcs(local_path: str, gcs_path: str, is_dir: bool, multithread: bool): # build command cmd = 'gsutil' if multithread: @@ -81,9 +83,10 @@ def upload_gcs(local_path, gcs_path, is_dir, multithread): cmd = cmd + ' ' + str(Path(local_path).resolve()) + ' ' + gcs_path print(cmd) + # run command subprocess.run(cmd, shell=True) -def prepare_dsub_cmd(flags): +def prepare_dsub_cmd(flags: dict): # set constant flags dsub_command = 'dsub' flags['provider'] = 'google-cls-v2' @@ -98,7 +101,7 @@ def prepare_dsub_cmd(flags): flag_list = ["provider", "regions", "zones", "location", "user-project", "project", "network", "subnetwork", "service-account", "image", "env", "logging", "input", "input-recursive", "mount", "output", "output-recursive", "command", "script"] ordered_flags = {f:flags[f] for f in flag_list if f in flags.keys()} - # parse through keyword arguments + # iteratively add flags to the command for flag in ordered_flags.keys(): if isinstance(ordered_flags.get(flag), list): for f in ordered_flags.get(flag): @@ 
-106,6 +109,7 @@ def prepare_dsub_cmd(flags): else: dsub_command = dsub_command + " --" + flag + " " + ordered_flags.get(flag) + # Wait for dsub job to complegte dsub_command = dsub_command + " --wait" print(f"Command: {dsub_command}") return dsub_command @@ -259,11 +263,7 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P else: container_command.append(item) # Add a command to copy the volumes to the workspace buckets - #container_command.extend([';', 'cp', '-r']) - #container_command.extend(['${'+volume[3]+'}' for volume in dsub_volumes]) - #container_command.append('${OUTPUT}') -# container_command.extend([('; cp -r ' + f'/mnt/data/input/gs/{workspace_bucket}' +str(volume[1]) + ' $OUTPUT').replace('gs://', '') for volume in dsub_volumes]) - container_command.append(('; cp -rf ' + f'/mnt/data/input/gs/{workspace_bucket}{working_dir}' + ' $OUTPUT').replace('gs://', '')) + container_command.append(('; cp -rf ' + f'/mnt/data/input/gs/{workspace_bucket}{working_dir}/*' + ' $OUTPUT').replace('gs://', '')) # Make the command into a string flags['command'] = ' '.join(container_command) @@ -288,9 +288,6 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P # Pull output volumes from WORKSPACE_BUCKET for src, dst, gcs_path, env in dsub_volumes: - gcs_path = gcs_path.split('/') - gcs_path.insert(gcs_path.index('spras')+1, 'spras') - gcs_path = '/'.join(gcs_path) download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True, multithread=True) # return location of dsub logs in WORKSPACE_BUCKET From 1725d1bd6c25f61ce1b635b72ce09015dbece9e5 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Wed, 27 Sep 2023 18:06:18 +0000 Subject: [PATCH 05/21] Change location of dsub logs in Google Cloud Storage --- src/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util.py b/src/util.py index ea662817..08837e2f 100644 --- a/src/util.py +++ b/src/util.py @@ -278,7 +278,7 @@ def 
run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P flags['env'] = environment flags['input-recursive'] = [vol[3]+'='+vol[2] for vol in dsub_volumes] flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir - flags['logging'] = workspace_bucket + '/dsub/'+ datetime.now().isoformat().replace('.', '-').replace(':', '-') + flags['logging'] = workspace_bucket + '/dsub/' # Create dsub command dsub_command = prepare_dsub_cmd(flags) From 616cd67c5595dcba92e7c619d19dab5cab757b87 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 16 Jan 2024 19:38:46 -0800 Subject: [PATCH 06/21] Move run_container_dsub and helper functions to containers.py, add temporary fix to use dsub as framework with oi1, change to gcloud storage cp for larger file transfers --- spras/containers.py | 120 ++++++++++++++++++++++++++++++++++++++ spras/omicsintegrator1.py | 4 ++ 2 files changed, 124 insertions(+) diff --git a/spras/containers.py b/spras/containers.py index bdb18acd..e6140338 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -41,6 +41,62 @@ def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Unio rel_path = file_path.relative_to(src_path) return PurePosixPath(dest_path, rel_path) +def download_gcs(gcs_path: str, local_path: str, is_dir: bool): + # check that output path exists + if not os.path.exists(Path(local_path).parent): + os.makedirs(Path(local_path).parent) + + # build command + cmd = 'gcloud storage' + cmd = cmd + ' cp' + if is_dir: + cmd = cmd + ' -r' + cmd = cmd + ' ' + gcs_path + '/* ' + local_path + + print(cmd) + # run command + subprocess.run(cmd, shell=True) + +def upload_gcs(local_path: str, gcs_path: str, is_dir: bool): + # build command + cmd = 'gcloud storage' + cmd = cmd + ' cp' + if is_dir: + cmd = cmd + ' -r' + cmd = cmd + ' ' + str(Path(local_path).resolve()) + ' ' + gcs_path + + print(cmd) + # run command + subprocess.run(cmd, shell=True) + +def prepare_dsub_cmd(flags: dict): + # set
constant flags + dsub_command = 'dsub' + flags['provider'] = 'google-cls-v2' + flags['regions'] = 'us-central1' + flags['user-project'] = os.getenv('GOOGLE_PROJECT') + flags['project'] = os.getenv('GOOGLE_PROJECT') + flags['network'] = 'network' + flags['subnetwork'] = 'subnetwork' + flags['service-account'] = subprocess.run(['gcloud', 'config' ,'get-value' ,'account'], capture_output=True, text=True).stdout.replace('\n', '') + + # order flags according to flag_list + flag_list = ["provider", "regions", "zones", "location", "user-project", "project", "network", "subnetwork", "service-account", "image", "env", "logging", "input", "input-recursive", "mount", "output", "output-recursive", "command", "script"] + ordered_flags = {f:flags[f] for f in flag_list if f in flags.keys()} + + # iteratively add flags to the command + for flag in ordered_flags.keys(): + if isinstance(ordered_flags.get(flag), list): + for f in ordered_flags.get(flag): + dsub_command = dsub_command + " --" + flag + " " + f + else: + dsub_command = dsub_command + " --" + flag + " " + ordered_flags.get(flag) + + # Wait for dsub job to complegte + dsub_command = dsub_command + " --wait" + print(f"Command: {dsub_command}") + return dsub_command + # TODO consider a better default environment variable # TODO environment currently a single string (e.g. 'TMPDIR=/OmicsIntegrator1'), should it be a list? @@ -65,6 +121,8 @@ def run_container(framework: str, container_suffix: str, command: List[str], vol return run_container_docker(container, command, volumes, working_dir, environment) elif normalized_framework == 'singularity': return run_container_singularity(container, command, volumes, working_dir, environment) + elif normalized_framework == 'dsub': + return run_container_dsub(container, command, volumes, working_dir, environment) else: raise ValueError(f'{framework} is not a recognized container framework. 
Choose "docker" or "singularity".') @@ -232,3 +290,65 @@ def prepare_volume(filename: Union[str, PurePath], volume_base: Union[str, PureP src = parent return (src, dest), container_filename + +def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): + """ + Runs a command in the container using Docker. + Attempts to automatically correct file owner and group for new files created by the container, setting them to the + current owner and group IDs. + Does not modify the owner or group for existing files modified by the container. + @param container: name of the container in the Google Cloud Container Registry + @param command: command to run in the container + @param volumes: a list of volumes to mount where each item is a (source, destination) tuple + @param working_dir: the working directory in the container + @param environment: environment variables to set in the container + @return: path of output from dsub + """ + # Dictionary of flags for dsub command + flags = dict() + + workspace_bucket = os.getenv('WORKSPACE_BUCKET') + # Add path in the workspace bucket and label for dsub command for each volume + dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)] + + # Prepare command that will be run inside the container for dsub + container_command = list() + for item in command: + # Replace each volume with path in workspace + to_replace = ["${"+path[3]+'}' for path in dsub_volumes if str(path[1]) in item] + if len(to_replace) == 1 and len(PurePath(item).suffix) > 0: + container_command.append(to_replace[0]+'/'+item.split('/')[-1]) + elif len(to_replace) == 1 and len(PurePath(item).suffix) == 0: + container_command.append(to_replace[0]+'/') + else: + container_command.append(item) + # Add a command to copy the volumes to the workspace buckets + container_command.append(('; cp -rf ' + 
f'/mnt/data/input/gs/{workspace_bucket}{working_dir}/*' + ' $OUTPUT').replace('gs://', '')) + + # Make the command into a string + flags['command'] = ' '.join(container_command) + flags['command'] = "'" + flags['command'] + "'" + + ## Push volumes to WORKSPACE_BUCKET + for src, dst, gcs_path, env in dsub_volumes: + upload_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) + + ## Prepare flags for dsub command + flags['image'] = container + flags['env'] = environment + flags['input-recursive'] = [vol[3]+'='+vol[2] for vol in dsub_volumes] + flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir + flags['logging'] = workspace_bucket + '/dsub/' + + # Create dsub command + dsub_command = prepare_dsub_cmd(flags) + + # Run dsub as subprocess + subprocess.run(dsub_command, shell=True) + + # Pull output volumes from WORKSPACE_BUCKET + for src, dst, gcs_path, env in dsub_volumes: + download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) + + # return location of dsub logs in WORKSPACE_BUCKET + return 'dsub logs: {logs}'.format(logs = flags['logging']) \ No newline at end of file diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index e23ea9f9..a820862d 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -160,6 +160,10 @@ def run(edges=None, prizes=None, dummy_mode=None, mu_squared=None, exclude_terms print('Running Omics Integrator 1 with arguments: {}'.format(' '.join(command)), flush=True) container_suffix = "omics-integrator-1:no-conda" # no-conda version is the default + + # temporary solution for dsub backend + container_framework = 'dsub' + out = run_container(container_framework, container_suffix, # no-conda version is the default command, From 11d045038fd90795a8ac02be0f80a6d9e01b90d4 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 16 Jan 2024 19:40:49 -0800 Subject: [PATCH 07/21] Update dsub version in environment.yml --- environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/environment.yml b/environment.yml index dad7c91c..07df8a5f 100644 --- a/environment.yml +++ b/environment.yml @@ -26,4 +26,4 @@ dependencies: - pip: - graphspace_python==1.3.1 - sphinx-rtd-theme==1.0.0 - - dsub==0.4.9 + - dsub==0.4.10 From 2769ab07d2f5eed14966e881d5674485e86107a3 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 16 Jan 2024 19:48:35 -0800 Subject: [PATCH 08/21] Move subprocess and datetime imports from util.py to containers.py --- spras/containers.py | 2 ++ spras/util.py | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spras/containers.py b/spras/containers.py index e6140338..0d98bdea 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -9,6 +9,8 @@ import spras.config as config from spras.util import hash_filename +import subprocess +from datetime import datetime def prepare_path_docker(orig_path: PurePath) -> str: """ diff --git a/spras/util.py b/spras/util.py index 8f43a295..90b4c101 100644 --- a/spras/util.py +++ b/spras/util.py @@ -9,8 +9,6 @@ from typing import Any, Dict, List, Optional, Tuple, Union import pandas as pd -import subprocess -from datetime import datetime def hash_params_sha1_base32(params_dict: Dict[str, Any], length: Optional[int] = None) -> str: """ From 6b2a3c3667b2f7719cd3fe40db41550bb71dade9 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 16 Jan 2024 19:53:50 -0800 Subject: [PATCH 09/21] Update sphinx-rtd-theme version in environment.yml --- environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index 07df8a5f..95207496 100644 --- a/environment.yml +++ b/environment.yml @@ -25,5 +25,5 @@ dependencies: - sphinx=5.0 - pip: - graphspace_python==1.3.1 - - sphinx-rtd-theme==1.0.0 + - sphinx-rtd-theme==1.2.0 - dsub==0.4.10 From 28682a8af67e433e4fce5b51ca7d163bcc328135 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Thu, 13 Jun 2024 12:33:32 -0500 Subject: [PATCH 10/21] Change to use gcloud storage rsync to copy files for faster transfer of
larger files --- spras/containers.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/spras/containers.py b/spras/containers.py index 0d98bdea..371aba0d 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -50,19 +50,28 @@ def download_gcs(gcs_path: str, local_path: str, is_dir: bool): # build command cmd = 'gcloud storage' - cmd = cmd + ' cp' + # rsync with checksums to make file transfer faster for larger files + cmd = cmd + ' rsync --checksums-only' + # check if directory if is_dir: cmd = cmd + ' -r' - cmd = cmd + ' ' + gcs_path + '/* ' + local_path + cmd = cmd + ' ' + gcs_path + ' ' + local_path print(cmd) # run command subprocess.run(cmd, shell=True) def upload_gcs(local_path: str, gcs_path: str, is_dir: bool): - # build command - cmd = 'gcloud storage' - cmd = cmd + ' cp' + # check if path exists in cloud storage + exists = len(subprocess.run(f'gcloud storage ls {gcs_path}', shell=True, capture_output=True, text=True).stdout) + # if path exists rsyc + if exists > 0: + cmd = 'gcloud storage rsync --checksums-only' + # else copy path to cloud storage + else: + cmd = 'gcloud storage cp -c' + + # check if directory if is_dir: cmd = cmd + ' -r' cmd = cmd + ' ' + str(Path(local_path).resolve()) + ' ' + gcs_path From 2f8723a69f515548016b9df067a2e3f10c7c6b05 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Thu, 13 Jun 2024 12:35:15 -0500 Subject: [PATCH 11/21] Change the container tag to match container in GCR --- spras/omicsintegrator1.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index a820862d..af708957 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -11,7 +11,7 @@ # TODO decide on default number of processes and threads -def write_conf(filename=Path('config.txt'), w=None, b=None, d=None, mu=None, noise=None, g=None, r=None): +def write_conf(filename=Path('config.txt'), wne, b=None, d=None, mu=None,
noise=None, g=None, r=None): """ Write the configuration file for Omics Integrator 1 See https://github.com/fraenkel-lab/OmicsIntegrator#required-inputs @@ -159,7 +159,7 @@ def run(edges=None, prizes=None, dummy_mode=None, mu_squared=None, exclude_terms print('Running Omics Integrator 1 with arguments: {}'.format(' '.join(command)), flush=True) - container_suffix = "omics-integrator-1:no-conda" # no-conda version is the default + container_suffix = "omics-integrator-1:no_conda" # no-conda version is the default # temporary solution for dsub backend container_framework = 'dsub' From cea8505e3dc432f15fcf6d28e263d482327db209 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 25 Jun 2024 23:00:35 +0000 Subject: [PATCH 12/21] Remove temporary solution for running OI1 with dsub --- spras/omicsintegrator1.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index af708957..7427399b 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -11,7 +11,7 @@ # TODO decide on default number of processes and threads -def write_conf(filename=Path('config.txt'), wne, b=None, d=None, mu=None, noise=None, g=None, r=None): +def write_conf(filename=Path('config.txt'), w=None, b=None, d=None, mu=None, noise=None, g=None, r=None): """ Write the configuration file for Omics Integrator 1 See https://github.com/fraenkel-lab/OmicsIntegrator#required-inputs @@ -159,10 +159,7 @@ def run(edges=None, prizes=None, dummy_mode=None, mu_squared=None, exclude_terms print('Running Omics Integrator 1 with arguments: {}'.format(' '.join(command)), flush=True) - container_suffix = "omics-integrator-1:no_conda" # no-conda version is the default - - # temporary solution for dsub backend - container_framework = 'dsub' + container_suffix = "omics-integrator-1:no-conda" # no-conda version is the default out = run_container(container_framework, container_suffix, # no-conda version is the default From 
6d6f4bb8a800217c046fd46e3cdf4b1af5ccc98b Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 25 Jun 2024 23:01:32 +0000 Subject: [PATCH 13/21] Add dsub as a possible framework --- spras/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spras/config.py b/spras/config.py index 100b2779..78a0ee74 100644 --- a/spras/config.py +++ b/spras/config.py @@ -61,7 +61,7 @@ def __init__(self, raw_config): # __init__ makes clear exactly what is being configured. # Directory used for storing output self.out_dir = None - # Container framework used by PRMs. Valid options are "docker" and "singularity" + # Container framework used by PRMs. Valid options are "docker", "dsub", and "singularity" self.container_framework = None # The container prefix (host and organization) to use for images. Default is "docker.io/reedcompbio" self.container_prefix = DEFAULT_CONTAINER_PREFIX @@ -107,7 +107,7 @@ def process_config(self, raw_config): # However, if we get a bad value, we raise an exception. if "container_framework" in raw_config: container_framework = raw_config["container_framework"].lower() - if container_framework not in ("docker", "singularity"): + if container_framework not in ("docker", "singularity", "dsub"): msg = "SPRAS was configured to run with an unknown container framework: '" + raw_config["container_framework"] + "'. Accepted values are 'docker' or 'singularity'." 
raise ValueError(msg) self.container_framework = container_framework From 816ec07fc4c267ce25ddf53a3f92953ff772a03d Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 25 Jun 2024 23:03:45 +0000 Subject: [PATCH 14/21] Create workaround for uploading empty directories to GCS, Change how dsub file paths are handled in container command --- spras/containers.py | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/spras/containers.py b/spras/containers.py index 371aba0d..6bf7d5fa 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -61,16 +61,24 @@ def download_gcs(gcs_path: str, local_path: str, is_dir: bool): # run command subprocess.run(cmd, shell=True) + if is_dir and Path(Path(local_path)/'gcs_temp.txt').exists(): + os.remove(Path(Path(local_path)/'gcs_temp.txt')) + def upload_gcs(local_path: str, gcs_path: str, is_dir: bool): # check if path exists in cloud storage exists = len(subprocess.run(f'gcloud storage ls {gcs_path}', shell=True, capture_output=True, text=True).stdout) # if path exists rsyc if exists > 0: cmd = 'gcloud storage rsync --checksums-only' + # if directory is empty + elif exists == 0 and len(os.listdir(local_path)) == 0: + # create a temporary file because GCS will not recognize empty directories + Path(Path(local_path)/'gcs_temp.txt').touch() + # copy path to cloud storage + cmd = 'gcloud storage cp -c' # else copy path to cloud storage else: cmd = 'gcloud storage cp -c' - # check if directory if is_dir: cmd = cmd + ' -r' @@ -321,18 +329,27 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P workspace_bucket = os.getenv('WORKSPACE_BUCKET') # Add path in the workspace bucket and label for dsub command for each volume dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)] - + # Prepare command that will be run inside the container for dsub container_command = list() for item in command: - # 
Replace each volume with path in workspace - to_replace = ["${"+path[3]+'}' for path in dsub_volumes if str(path[1]) in item] - if len(to_replace) == 1 and len(PurePath(item).suffix) > 0: - container_command.append(to_replace[0]+'/'+item.split('/')[-1]) - elif len(to_replace) == 1 and len(PurePath(item).suffix) == 0: - container_command.append(to_replace[0]+'/') + # Find if item is volume + to_replace = [(str(path[1]), "${"+path[3]+'}') for path in dsub_volumes if str(path[1]) in item] + # Replace volume path with dsub volume path + if len(to_replace) == 1: + # Get path that will be replaced + path = to_replace[0][0] + # Get dsub input variable that will replace path + env_variable = to_replace[0][1] + # Replace path with env_variable + container_path = item.replace(path, env_variable) + # Add / if there is no suffix + if container_path == env_variable: + container_path = container_path + '/' + container_command.append(container_path) else: container_command.append(item) + # Add a command to copy the volumes to the workspace buckets container_command.append(('; cp -rf ' + f'/mnt/data/input/gs/{workspace_bucket}{working_dir}/*' + ' $OUTPUT').replace('gs://', '')) @@ -362,4 +379,4 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) # return location of dsub logs in WORKSPACE_BUCKET - return 'dsub logs: {logs}'.format(logs = flags['logging']) \ No newline at end of file + return 'dsub logs: {logs}'.format(logs = flags['logging']) From e9be16b519383dbf93b7346250eecdfeb76995c9 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 25 Jun 2024 23:04:43 +0000 Subject: [PATCH 15/21] Add dsub volume mount prefix to paths in meo-properties.txt file --- spras/meo.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/spras/meo.py b/spras/meo.py index ba962d92..d8fcf1c2 100644 --- a/spras/meo.py +++ b/spras/meo.py @@ -1,6 +1,7 @@ from pathlib import 
Path import pandas as pd +import os from spras.containers import prepare_volume, run_container from spras.interactome import ( @@ -17,7 +18,7 @@ # Does not support MINSAT or MAXCSP # TODO add parameter validation def write_properties(filename=Path('properties.txt'), edges=None, sources=None, targets=None, edge_output=None, - path_output=None, max_path_length=None, local_search=None, rand_restarts=None): + path_output=None, max_path_length=None, local_search=None, rand_restarts=None, framework='docker'): """ Write the properties file for Maximum Edge Orientation See https://github.com/agitter/meo/blob/master/sample.props for property descriptions and the default values at @@ -29,7 +30,19 @@ def write_properties(filename=Path('properties.txt'), edges=None, sources=None, if edges is None or sources is None or targets is None or edge_output is None or path_output is None: raise ValueError('Required Maximum Edge Orientation properties file arguments are missing') + if framework == 'dsub': + # Get path inside dsub container + workspace_bucket = os.getenv('WORKSPACE_BUCKET') + input_prefix = f'/mnt/data/input/gs/{workspace_bucket}'.replace('gs://', '') + # Add input prefix to all MEO paths + edges = input_prefix + edges + sources = input_prefix + sources + targets = input_prefix + targets + edge_output = input_prefix + edge_output + path_output = input_prefix + path_output + with open(filename, 'w') as f: + print(Path(edges).as_posix(), Path(sources).as_posix(), Path(targets).as_posix(), Path(edge_output).as_posix(), Path(path_output).as_posix()) # Write the required properties f.write(f'edges.file = {Path(edges).as_posix()}\n') f.write(f'sources.file = {Path(sources).as_posix()}\n') @@ -150,7 +163,7 @@ def run(edges=None, sources=None, targets=None, output_file=None, max_path_lengt properties_file_local = Path(out_dir, properties_file) write_properties(filename=properties_file_local, edges=edge_file, sources=source_file, targets=target_file, 
edge_output=mapped_output_file, path_output=mapped_path_output, - max_path_length=max_path_length, local_search=local_search, rand_restarts=rand_restarts) + max_path_length=max_path_length, local_search=local_search, rand_restarts=rand_restarts, framework=container_framework) bind_path, properties_file = prepare_volume(str(properties_file_local), work_dir) volumes.append(bind_path) From 53eaf35b2ab34e967f8710444a3eeeb30c089694 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 25 Jun 2024 23:10:29 +0000 Subject: [PATCH 16/21] Remove unused package and extra spaces --- spras/containers.py | 1 - spras/omicsintegrator1.py | 1 - spras/util.py | 1 + 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/spras/containers.py b/spras/containers.py index 6bf7d5fa..4c8ebf70 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -10,7 +10,6 @@ from spras.util import hash_filename import subprocess -from datetime import datetime def prepare_path_docker(orig_path: PurePath) -> str: """ diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index 7427399b..e23ea9f9 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -160,7 +160,6 @@ def run(edges=None, prizes=None, dummy_mode=None, mu_squared=None, exclude_terms print('Running Omics Integrator 1 with arguments: {}'.format(' '.join(command)), flush=True) container_suffix = "omics-integrator-1:no-conda" # no-conda version is the default - out = run_container(container_framework, container_suffix, # no-conda version is the default command, diff --git a/spras/util.py b/spras/util.py index 90b4c101..ea6cd952 100644 --- a/spras/util.py +++ b/spras/util.py @@ -10,6 +10,7 @@ import pandas as pd + def hash_params_sha1_base32(params_dict: Dict[str, Any], length: Optional[int] = None) -> str: """ Hash of a dictionary. 
From ce951cd002297c3883f0150458c786eb3f990479 Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 25 Jun 2024 23:38:25 +0000 Subject: [PATCH 17/21] Remove print statement, remove trailing spaces --- spras/containers.py | 38 +++++++++++++++++++------------------- spras/meo.py | 5 ++--- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/spras/containers.py b/spras/containers.py index 4c8ebf70..a8ab8bb3 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -1,6 +1,7 @@ import os import platform import re +import subprocess from pathlib import Path, PurePath, PurePosixPath from typing import Any, Dict, List, Optional, Tuple, Union @@ -9,7 +10,6 @@ import spras.config as config from spras.util import hash_filename -import subprocess def prepare_path_docker(orig_path: PurePath) -> str: """ @@ -57,11 +57,11 @@ def download_gcs(gcs_path: str, local_path: str, is_dir: bool): cmd = cmd + ' ' + gcs_path + ' ' + local_path print(cmd) - # run command + # run command subprocess.run(cmd, shell=True) if is_dir and Path(Path(local_path)/'gcs_temp.txt').exists(): - os.remove(Path(Path(local_path)/'gcs_temp.txt')) + os.remove(Path(Path(local_path)/'gcs_temp.txt')) def upload_gcs(local_path: str, gcs_path: str, is_dir: bool): # check if path exists in cloud storage @@ -78,7 +78,7 @@ def upload_gcs(local_path: str, gcs_path: str, is_dir: bool): # else copy path to cloud storage else: cmd = 'gcloud storage cp -c' - # check if directory + # check if directory if is_dir: cmd = cmd + ' -r' cmd = cmd + ' ' + str(Path(local_path).resolve()) + ' ' + gcs_path @@ -86,7 +86,7 @@ def upload_gcs(local_path: str, gcs_path: str, is_dir: bool): print(cmd) # run command subprocess.run(cmd, shell=True) - + def prepare_dsub_cmd(flags: dict): # set constant flags dsub_command = 'dsub' @@ -102,7 +102,7 @@ def prepare_dsub_cmd(flags: dict): flag_list = ["provider", "regions", "zones", "location", "user-project", "project", "network", "subnetwork", "service-account", "image", "env", 
"logging", "input", "input-recursive", "mount", "output", "output-recursive", "command", "script"] ordered_flags = {f:flags[f] for f in flag_list if f in flags.keys()} - # iteratively add flags to the command + # iteratively add flags to the command for flag in ordered_flags.keys(): if isinstance(ordered_flags.get(flag), list): for f in ordered_flags.get(flag): @@ -315,7 +315,7 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P Attempts to automatically correct file owner and group for new files created by the container, setting them to the current owner and group IDs. Does not modify the owner or group for existing files modified by the container. - @param container: name of the container in the Google Cloud Container Registry + @param container: name of the container in the Google Cloud Container Registry @param command: command to run in the container @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container @@ -329,18 +329,18 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P # Add path in the workspace bucket and label for dsub command for each volume dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)] - # Prepare command that will be run inside the container for dsub + # Prepare command that will be run inside the container for dsub container_command = list() for item in command: - # Find if item is volume + # Find if item is volume to_replace = [(str(path[1]), "${"+path[3]+'}') for path in dsub_volumes if str(path[1]) in item] # Replace volume path with dsub volume path if len(to_replace) == 1: # Get path that will be replaced path = to_replace[0][0] - # Get dsub input variable that will replace path + # Get dsub input variable that will replace path env_variable = to_replace[0][1] - # Replace path with env_variable + # Replace path with 
env_variable container_path = item.replace(path, env_variable) # Add / if there is no suffix if container_path == env_variable: @@ -355,27 +355,27 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P # Make the command into a string flags['command'] = ' '.join(container_command) flags['command'] = "'" + flags['command'] + "'" - + ## Push volumes to WORKSPACE_BUCKET for src, dst, gcs_path, env in dsub_volumes: upload_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) - + ## Prepare flags for dsub command flags['image'] = container flags['env'] = environment flags['input-recursive'] = [vol[3]+'='+vol[2] for vol in dsub_volumes] flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir flags['logging'] = workspace_bucket + '/dsub/' - - # Create dsub command + + # Create dsub command dsub_command = prepare_dsub_cmd(flags) - + # Run dsub as subprocess subprocess.run(dsub_command, shell=True) - + # Pull output volumes from WORKSPACE_BUCKET for src, dst, gcs_path, env in dsub_volumes: - download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) - + download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) + # return location of dsub logs in WORKSPACE_BUCKET return 'dsub logs: {logs}'.format(logs = flags['logging']) diff --git a/spras/meo.py b/spras/meo.py index d8fcf1c2..d1dd2a85 100644 --- a/spras/meo.py +++ b/spras/meo.py @@ -1,7 +1,7 @@ +import os from pathlib import Path import pandas as pd -import os from spras.containers import prepare_volume, run_container from spras.interactome import ( @@ -39,10 +39,9 @@ def write_properties(filename=Path('properties.txt'), edges=None, sources=None, sources = input_prefix + sources targets = input_prefix + targets edge_output = input_prefix + edge_output - path_output = input_prefix + path_output + path_output = input_prefix + path_output with open(filename, 'w') as f: - print(Path(edges).as_posix(), Path(sources).as_posix(), Path(targets).as_posix(), 
Path(edge_output).as_posix(), Path(path_output).as_posix()) # Write the required properties f.write(f'edges.file = {Path(edges).as_posix()}\n') f.write(f'sources.file = {Path(sources).as_posix()}\n') From c225dcf889022928c4631aece087d170f99ff77c Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 25 Jun 2024 23:52:43 +0000 Subject: [PATCH 18/21] Remove trailing spaces, code formatting --- spras/containers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spras/containers.py b/spras/containers.py index a8ab8bb3..a8da60d4 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -320,7 +320,7 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container @param environment: environment variables to set in the container - @return: path of output from dsub + @return: path of output from dsub """ # Dictionary of flags for dsub command flags = dict() @@ -357,7 +357,7 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P flags['command'] = "'" + flags['command'] + "'" ## Push volumes to WORKSPACE_BUCKET - for src, dst, gcs_path, env in dsub_volumes: + for src, _dst, gcs_path, _env in dsub_volumes: upload_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) ## Prepare flags for dsub command @@ -374,7 +374,7 @@ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[P subprocess.run(dsub_command, shell=True) # Pull output volumes from WORKSPACE_BUCKET - for src, dst, gcs_path, env in dsub_volumes: + for src, _dst, gcs_path, _env in dsub_volumes: download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) # return location of dsub logs in WORKSPACE_BUCKET From 5452b6fa960690fac5d36bac1f1cdbbbfe40927d Mon Sep 17 00:00:00 2001 From: nisthapanda Date: Tue, 20 Aug 2024 21:01:53 -0500 Subject: [PATCH 19/21] Add 
dependencies from dsub to conda environment --- environment.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index 99a0b855..1689320f 100644 --- a/environment.yml +++ b/environment.yml @@ -17,6 +17,12 @@ dependencies: - scikit-learn=1.2 - seaborn=0.12 - spython=0.2 + # for dsub + - python-dateutil<=2.9.0 + - pytz<=2024.1 + - pyyaml<=6.0.1 + - tenacity<=8.2.3 + - tabulate<=0.9.0 # Only required for GraphSpace - commonmark=0.9 - docutils=0.19 @@ -27,4 +33,4 @@ dependencies: - pip: - graphspace_python==1.3.1 - sphinx-rtd-theme==2.0.0 - - dsub==0.4.10 + - dsub==0.4.13 From f80c3fe6521c3f9c25404912eeaf6994f8f6bb0d Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Tue, 10 Dec 2024 08:59:53 -0600 Subject: [PATCH 20/21] Add minimal `dsub` framework documentation in config.yaml As Tony pointed out in the PR review, Google Cloud Life Sciences is being deprecated, so we're not sure how much longer this will be around. Instead of going through and giving the `dsub` framework verbose documentation, we explicitly state its usage is experimental and warn the user that they should already know what they're doing if they plan to use it. --- config/config.yaml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index b87bcd45..a270f106 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -3,8 +3,10 @@ # The length of the hash used to identify a parameter combination hash_length: 7 -# Specify the container framework. Current supported versions include 'docker' and -# 'singularity'. If container_framework is not specified, SPRAS will default to docker. +# Specify the container framework used by each PRM wrapper.
Valid options include: +# - docker (default if not specified) +# - singularity -- Also known as apptainer, useful in HPC/HTC environments where docker isn't allowed +# - dsub -- experimental with limited support, used for running on Google Cloud container_framework: docker # Only used if container_framework is set to singularity, this will unpack the singularity containers From 5aa72eef13629737a51bed10a820e5402851708f Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Tue, 10 Dec 2024 09:05:18 -0600 Subject: [PATCH 21/21] Add runtime warning if user selects `dsub` framework This prints a warning for the user if they select the `dsub` framework. Since we're unsure whether we'll need to support dsub in the future (as our primary motivation for using it is being deprecated), this warning should prevent potential users from getting their hopes up that we have long-term plans for it. --- spras/config.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spras/config.py b/spras/config.py index c8d1a450..db1824ab 100644 --- a/spras/config.py +++ b/spras/config.py @@ -117,6 +117,8 @@ def process_config(self, raw_config): if container_framework not in ("docker", "singularity", "dsub"): msg = "SPRAS was configured to run with an unknown container framework: '" + raw_config["container_framework"] + "'. Accepted values are 'docker' or 'singularity'." raise ValueError(msg) + if container_framework == "dsub": + print("Warning: 'dsub' framework integration is experimental and may not be fully supported.") self.container_framework = container_framework else: self.container_framework = "docker"