From bc7d01c393f34c06235573769127fd0e1ec7dc57 Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Tue, 14 Jan 2025 23:00:38 -0800 Subject: [PATCH 1/3] Add partition as a workload_variable --- .../slurm_workflow_manager.py | 12 +++++++++++- .../workflow_managers/slurm/workflow_manager.py | 9 ++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py b/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py index a2aae7605..63ebee6cf 100644 --- a/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py +++ b/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py @@ -33,6 +33,7 @@ def test_slurm_workflow(): batch_submit: echo {wm_name} mpi_command: mpirun -n {n_ranks} -hostfile hostfile processes_per_node: 1 + n_nodes: 1 wm_name: ['None', 'slurm'] applications: hostname: @@ -41,8 +42,10 @@ def test_slurm_workflow(): experiments: test_{wm_name}: variables: - n_nodes: 1 extra_sbatch_headers: "#SBATCH --gpus-per-task={n_threads}" + test_{wm_name}_2: + variables: + partition: h3 """ with ramble.workspace.create(workspace_name) as ws: ws.write() @@ -83,9 +86,16 @@ def test_slurm_workflow(): content = f.read() assert "scontrol show hostnames" in content assert "#SBATCH --gpus-per-task=1" in content + assert "#SBATCH -p" not in content with open(os.path.join(path, "batch_query")) as f: content = f.read() assert "squeue" in content with open(os.path.join(path, "batch_cancel")) as f: content = f.read() assert "scancel" in content + + # Assert on the experiment with non-empty partition variable given + path = os.path.join(ws.experiment_dir, "hostname", "local", "test_slurm_2") + with open(os.path.join(path, "slurm_execute_experiment")) as f: + content = f.read() + assert "#SBATCH -p h3" in content diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py index 105a1c855..68b57158a 100644 --- a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py +++ b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py @@ -78,6 +78,12 @@ def __init__(self, file_path): description="mpirun prefix, mostly served as an overridable default", ) + workflow_manager_variable( + name="partition", + default="", + description="partition to submit job to, if unspecified, it uses the default partition", + ) + register_template( name="batch_submit", src_name="batch_submit.tpl", @@ -109,13 +115,14 @@ def _execute_vars(self): # Adding pre-defined and custom headers pragmas = [ ("#SBATCH -N {n_nodes}"), - ("#SBATCH -p {partition}"), ("#SBATCH --ntasks-per-node {processes_per_node}"), ("#SBATCH -J {job_name}"), ("#SBATCH -o {experiment_run_dir}/slurm-%j.out"), ("#SBATCH -e {experiment_run_dir}/slurm-%j.err"), ("#SBATCH --gpus-per-node {gpus_per_node}"), ] + if expander.expand_var_name("partition"): + pragmas.append("#SBATCH -p {partition}") try: extra_sbatch_headers_raw = expander.expand_var_name( "extra_sbatch_headers", allow_passthrough=False From cf340c8743ac31f35d4536b7a88c991436c5f99a Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Tue, 14 Jan 2025 23:40:45 -0800 Subject: [PATCH 2/3] Do not dump the whole `sbatch_extra_headers` --- .../slurm_workflow_manager.py | 5 ++++- .../workflow_managers/slurm/workflow_manager.py | 13 ++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py b/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py index 63ebee6cf..54c82ed1f 100644 --- a/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py +++ b/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py @@ -42,7 +42,9 @@ def test_slurm_workflow(): experiments: test_{wm_name}: variables: - extra_sbatch_headers: "#SBATCH --gpus-per-task={n_threads}" + extra_sbatch_headers: | + #SBATCH --gpus-per-task={n_threads} + #SBATCH --time={time_limit_not_exist} test_{wm_name}_2: variables: partition: h3 @@ -87,6 +89,7 @@ def test_slurm_workflow(): assert "scontrol show hostnames" in content assert "#SBATCH --gpus-per-task=1" in content assert "#SBATCH -p" not in content + assert "#SBATCH --time" not in content with open(os.path.join(path, "batch_query")) as f: content = f.read() assert "squeue" in content diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py index 68b57158a..6d18b6e5c 100644 --- a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py +++ b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py @@ -9,7 +9,6 @@ import os from ramble.wmkit import * -from ramble.expander import ExpanderError from ramble.application import experiment_status from spack.util.executable import ProcessError @@ -123,14 +122,10 @@ def _execute_vars(self): ] if expander.expand_var_name("partition"): pragmas.append("#SBATCH -p {partition}") - try: - extra_sbatch_headers_raw = expander.expand_var_name( - "extra_sbatch_headers", allow_passthrough=False - ) - extra_sbatch_headers = extra_sbatch_headers_raw.strip().split("\n") - pragmas = pragmas + extra_sbatch_headers - except ExpanderError: - pass + extra_headers = ( + self.app_inst.variables["extra_sbatch_headers"].strip().split("\n") + ) + pragmas = pragmas + extra_headers header_str = "\n".join(self.conditional_expand(pragmas)) return {"sbatch_headers_str": header_str} From f4f3c4603ddd34879cbe0bee7b2f4f70d1be3685 Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Wed, 15 Jan 2025 12:15:03 -0800 Subject: [PATCH 3/3] Rename to `slurm_partition` --- .../slurm_workflow_manager.py | 2 +- .../builtin/workflow_managers/slurm/workflow_manager.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py b/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py index 54c82ed1f..2f907ab55 100644 --- a/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py +++ b/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py @@ -47,7 +47,7 @@ def test_slurm_workflow(): #SBATCH --time={time_limit_not_exist} test_{wm_name}_2: variables: - partition: h3 + slurm_partition: h3 """ with ramble.workspace.create(workspace_name) as ws: ws.write() diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py index 6d18b6e5c..9477befc0 100644 --- a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py +++ b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py @@ -78,7 +78,7 @@ def __init__(self, file_path): ) workflow_manager_variable( - name="partition", + name="slurm_partition", default="", description="partition to submit job to, if unspecified, it uses the default partition", ) @@ -120,8 +120,8 @@ def _execute_vars(self): ("#SBATCH -e {experiment_run_dir}/slurm-%j.err"), ("#SBATCH --gpus-per-node {gpus_per_node}"), ] - if expander.expand_var_name("partition"): - pragmas.append("#SBATCH -p {partition}") + if expander.expand_var_name("slurm_partition"): + pragmas.append("#SBATCH -p {slurm_partition}") extra_headers = ( self.app_inst.variables["extra_sbatch_headers"].strip().split("\n") )