From 4945700f63a79186b99b3090cb8b94e54b4aa77a Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 28 Apr 2023 16:27:45 -0500 Subject: [PATCH 01/36] first attempt to sketch out cpu affinity bindings --- balsam/platform/app_run/polaris.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 72834f39..4da45bfd 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,3 +1,5 @@ +import os + from .app_run import SubprocessAppRun @@ -9,6 +11,18 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] cpu_bind = self._launch_params.get("cpu_bind", "none") + if ( + cpu_bind == "none" + and self._gpus_per_rank > 0 + and self.get_num_ranks() == 8 + and self.get_cpus_per_rank == 1 + ): + gpu_device = int(os.getenv("CUDA_VISIBLE_DEVICES")) + cpu_bind_list = ["list"] + start_cpu = 32 - 8 * (1 + gpu_device) + for i in range(8): + cpu_bind_list.append(":" + str(start_cpu + i)) + cpu_bind = "".join(cpu_bind_list) nid_str = ",".join(map(str, node_ids)) args = [ "mpiexec", From 5c85221db688af073946807a6a95324e67adf07e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 19 May 2023 18:09:04 -0500 Subject: [PATCH 02/36] updates to polaris app run --- balsam/platform/app_run/polaris.py | 56 +++++++++++++++---- .../compute_node/alcf_polaris_node.py | 2 + 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 4da45bfd..8eb806c0 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,7 +1,10 @@ +import logging import os from .app_run import SubprocessAppRun +logger = logging.getLogger(__name__) + class PolarisRun(SubprocessAppRun): """ @@ -10,19 +13,35 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] + cpu_bind = self._launch_params.get("cpu_bind", "none") - if ( - cpu_bind == "none" - and self._gpus_per_rank > 0 - and self.get_num_ranks() == 8 - and self.get_cpus_per_rank == 1 - ): - gpu_device = int(os.getenv("CUDA_VISIBLE_DEVICES")) - cpu_bind_list = ["list"] - start_cpu = 32 - 8 * (1 + gpu_device) - for i in range(8): - cpu_bind_list.append(":" + str(start_cpu + i)) + if cpu_bind == "none" and self._gpus_per_rank > 0: + gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + gpu_ids = gpu_device.split(",") + gpu_ids.reverse() + cpu_ids = self._node_spec.cpu_ids[0] + + cpu_bind_list = ["verbose,list"] + for gid in gpu_ids: + start_cpu = 32 - int(gid) * 8 - self.get_cpus_per_rank() + cpu_bind_list.append(":") + for icpu in range(self.get_cpus_per_rank()): + if icpu > 0: + cpu_bind_list.append(",") + cpu_bind_list.append(str(start_cpu + icpu)) + + # start_cpu = 32 - 8 * (1 + gpu_device) + # for i in range(8): + # cpu_bind_list.append(":" + str(start_cpu + i)) cpu_bind = "".join(cpu_bind_list) + logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") + + launch_params = [] + for k in self._launch_params.keys(): + if k != "cpu_bind": + launch_params.append("--" + k) + launch_params.append(str(self._launch_params[k])) + nid_str = ",".join(map(str, node_ids)) args = [ "mpiexec", @@ -36,6 +55,21 @@ def _build_cmdline(self) -> str: cpu_bind, "-d", self._threads_per_rank, + *launch_params, self._cmdline, ] return " ".join(str(arg) for arg in args) + + # Overide default because sunspot does 
not use CUDA + def _set_envs(self) -> None: + + envs = os.environ.copy() + envs.update(self._envs) + # Check the assigned GPU ID list from the first compute node: + gpu_ids = self._node_spec.gpu_ids[0] + + if gpu_ids: + envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) + envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) + self._envs = envs diff --git a/balsam/platform/compute_node/alcf_polaris_node.py b/balsam/platform/compute_node/alcf_polaris_node.py index b5283c3b..af5925fb 100644 --- a/balsam/platform/compute_node/alcf_polaris_node.py +++ b/balsam/platform/compute_node/alcf_polaris_node.py @@ -12,6 +12,8 @@ class PolarisNode(ComputeNode): # turam: confirm number of cpus cpu_ids = list(range(64)) + # cms21: recommended cpu affinity for polaris nodes is in reverse order to gpu ids + cpu_ids.reverse() gpu_ids: List[IntStr] = list(range(4)) @classmethod From 9f1b1ca717bb400c22e9d32c73cd39e2f7a96d8d Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 19 May 2023 18:27:51 -0500 Subject: [PATCH 03/36] updates to polaris app run --- balsam/platform/app_run/polaris.py | 1 - 1 file changed, 1 deletion(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 8eb806c0..6b5afdec 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -18,7 +18,6 @@ def _build_cmdline(self) -> str: if cpu_bind == "none" and self._gpus_per_rank > 0: gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] gpu_ids = gpu_device.split(",") - gpu_ids.reverse() cpu_ids = self._node_spec.cpu_ids[0] cpu_bind_list = ["verbose,list"] From c811f374163ec4c6e164a9fe0c6e670790712e68 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 24 May 2023 18:49:37 -0500 Subject: [PATCH 04/36] attempt to fix cpu affinity in Polaris app_run --- balsam/platform/app_run/polaris.py | 44 ++++++++++++------- .../compute_node/alcf_polaris_node.py | 9 ++-- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 6b5afdec..f04b6277 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,6 +1,8 @@ import logging import os +from balsam.platform.compute_node.alcf_polaris_node import PolarisNode + from .app_run import SubprocessAppRun logger = logging.getLogger(__name__) @@ -14,26 +16,31 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] + # cms21: currently this is broken for multinode jobs + cpu_bind = self._launch_params.get("cpu_bind", "none") if cpu_bind == "none" and self._gpus_per_rank > 0: - gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - gpu_ids = gpu_device.split(",") - cpu_ids = self._node_spec.cpu_ids[0] + polaris_node = PolarisNode() + # gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + # gpu_ids = gpu_device.split(",") + # cpu_ids = self._node_spec.cpu_ids[0] + cpu_ids = polaris_node.cpu_ids + gpu_ids = polaris_node.gpu_ids + cpus_per_rank = self.get_cpus_per_rank() + cpu_ids_ns = self._node_spec.cpu_ids cpu_bind_list = ["verbose,list"] - for gid in gpu_ids: - start_cpu = 32 - int(gid) * 8 - self.get_cpus_per_rank() + for irank in range(self._ranks_per_node): cpu_bind_list.append(":") - for icpu in range(self.get_cpus_per_rank()): - if icpu > 0: + for i in range(cpus_per_rank): + if i > 0: cpu_bind_list.append(",") - cpu_bind_list.append(str(start_cpu + icpu)) - - # start_cpu = 32 - 8 * (1 + gpu_device) - # for i 
in range(8): - # cpu_bind_list.append(":" + str(start_cpu + i)) + cid = str(cpu_ids[i + cpus_per_rank * irank]) + cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") + logger.info( + f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} cpu_ids_ns={cpu_ids_ns} gpu_ids={gpu_ids}" + ) launch_params = [] for k in self._launch_params.keys(): @@ -65,9 +72,16 @@ def _set_envs(self) -> None: envs = os.environ.copy() envs.update(self._envs) # Check the assigned GPU ID list from the first compute node: - gpu_ids = self._node_spec.gpu_ids[0] + gpu_ids = self._node_spec.gpu_ids + cpu_ids = self._node_spec.cpu_ids + logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}") + if gpu_ids[0] and len(self._node_spec.node_ids) == 1: + envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) + if not gpu_ids[0] and len(self._node_spec.node_ids) > 1 and self._gpus_per_rank > 0: + polaris_node = PolarisNode() + gpu_ids = polaris_node.gpu_ids - if gpu_ids: envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) diff --git a/balsam/platform/compute_node/alcf_polaris_node.py b/balsam/platform/compute_node/alcf_polaris_node.py index af5925fb..c2788e6d 100644 --- a/balsam/platform/compute_node/alcf_polaris_node.py +++ b/balsam/platform/compute_node/alcf_polaris_node.py @@ -10,12 +10,13 @@ class PolarisNode(ComputeNode): - # turam: confirm number of cpus - cpu_ids = list(range(64)) - # cms21: recommended cpu affinity for polaris nodes is in reverse order to gpu ids - cpu_ids.reverse() + + cpu_ids = list(range(32)) gpu_ids: List[IntStr] = list(range(4)) + # cms21: optimal gpu/cpu binding on Polaris nodes goes in reverse order + gpu_ids.reverse() + @classmethod def get_job_nodelist(cls) -> List["PolarisNode"]: """ From 0936c686177f1c67c0e1980da60ab3b507bd604e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 25 May 2023 03:16:41 -0500 Subject: [PATCH 05/36] added polaris gpu affinity script --- balsam/platform/app_run/app_run.py | 11 ++++ balsam/platform/app_run/polaris.py | 84 +++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 14 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index ff9f2cf7..5d5af973 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -8,6 +8,7 @@ import psutil # type: ignore +from balsam.platform.compute_node import ComputeNode from balsam.site.launcher import NodeSpec logger = logging.getLogger(__name__) @@ -72,6 +73,16 @@ def get_cpus_per_rank(self) -> int: cpu_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core)) return cpu_per_rank + def get_gpus_per_node_for_job(self) -> int: + gpus_per_node = self._gpus_per_rank * self._ranks_per_node + compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + total_gpus_per_node = len(compute_node.gpu_ids) + if gpus_per_node > total_gpus_per_node: + logger.warning( + f"You have too many gpus per node! 
Physical gpus={total_gpus_per_node} gpus_per_rank={self._gpus_per_rank} ranks_per_node={self._ranks_per_node}" + ) + return min(gpus_per_node, total_gpus_per_node) + @abstractmethod def start(self) -> None: pass diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index f04b6277..1a0bfae7 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,5 +1,6 @@ import logging import os +import stat from balsam.platform.compute_node.alcf_polaris_node import PolarisNode @@ -19,13 +20,66 @@ def _build_cmdline(self) -> str: # cms21: currently this is broken for multinode jobs cpu_bind = self._launch_params.get("cpu_bind", "none") + gpu_affinity_script = "" if cpu_bind == "none" and self._gpus_per_rank > 0: - polaris_node = PolarisNode() - # gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - # gpu_ids = gpu_device.split(",") - # cpu_ids = self._node_spec.cpu_ids[0] - cpu_ids = polaris_node.cpu_ids - gpu_ids = polaris_node.gpu_ids + if len(self._node_spec.node_ids) == 1 or self._ranks_per_node == 1: + cpu_ids = self._node_spec.cpu_ids[0] + gpu_ids = self._node_spec.gpu_ids[0] + else: + gpu_ids = self._envs["CUDA_VISIBLE_DEVICES"].split( + "," + ) # These should be distributed across local ranks + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + cpu_ids = polaris_node.cpu_ids + node_gpu_ids = polaris_node.gpu_ids + gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") + with open(gpu_affinity_script, "w") as f: + f.write( + f"""#!/bin/bash -l + gpu_ids=( "{" ".join(gpu_ids)}" ) + num_gpus={len(node_gpu_ids)} + gpus_per_rank={self._gpus_per_rank} + ngpu=0 + gpu_string=""\n + """ + ) + f.write( + """while [ $ngpu -lt $gpus_per_rank ] + do + igpu=$(((${PMI_LOCAL_RANK} * ${gpus_per_rank}) + ${ngpu} % ${num_gpus})) + gpu=${gpu_ids[$igpu]} + ##gpu=$((${num_gpus} - 1 - ${ngpu} - (${PMI_LOCAL_RANK} * ${gpus_per_rank}) % ${num_gpus})) + sep="" + if [ $ngpu -gt 0 ] + then + sep="," + fi + gpu_string=$gpu_string$sep$gpu + ngpu=$((${igpu} + 1)) + done + export CUDA_VISIBLE_DEVICES=$gpu_string + echo “RANK= ${PMI_RANK} LOCAL_RANK= ${PMI_LOCAL_RANK} gpu= $gpu_string” + exec "$@" + """ + ) + st = os.stat(gpu_affinity_script) + os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) + + # gpu_ids = polaris_node.gpu_ids + # num_gpus = len(gpu_ids) + # gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") + # with open(gpu_affinity_script,"w") as f: + # f.write(f"""#!/bin/bash -l + # num_gpus={num_gpus} + # gpus_per_rank={self._gpus_per_rank}\n"""+ + # """gpu=$((${num_gpus} - 1 - ${PMI_LOCAL_RANK} % ${num_gpus}))\n + # export CUDA_VISIBLE_DEVICES=$gpu\n + # echo “RANK= ${PMI_RANK} LOCAL_RANK= ${PMI_LOCAL_RANK} gpu= ${gpu}”\n + # exec "$@"\n + # """) + # st = os.stat(gpu_affinity_script) + # os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) + cpus_per_rank = self.get_cpus_per_rank() cpu_ids_ns = self._node_spec.cpu_ids @@ -62,6 +116,7 @@ def _build_cmdline(self) -> str: "-d", self._threads_per_rank, *launch_params, + gpu_affinity_script, self._cmdline, ] return " ".join(str(arg) for arg in args) @@ -72,17 +127,18 @@ def _set_envs(self) -> None: envs = os.environ.copy() envs.update(self._envs) # Check the assigned GPU ID list from the first compute node: - gpu_ids = self._node_spec.gpu_ids - cpu_ids = self._node_spec.cpu_ids + gpu_ids = self._node_spec.gpu_ids[0] + cpu_ids = self._node_spec.cpu_ids[0] logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}") - 
if gpu_ids[0] and len(self._node_spec.node_ids) == 1: + if gpu_ids: envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - if not gpu_ids[0] and len(self._node_spec.node_ids) > 1 and self._gpus_per_rank > 0: - polaris_node = PolarisNode() - gpu_ids = polaris_node.gpu_ids + else: + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + if self._gpus_per_rank > 0: + gpu_ids = polaris_node.gpu_ids[0 : self.get_gpus_per_node_for_job()] + envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" - envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) self._envs = envs From 08e09766a6fa1ee912e209b76f56e36479ac9424 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 25 May 2023 10:20:40 -0500 Subject: [PATCH 06/36] fixes to the affinity script --- balsam/platform/app_run/polaris.py | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 1a0bfae7..c30f4536 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -22,7 +22,7 @@ def _build_cmdline(self) -> str: cpu_bind = self._launch_params.get("cpu_bind", "none") gpu_affinity_script = "" if cpu_bind == "none" and self._gpus_per_rank > 0: - if len(self._node_spec.node_ids) == 1 or self._ranks_per_node == 1: + if len(self._node_spec.node_ids) == 1: cpu_ids = self._node_spec.cpu_ids[0] gpu_ids = self._node_spec.gpu_ids[0] else: @@ -31,12 +31,15 @@ def _build_cmdline(self) -> str: ) # These should be distributed across local ranks polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) cpu_ids = polaris_node.cpu_ids + + if len(self._node_spec.node_ids) > 1 or self._ranks_per_node > 1: + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) node_gpu_ids = polaris_node.gpu_ids gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") with open(gpu_affinity_script, "w") as f: f.write( f"""#!/bin/bash -l - gpu_ids=( "{" ".join(gpu_ids)}" ) + gpu_ids=( {" ".join(gpu_ids)} ) num_gpus={len(node_gpu_ids)} gpus_per_rank={self._gpus_per_rank} ngpu=0 @@ -65,21 +68,6 @@ def _build_cmdline(self) -> str: st = os.stat(gpu_affinity_script) os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) - # gpu_ids = polaris_node.gpu_ids - # num_gpus = len(gpu_ids) - # gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") - # with open(gpu_affinity_script,"w") as f: - # f.write(f"""#!/bin/bash -l - # num_gpus={num_gpus} - # gpus_per_rank={self._gpus_per_rank}\n"""+ - # """gpu=$((${num_gpus} - 1 - ${PMI_LOCAL_RANK} % ${num_gpus}))\n - # export CUDA_VISIBLE_DEVICES=$gpu\n - # echo “RANK= ${PMI_RANK} LOCAL_RANK= ${PMI_LOCAL_RANK} gpu= ${gpu}”\n - # exec "$@"\n - # """) - # st = os.stat(gpu_affinity_script) - # os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) - cpus_per_rank = self.get_cpus_per_rank() cpu_ids_ns = self._node_spec.cpu_ids From e61c12cbe79afad2e8ebbcb7aa492ab558aeb329 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 25 May 2023 10:49:10 -0500 Subject: [PATCH 07/36] some style changes --- balsam/platform/app_run/polaris.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 
c30f4536..0f258033 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -2,7 +2,7 @@ import os import stat -from balsam.platform.compute_node.alcf_polaris_node import PolarisNode +from balsam.platform.compute_node import PolarisNode from .app_run import SubprocessAppRun @@ -17,8 +17,6 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] - # cms21: currently this is broken for multinode jobs - cpu_bind = self._launch_params.get("cpu_bind", "none") gpu_affinity_script = "" if cpu_bind == "none" and self._gpus_per_rank > 0: From 7bcbc52c353a06977ee880c6cd02a69d21396519 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 25 May 2023 15:39:26 -0500 Subject: [PATCH 08/36] reverting affinity script addition, put in different branch --- balsam/platform/app_run/polaris.py | 62 ++++-------------------------- 1 file changed, 8 insertions(+), 54 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 0f258033..05b506c1 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,6 +1,5 @@ import logging import os -import stat from balsam.platform.compute_node import PolarisNode @@ -18,56 +17,15 @@ def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] cpu_bind = self._launch_params.get("cpu_bind", "none") - gpu_affinity_script = "" - if cpu_bind == "none" and self._gpus_per_rank > 0: + if cpu_bind == "none" and self._gpus_per_rank > 0 and self._ranks_per_node == 1: + gpu_ids = self._envs["CUDA_VISIBLE_DEVICES"].split(",") if len(self._node_spec.node_ids) == 1: cpu_ids = self._node_spec.cpu_ids[0] - gpu_ids = self._node_spec.gpu_ids[0] else: - gpu_ids = self._envs["CUDA_VISIBLE_DEVICES"].split( - "," - ) # These should be distributed across local ranks polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) cpu_ids = polaris_node.cpu_ids - if len(self._node_spec.node_ids) > 1 or self._ranks_per_node > 1: - polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - node_gpu_ids = polaris_node.gpu_ids - gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") - with open(gpu_affinity_script, "w") as f: - f.write( - f"""#!/bin/bash -l - gpu_ids=( {" ".join(gpu_ids)} ) - num_gpus={len(node_gpu_ids)} - gpus_per_rank={self._gpus_per_rank} - ngpu=0 - gpu_string=""\n - """ - ) - f.write( - """while [ $ngpu -lt $gpus_per_rank ] - do - igpu=$(((${PMI_LOCAL_RANK} * ${gpus_per_rank}) + ${ngpu} % ${num_gpus})) - gpu=${gpu_ids[$igpu]} - ##gpu=$((${num_gpus} - 1 - ${ngpu} - (${PMI_LOCAL_RANK} * ${gpus_per_rank}) % ${num_gpus})) - sep="" - if [ $ngpu -gt 0 ] - then - sep="," - fi - gpu_string=$gpu_string$sep$gpu - ngpu=$((${igpu} + 1)) - done - export CUDA_VISIBLE_DEVICES=$gpu_string - echo “RANK= ${PMI_RANK} LOCAL_RANK= ${PMI_LOCAL_RANK} gpu= $gpu_string” - exec "$@" - """ - ) - st = os.stat(gpu_affinity_script) - os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) - cpus_per_rank = self.get_cpus_per_rank() - cpu_ids_ns = self._node_spec.cpu_ids cpu_bind_list = ["verbose,list"] for irank in range(self._ranks_per_node): @@ -78,9 +36,7 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank]) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - logger.info( - f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} cpu_ids_ns={cpu_ids_ns} gpu_ids={gpu_ids}" - ) + logger.info(f"Polaris 
app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] for k in self._launch_params.keys(): @@ -102,7 +58,6 @@ def _build_cmdline(self) -> str: "-d", self._threads_per_rank, *launch_params, - gpu_affinity_script, self._cmdline, ] return " ".join(str(arg) for arg in args) @@ -116,15 +71,14 @@ def _set_envs(self) -> None: gpu_ids = self._node_spec.gpu_ids[0] cpu_ids = self._node_spec.cpu_ids[0] logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}") - if gpu_ids: + if gpu_ids and self._ranks_per_node == 1: envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - else: + if not gpu_ids and self._ranks_per_node == 1 and self._gpus_per_rank > 0: polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - if self._gpus_per_rank > 0: - gpu_ids = polaris_node.gpu_ids[0 : self.get_gpus_per_node_for_job()] - envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" - envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) + gpu_ids = polaris_node.gpu_ids[0 : self._gpus_per_rank] + envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) self._envs = envs From b0973cf47b60852b84ef77ac5556087152554b0f Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 26 May 2023 10:39:22 -0500 Subject: [PATCH 09/36] removed helper function --- balsam/platform/app_run/app_run.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index 5d5af973..ff9f2cf7 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -8,7 +8,6 @@ import psutil # type: ignore -from balsam.platform.compute_node import ComputeNode from balsam.site.launcher import NodeSpec logger = logging.getLogger(__name__) @@ -73,16 +72,6 @@ def get_cpus_per_rank(self) -> int: cpu_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core)) return cpu_per_rank - def get_gpus_per_node_for_job(self) -> int: - gpus_per_node = self._gpus_per_rank * self._ranks_per_node - compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - total_gpus_per_node = len(compute_node.gpu_ids) - if gpus_per_node > total_gpus_per_node: - logger.warning( - f"You have too many gpus per node! 
Physical gpus={total_gpus_per_node} gpus_per_rank={self._gpus_per_rank} ranks_per_node={self._ranks_per_node}" - ) - return min(gpus_per_node, total_gpus_per_node) - @abstractmethod def start(self) -> None: pass From 77f8941307b102c088ccda0f1bc2a65d5f24ce0b Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 26 May 2023 13:54:22 -0500 Subject: [PATCH 10/36] Updates to polaris cmdline implementation after dev discussion; includes notes --- balsam/platform/app_run/polaris.py | 47 +++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 05b506c1..6ca5b397 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -17,9 +17,20 @@ def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] cpu_bind = self._launch_params.get("cpu_bind", "none") - if cpu_bind == "none" and self._gpus_per_rank > 0 and self._ranks_per_node == 1: - gpu_ids = self._envs["CUDA_VISIBLE_DEVICES"].split(",") - if len(self._node_spec.node_ids) == 1: + + # If the user does not set a cpu_bind option and gpus are being used, + # this code sets cpu-bind to be optimal for the gpus being used. + # This does not handle the case where the application is using less than + # 8 cpus per gpu. This code will not skip the appropriate number of cpus + # in the rank binding assignments. + if cpu_bind == "none" and self._gpus_per_rank > 0: + + # Here we grab the cpu_ids assigned to the job in the NodeSpec object + # If this is not set in NodeSpec (it is only set for single node jobs), + # then we take the cpu_id list from the Polaris ComputeNode subclass, + # assuming the job will have use of all the cpus in nodes assigned to it. + cpu_ids_ns = self._node_spec.cpu_ids[0] + if cpu_ids_ns: cpu_ids = self._node_spec.cpu_ids[0] else: polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) @@ -27,6 +38,8 @@ def _build_cmdline(self) -> str: cpus_per_rank = self.get_cpus_per_rank() + # PolarisNode reverses the order of the gpu_ids, so assigning the cpu-bind + # in ascending cpu order is what we want here. cpu_bind_list = ["verbose,list"] for irank in range(self._ranks_per_node): cpu_bind_list.append(":") @@ -36,6 +49,8 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank]) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) + gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + gpu_ids = gpu_device.split(",") logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] @@ -67,18 +82,28 @@ def _set_envs(self) -> None: envs = os.environ.copy() envs.update(self._envs) - # Check the assigned GPU ID list from the first compute node: + + # Here we grab the gpus assigned to the job from NodeSpec. NodeSpec only + # sets this for single node jobs. For multinode jobs, gpu_ids below will + # be an empty list of lists (e.g. [[], []]). 
The ordering of the gpu_ids + # is reversed in PolarisNode and therefore the reverse ordering of + # cpus to gpus should be reflected here gpu_ids = self._node_spec.gpu_ids[0] cpu_ids = self._node_spec.cpu_ids[0] logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}") - if gpu_ids and self._ranks_per_node == 1: - envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" - envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - if not gpu_ids and self._ranks_per_node == 1 and self._gpus_per_rank > 0: - polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - gpu_ids = polaris_node.gpu_ids[0 : self._gpus_per_rank] + + # Here we set CUDA_VISIBLE_DEVICES for single node jobs only. We assume + # for multinode jobs that the job has access to all gpus, and + # CUDA_VISIBLE_DEVICES is set by the user, for example by local rank with an + # gpu_affinity.sh script that wraps around the user application in the + # ApplicationDefinition. + # One special case: if your job has one node, 2 ranks, and 1 gpu per rank, the + # code here will set CUDA_VISIBLE_DEVICES to "3,2" or "1,0". A user provided + # gpu_affinity.sh script should take this assigment and use it to reset + # CUDA_VISIBLE_DEVICES for each local rank. The user script should NOT + # round-robin the setting CUDA_VISIBLE_DEVICES starting from 3. + if gpu_ids: envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) self._envs = envs From 2efaa8ed82c87487188a1dac01ee0d08aafb5451 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 26 May 2023 17:23:17 -0500 Subject: [PATCH 11/36] remove turam path from polaris job-template.sh --- balsam/config/defaults/alcf_polaris/job-template.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/balsam/config/defaults/alcf_polaris/job-template.sh b/balsam/config/defaults/alcf_polaris/job-template.sh index 8dae69c2..dd090dee 100644 --- a/balsam/config/defaults/alcf_polaris/job-template.sh +++ b/balsam/config/defaults/alcf_polaris/job-template.sh @@ -8,8 +8,6 @@ export http_proxy="http://proxy:3128" export https_proxy="http://proxy:3128" -export PYTHONPATH=/home/turam/dev/polaris/balsam:$PYTHONPATH - #remove export PMI_NO_FORK=1 export BALSAM_SITE_PATH={{balsam_site_path}} cd $BALSAM_SITE_PATH From 1281a794650a3311bfb671e7656153ca60bfda10 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 26 May 2023 17:24:04 -0500 Subject: [PATCH 12/36] more updates to polaris cmdline --- balsam/platform/app_run/polaris.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 6ca5b397..46062f2d 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -49,8 +49,11 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank]) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - gpu_ids = gpu_device.split(",") + if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): + gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + gpu_ids = gpu_device.split(",") + else: + gpu_ids = [] logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] @@ -71,7 +74,7 @@ def _build_cmdline(self) -> str: "--cpu-bind", cpu_bind, "-d", - self._threads_per_rank, + self.get_cpus_per_rank(), *launch_params, self._cmdline, ] From 
1b64cdb798f7804b652782149d0b4d07b5449089 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 31 May 2023 21:56:52 -0500 Subject: [PATCH 13/36] changes to make depth paramter for Polaris app_run consistent with docs --- balsam/platform/app_run/app_run.py | 23 +++++++++++++++++++---- balsam/platform/app_run/polaris.py | 18 +++++++++++++++++- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index ff9f2cf7..7713b974 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -8,6 +8,7 @@ import psutil # type: ignore +from balsam.platform.compute_node import ComputeNode from balsam.site.launcher import NodeSpec logger = logging.getLogger(__name__) @@ -67,10 +68,24 @@ def get_num_ranks(self) -> int: return self._ranks_per_node * len(self._node_spec.node_ids) def get_cpus_per_rank(self) -> int: - cpu_per_rank = len(self._node_spec.cpu_ids[0]) // self._ranks_per_node - if not cpu_per_rank: - cpu_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core)) - return cpu_per_rank + + # Get the list of cpus assigned to the job. If it is a single node job, that is stored in + # the NodeSpec object. If it is a multinode job, the cpu_ids assigned to NodeSpec is empty, + # so we will assume all cpus on a compute node are available to the job. The list of cpus is + # just the list of cpus on the node in that case. + cpu_ids = self._node_spec.cpu_ids[0] + if not cpu_ids: + compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + cpu_ids = compute_node.cpu_ids + + cpus_per_node = len(cpu_ids) + cpus_per_rank = cpus_per_node // self._ranks_per_node + + # If ranks are oversubscribed to cpus (ranks_per_node > cpus_per_node), set it to a minimum of + # 1 cpu per rank or the number of cores per rank from the threading settings + if not cpus_per_rank: + cpus_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core)) + return cpus_per_rank @abstractmethod def start(self) -> None: diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 46062f2d..5f59cd78 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -32,6 +32,8 @@ def _build_cmdline(self) -> str: cpu_ids_ns = self._node_spec.cpu_ids[0] if cpu_ids_ns: cpu_ids = self._node_spec.cpu_ids[0] + if self._threads_per_core == 2: + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) else: polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) cpu_ids = polaris_node.cpu_ids @@ -48,6 +50,13 @@ def _build_cmdline(self) -> str: cpu_bind_list.append(",") cid = str(cpu_ids[i + cpus_per_rank * irank]) cpu_bind_list.append(cid) + # If the job is using 2 hardware threads per core, we need to add those threads to the list + # The additional threads should go in the same ascending order (threads 0 and 32 are on the + # same physical core, threads 31 and 63 are on the same physical core) + if self._threads_per_core == 2: + cpu_bind_list.append(",") + cid = str(cpu_ids[i + cpus_per_rank * irank] + len(polaris_node.cpu_ids)) + cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] @@ -62,6 +71,13 @@ def _build_cmdline(self) -> str: launch_params.append("--" + k) launch_params.append(str(self._launch_params[k])) + # The value of -d depends on the setting of cpu_bind. 
If cpu-bind=core, -d is the number of + # physical cores per rank, otherwise it is the number of hardware threads per rank + # https://docs.alcf.anl.gov/running-jobs/example-job-scripts/ + depth = self._threads_per_rank + if "core" in cpu_bind: + depth = self.get_cpus_per_rank() + nid_str = ",".join(map(str, node_ids)) args = [ "mpiexec", @@ -74,7 +90,7 @@ def _build_cmdline(self) -> str: "--cpu-bind", cpu_bind, "-d", - self.get_cpus_per_rank(), + depth, *launch_params, self._cmdline, ] From 937947ecfa7f483bb8ee789a6ba4c003f6bd7e28 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 31 May 2023 22:41:17 -0500 Subject: [PATCH 14/36] Removed blank lines --- balsam/platform/app_run/app_run.py | 1 - balsam/platform/app_run/polaris.py | 2 -- balsam/platform/compute_node/alcf_polaris_node.py | 1 - 3 files changed, 4 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index 7713b974..2aa06e39 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -68,7 +68,6 @@ def get_num_ranks(self) -> int: return self._ranks_per_node * len(self._node_spec.node_ids) def get_cpus_per_rank(self) -> int: - # Get the list of cpus assigned to the job. If it is a single node job, that is stored in # the NodeSpec object. If it is a multinode job, the cpu_ids assigned to NodeSpec is empty, # so we will assume all cpus on a compute node are available to the job. The list of cpus is diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 5f59cd78..761878a5 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -24,7 +24,6 @@ def _build_cmdline(self) -> str: # 8 cpus per gpu. This code will not skip the appropriate number of cpus # in the rank binding assignments. 
if cpu_bind == "none" and self._gpus_per_rank > 0: - # Here we grab the cpu_ids assigned to the job in the NodeSpec object # If this is not set in NodeSpec (it is only set for single node jobs), # then we take the cpu_id list from the Polaris ComputeNode subclass, @@ -98,7 +97,6 @@ def _build_cmdline(self) -> str: # Overide default because sunspot does not use CUDA def _set_envs(self) -> None: - envs = os.environ.copy() envs.update(self._envs) diff --git a/balsam/platform/compute_node/alcf_polaris_node.py b/balsam/platform/compute_node/alcf_polaris_node.py index c2788e6d..208490a1 100644 --- a/balsam/platform/compute_node/alcf_polaris_node.py +++ b/balsam/platform/compute_node/alcf_polaris_node.py @@ -10,7 +10,6 @@ class PolarisNode(ComputeNode): - cpu_ids = list(range(32)) gpu_ids: List[IntStr] = list(range(4)) From 8d6f5f00f1fd2f14c703998e8955c7074640f478 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 31 May 2023 22:51:30 -0500 Subject: [PATCH 15/36] lint fixes --- balsam/_api/model.py | 2 +- balsam/config/config.py | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/balsam/_api/model.py b/balsam/_api/model.py index 05d4831a..38ae2c48 100644 --- a/balsam/_api/model.py +++ b/balsam/_api/model.py @@ -186,7 +186,7 @@ def __repr__(self) -> str: def __str__(self) -> str: d = self.display_dict() - return yaml.dump(d, sort_keys=False, indent=4) # type: ignore + return yaml.dump(d, sort_keys=False, indent=4) def __eq__(self, other: Any) -> bool: if not isinstance(other, BalsamModel): diff --git a/balsam/config/config.py b/balsam/config/config.py index 5766afc5..00d95c69 100644 --- a/balsam/config/config.py +++ b/balsam/config/config.py @@ -235,13 +235,10 @@ def save(self, path: Union[str, Path]) -> None: fp.write(self.dump_yaml()) def dump_yaml(self) -> str: - return cast( - str, - yaml.dump( - json.loads(self.json()), - sort_keys=False, - indent=4, - ), + return yaml.dump( + json.loads(self.json()), + sort_keys=False, + indent=4, ) @classmethod From c57beb787ba78471dbf73950cb78434aaf0fc0a4 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 1 Jun 2023 11:56:46 -0500 Subject: [PATCH 16/36] fix type error --- balsam/platform/app_run/app_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index 2aa06e39..c4efb45b 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -75,7 +75,7 @@ def get_cpus_per_rank(self) -> int: cpu_ids = self._node_spec.cpu_ids[0] if not cpu_ids: compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - cpu_ids = compute_node.cpu_ids + cpu_ids = list(compute_node.cpu_ids) cpus_per_node = len(cpu_ids) cpus_per_rank = cpus_per_node // self._ranks_per_node From 0691ed3d88c2170c4f355808e9c311d95145aecf Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 1 Jun 2023 12:00:49 -0500 Subject: [PATCH 17/36] fix type error --- balsam/platform/app_run/app_run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index c4efb45b..3c25c4f6 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -73,11 +73,11 @@ def get_cpus_per_rank(self) -> int: # so we will assume all cpus on a compute node are available to the job. The list of cpus is # just the list of cpus on the node in that case. 
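Illustrative sketch (not part of the patch): assuming a hypothetical 32-cpu node (the Polaris cpu count used elsewhere in this series), the cpus-per-rank rule described in the comment above reduces to an integer division with a threading fallback:

    # Standalone sketch of get_cpus_per_rank's arithmetic (hypothetical inputs).
    def cpus_per_rank(cpus_per_node: int, ranks_per_node: int,
                      threads_per_rank: int = 1, threads_per_core: int = 1) -> int:
        per_rank = cpus_per_node // ranks_per_node
        if not per_rank:
            # Ranks oversubscribe the cpus: fall back to the threading settings, minimum 1.
            per_rank = max(1, threads_per_rank // threads_per_core)
        return per_rank

    assert cpus_per_rank(32, 4) == 8    # 4 ranks on a 32-cpu node get 8 cpus each
    assert cpus_per_rank(32, 64) == 1   # oversubscribed ranks fall back to 1 cpu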
cpu_ids = self._node_spec.cpu_ids[0] + cpus_per_node = len(cpu_ids) if not cpu_ids: compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - cpu_ids = list(compute_node.cpu_ids) + cpus_per_node = len(compute_node.cpu_ids) - cpus_per_node = len(cpu_ids) cpus_per_rank = cpus_per_node // self._ranks_per_node # If ranks are oversubscribed to cpus (ranks_per_node > cpus_per_node), set it to a minimum of From 367541149ebab6cb0a926645e55e415258f8adda Mon Sep 17 00:00:00 2001 From: Bas van der Vlies Date: Fri, 9 Jun 2023 12:01:00 +0200 Subject: [PATCH 18/36] Problems with docker compose and the latest python:3-slim version We get all kind of errors, See: * https://github.com/argonne-lcf/balsam/issues/343 so pinned it on `FROM python:3.10-slim` --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index a916ee3a..c32c9f57 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3-slim +FROM python:3.10-slim WORKDIR /balsam From ad0e661b68a9bfd9f5c37ee537c75dac48043bc5 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 13 Jun 2023 16:28:22 -0500 Subject: [PATCH 19/36] made change to accept a user setting cpu_bind to none --- balsam/platform/app_run/polaris.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 761878a5..d18efb12 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -16,14 +16,14 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] - cpu_bind = self._launch_params.get("cpu_bind", "none") - - # If the user does not set a cpu_bind option and gpus are being used, + # If the user does not set a cpu_bind option, # this code sets cpu-bind to be optimal for the gpus being used. # This does not handle the case where the application is using less than # 8 cpus per gpu. This code will not skip the appropriate number of cpus # in the rank binding assignments. 
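Illustrative sketch (not part of the patch): assuming a single Polaris node running 4 ranks with 8 cpus per rank, one hardware thread per core, and one gpu per rank, the cpu-bind list built by this method comes out as below; the gpu pairing reflects the reversed gpu_ids in PolarisNode.

    # Sketch of the cpu-bind string this method builds (assumed: 4 ranks/node, 8 cpus/rank).
    cpu_ids = list(range(32))        # PolarisNode.cpu_ids
    ranks_per_node, cpus_per_rank = 4, 8

    cpu_bind_list = ["verbose,list"]
    for irank in range(ranks_per_node):
        cpu_bind_list.append(":")
        cpu_bind_list.append(",".join(str(cpu_ids[i + cpus_per_rank * irank])
                                      for i in range(cpus_per_rank)))
    cpu_bind = "".join(cpu_bind_list)
    # cpu_bind == "verbose,list:0,1,2,3,4,5,6,7:8,9,10,11,12,13,14,15:16,17,18,19,20,21,22,23:24,25,26,27,28,29,30,31"
    # With PolarisNode.gpu_ids reversed to [3, 2, 1, 0], CUDA_VISIBLE_DEVICES is "3,2,1,0",
    # so local rank 0 (cpus 0-7) lines up with GPU 3, per the reverse-order note in PolarisNode.
    # Since this list binding does not contain "core", mpiexec's -d stays at threads_per_rank.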
- if cpu_bind == "none" and self._gpus_per_rank > 0: + if "cpu_bind" in self._launch_params.keys(): + cpu_bind = self._launch_params.get("cpu_bind", "none") + else: # Here we grab the cpu_ids assigned to the job in the NodeSpec object # If this is not set in NodeSpec (it is only set for single node jobs), # then we take the cpu_id list from the Polaris ComputeNode subclass, @@ -57,12 +57,12 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank] + len(polaris_node.cpu_ids)) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): - gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - gpu_ids = gpu_device.split(",") - else: - gpu_ids = [] - logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") + # if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): + # gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + # gpu_ids = gpu_device.split(",") + # else: + # gpu_ids = [] + # logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] for k in self._launch_params.keys(): From ee582b30fdb605b87e7214dfe395a0f2cf98dfc0 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 14 Jul 2023 13:50:38 -0500 Subject: [PATCH 20/36] updated theta launch params --- balsam/platform/app_run/theta.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/balsam/platform/app_run/theta.py b/balsam/platform/app_run/theta.py index 9576e5f4..614a7411 100644 --- a/balsam/platform/app_run/theta.py +++ b/balsam/platform/app_run/theta.py @@ -14,6 +14,13 @@ def _pre_popen(self) -> None: def _build_cmdline(self) -> str: node_ids = [nid for nid in self._node_spec.node_ids] nid_str = ",".join(map(str, node_ids)) + + launch_params = [] + for k in self._launch_params.keys(): + if k != "cpu_affinity": + launch_params.append(k) + launch_params.append(str(self._launch_params[k])) + cpu_affinity = self._launch_params.get("cpu_affinity", "none") if cpu_affinity not in ["none", "depth"]: cpu_affinity = "none" @@ -31,6 +38,7 @@ def _build_cmdline(self) -> str: self._threads_per_rank, "-j", self._threads_per_core, + *launch_params, self._cmdline, ] return " ".join(str(arg) for arg in args) From ea919c6145e11c3326c3c963ead1f0114494e514 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 14 Jul 2023 14:34:14 -0500 Subject: [PATCH 21/36] removed white space --- balsam/platform/app_run/theta.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/balsam/platform/app_run/theta.py b/balsam/platform/app_run/theta.py index 614a7411..e79b3167 100644 --- a/balsam/platform/app_run/theta.py +++ b/balsam/platform/app_run/theta.py @@ -14,13 +14,13 @@ def _pre_popen(self) -> None: def _build_cmdline(self) -> str: node_ids = [nid for nid in self._node_spec.node_ids] nid_str = ",".join(map(str, node_ids)) - + launch_params = [] for k in self._launch_params.keys(): if k != "cpu_affinity": launch_params.append(k) launch_params.append(str(self._launch_params[k])) - + cpu_affinity = self._launch_params.get("cpu_affinity", "none") if cpu_affinity not in ["none", "depth"]: cpu_affinity = "none" From a56bcdc1ea63666a112c8e0edf20cf96068ea44e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 19 Jul 2023 15:32:52 -0500 Subject: [PATCH 22/36] allow session to sort jobs according to parameter passed in optional params --- balsam/server/models/crud/sessions.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/balsam/server/models/crud/sessions.py 
b/balsam/server/models/crud/sessions.py index ff40985a..40c53713 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -192,6 +192,18 @@ def acquire( .limit(spec.max_num_jobs) .with_for_update(of=models.Job.__table__, skip_locked=True) ) + if "sort_walltime_first" in models.BatchJob.optional_params.keys(): + if models.BatchJob.optional_params["sort_walltime_first"]: + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.wall_time_min.desc(), + models.Job.node_packing_count.desc(), + models.Job.num_nodes.asc(), + ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) locked_ids = db.execute(lock_ids_q).scalars().all() subq = select(models.Job.__table__, _footprint_func()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore From 58a3b4476b7abe5d81e9a9fbb3638e51a80a90c8 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 21 Jul 2023 11:06:26 -0500 Subject: [PATCH 23/36] change to SessionAcquire --- balsam/_api/bases.py | 1 + balsam/schemas/session.py | 1 + balsam/server/models/crud/sessions.py | 41 +++++++++++---------- balsam/site/job_source.py | 4 ++ balsam/site/launcher/_mpi_mode.py | 1 + balsam/site/launcher/_serial_mode_master.py | 1 + 6 files changed, 29 insertions(+), 20 deletions(-) diff --git a/balsam/_api/bases.py b/balsam/_api/bases.py index e7560bde..ccda194e 100644 --- a/balsam/_api/bases.py +++ b/balsam/_api/bases.py @@ -370,6 +370,7 @@ def acquire_jobs( max_nodes_per_job: Optional[int] = None, max_aggregate_nodes: Optional[float] = None, serial_only: bool = False, + sort_by: Optional[str] = None, filter_tags: Optional[Dict[str, str]] = None, states: Set[JobState] = RUNNABLE_STATES, app_ids: Optional[Set[int]] = None, diff --git a/balsam/schemas/session.py b/balsam/schemas/session.py index e978c7f4..31fb0298 100644 --- a/balsam/schemas/session.py +++ b/balsam/schemas/session.py @@ -29,6 +29,7 @@ class SessionAcquire(BaseModel): max_nodes_per_job: Optional[int] max_aggregate_nodes: Optional[float] serial_only: bool = False + sort_by: Optional[str] = None filter_tags: Dict[str, str] states: Set[JobState] = RUNNABLE_STATES app_ids: Set[int] = set() diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index 40c53713..ddedce53 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -182,28 +182,29 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - models.Job.wall_time_min.desc(), + if spec.sort_by == "wall_time": + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.wall_time_min.desc(), + models.Job.node_packing_count.desc(), + models.Job.num_nodes.asc(), + ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) ) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) - ) - if "sort_walltime_first" in models.BatchJob.optional_params.keys(): - if models.BatchJob.optional_params["sort_walltime_first"]: - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by( - models.Job.wall_time_min.desc(), - models.Job.node_packing_count.desc(), - models.Job.num_nodes.asc(), - ) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) + else: + 
lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) + locked_ids = db.execute(lock_ids_q).scalars().all() subq = select(models.Job.__table__, _footprint_func()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore diff --git a/balsam/site/job_source.py b/balsam/site/job_source.py index fe4a6bef..9fa878da 100644 --- a/balsam/site/job_source.py +++ b/balsam/site/job_source.py @@ -72,6 +72,7 @@ def __init__( filter_tags: Optional[Dict[str, str]] = None, states: Set[str] = {"PREPROCESSED", "RESTART_READY"}, serial_only: bool = False, + sort_by: Optional[str] = None, max_wall_time_min: Optional[int] = None, max_nodes_per_job: Optional[int] = None, max_aggregate_nodes: Optional[float] = None, @@ -158,6 +159,7 @@ def _get_acquire_parameters(self, num_jobs: int) -> Dict[str, Any]: max_aggregate_nodes=self.max_aggregate_nodes, max_wall_time_min=request_time, serial_only=self.serial_only, + sort_by=self.sort_by, filter_tags=self.filter_tags, states=self.states, app_ids=self.app_ids, @@ -182,6 +184,7 @@ def __init__( filter_tags: Optional[Dict[str, str]] = None, states: Set[JobState] = {JobState.preprocessed, JobState.restart_ready}, serial_only: bool = False, + sort_by: Optional[str] = None, max_wall_time_min: Optional[int] = None, scheduler_id: Optional[int] = None, app_ids: Optional[Set[int]] = None, @@ -229,6 +232,7 @@ def get_jobs( max_aggregate_nodes=max_aggregate_nodes, max_wall_time_min=request_time, serial_only=self.serial_only, + sort_by=self.sort_by, filter_tags=self.filter_tags, states=self.states, app_ids=self.app_ids, diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index c3662984..6948fa59 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -275,6 +275,7 @@ def main( client=site_config.client, site_id=site_config.site_id, filter_tags=filter_tags_dict, + sort_by=site_config.settings.launcher.sort_by_quantity, max_wall_time_min=wall_time_min, scheduler_id=scheduler_id, ) diff --git a/balsam/site/launcher/_serial_mode_master.py b/balsam/site/launcher/_serial_mode_master.py index 558fdc9b..29248d84 100644 --- a/balsam/site/launcher/_serial_mode_master.py +++ b/balsam/site/launcher/_serial_mode_master.py @@ -237,6 +237,7 @@ def master_main(wall_time_min: int, master_port: int, log_filename: str, num_wor max_wall_time_min=wall_time_min, scheduler_id=scheduler_id, serial_only=True, + sort_by=site_config.settings.launcher.sort_by_quantity, max_nodes_per_job=1, ) status_updater = BulkStatusUpdater(site_config.client) From 4ef3e75abd3b3760e2c8495e31ea70df087990a7 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 21 Jul 2023 13:30:40 -0500 Subject: [PATCH 24/36] change to sort_by option --- balsam/server/models/crud/sessions.py | 4 ++-- balsam/site/launcher/_mpi_mode.py | 2 +- balsam/site/launcher/_serial_mode_master.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index ddedce53..80d27644 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -182,13 +182,13 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - if spec.sort_by == "wall_time": + if spec.sort_by == "long_large_first": lock_ids_q = ( 
job_q.with_only_columns([models.Job.id]) .order_by( models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), models.Job.node_packing_count.desc(), - models.Job.num_nodes.asc(), ) .limit(spec.max_num_jobs) .with_for_update(of=models.Job.__table__, skip_locked=True) diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index 6948fa59..1792fead 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -275,7 +275,7 @@ def main( client=site_config.client, site_id=site_config.site_id, filter_tags=filter_tags_dict, - sort_by=site_config.settings.launcher.sort_by_quantity, + sort_by=site_config.settings.launcher.sort_by, max_wall_time_min=wall_time_min, scheduler_id=scheduler_id, ) diff --git a/balsam/site/launcher/_serial_mode_master.py b/balsam/site/launcher/_serial_mode_master.py index 29248d84..ccd7ccd1 100644 --- a/balsam/site/launcher/_serial_mode_master.py +++ b/balsam/site/launcher/_serial_mode_master.py @@ -237,7 +237,7 @@ def master_main(wall_time_min: int, master_port: int, log_filename: str, num_wor max_wall_time_min=wall_time_min, scheduler_id=scheduler_id, serial_only=True, - sort_by=site_config.settings.launcher.sort_by_quantity, + sort_by=site_config.settings.launcher.sort_by, max_nodes_per_job=1, ) status_updater = BulkStatusUpdater(site_config.client) From babca798bf8849658f2c359db83c879a748c9e54 Mon Sep 17 00:00:00 2001 From: Christine Simpson <48525133+cms21@users.noreply.github.com> Date: Mon, 24 Jul 2023 11:19:01 -0500 Subject: [PATCH 25/36] Update config.py --- balsam/config/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/balsam/config/config.py b/balsam/config/config.py index 00d95c69..e4c8f066 100644 --- a/balsam/config/config.py +++ b/balsam/config/config.py @@ -193,6 +193,7 @@ class LauncherSettings(BaseSettings): local_app_launcher: Type[AppRun] = Field("balsam.platform.app_run.LocalAppRun") mpirun_allows_node_packing: bool = False serial_mode_prefetch_per_rank: int = 64 + sort_by: Optional[str] = None serial_mode_startup_params: Dict[str, str] = {"cpu_affinity": "none"} @validator("compute_node", pre=True, always=True) From 08d0986906f18964139fb8304e19f7f2cf0bb5f6 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 25 Jul 2023 18:48:51 +0000 Subject: [PATCH 26/36] updates --- balsam/_api/bases.py | 1 + balsam/server/models/crud/sessions.py | 36 +++++++++++++++++++-------- balsam/site/job_source.py | 2 ++ balsam/site/launcher/_mpi_mode.py | 3 +++ 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/balsam/_api/bases.py b/balsam/_api/bases.py index ccda194e..0ee4840e 100644 --- a/balsam/_api/bases.py +++ b/balsam/_api/bases.py @@ -386,6 +386,7 @@ def acquire_jobs( max_nodes_per_job=max_nodes_per_job, max_aggregate_nodes=max_aggregate_nodes, serial_only=serial_only, + sort_by=sort_by, filter_tags=filter_tags, states=states, app_ids=app_ids, diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index 80d27644..d2226d9b 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -130,20 +130,34 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li return acquired_jobs -def _footprint_func() -> Any: +def _footprint_func(spec: schemas.SessionAcquire) -> Any: footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) - return ( - func.sum(footprint) - .over( - order_by=( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - 
models.Job.wall_time_min.desc(), - models.Job.id.asc(), + if spec.sort_by == "long_large_first": + return ( + func.sum(footprint) + .over( + order_by=( + models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), + models.Job.node_packing_count.desc(), + models.Job.id.asc(), + ) ) + .label("aggregate_footprint") + ) + else: + return ( + func.sum(footprint) + .over( + order_by=( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), + models.Job.id.asc(), + ) + ) + .label("aggregate_footprint") ) - .label("aggregate_footprint") - ) def acquire( diff --git a/balsam/site/job_source.py b/balsam/site/job_source.py index 9fa878da..537969a3 100644 --- a/balsam/site/job_source.py +++ b/balsam/site/job_source.py @@ -91,6 +91,7 @@ def __init__( self.app_ids = set() if app_ids is None else app_ids self.states = states self.serial_only = serial_only + self.sort_by = sort_by self.max_wall_time_min = max_wall_time_min self.max_nodes_per_job = max_nodes_per_job self.max_aggregate_nodes = max_aggregate_nodes @@ -195,6 +196,7 @@ def __init__( self.app_ids = set() if app_ids is None else app_ids self.states = states self.serial_only = serial_only + self.sort_by = sort_by self.max_wall_time_min = max_wall_time_min self.start_time = time.time() diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index 1792fead..7b430d40 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -127,6 +127,7 @@ def acquire_jobs(self) -> List["Job"]: def launch_runs(self) -> None: acquired = self.acquire_jobs() acquired.extend(self.job_stash) + logger.info(f"acquired jobs: {acquired}") self.job_stash = [] for job in acquired: assert job.id is not None @@ -271,6 +272,8 @@ def main( ) scheduler_id = node_cls.get_scheduler_id() + + job_source = SynchronousJobSource( client=site_config.client, site_id=site_config.site_id, From 172f900b6309161f022bf6cefce9990fd69dbe4e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 25 Jul 2023 17:08:56 -0500 Subject: [PATCH 27/36] made arg function --- balsam/server/models/crud/sessions.py | 72 ++++++++++----------------- 1 file changed, 26 insertions(+), 46 deletions(-) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index d2226d9b..e32d2bb0 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -130,34 +130,28 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li return acquired_jobs -def _footprint_func(spec: schemas.SessionAcquire) -> Any: - footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) +def _order_args(spec: schemas.SessionAcquire) -> Any: if spec.sort_by == "long_large_first": - return ( - func.sum(footprint) - .over( - order_by=( - models.Job.wall_time_min.desc(), - models.Job.num_nodes.desc(), - models.Job.node_packing_count.desc(), - models.Job.id.asc(), - ) - ) - .label("aggregate_footprint") + order_args = ( + models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), + models.Job.node_packing_count.desc(), + models.Job.id.asc(), ) else: - return ( - func.sum(footprint) - .over( - order_by=( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - models.Job.wall_time_min.desc(), - models.Job.id.asc(), - ) - ) - .label("aggregate_footprint") + order_args = ( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), + models.Job.id.asc(), 
) + return order_args + + +def _footprint_func(spec: schemas.SessionAcquire) -> Any: + footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) + order_args = _order_args(spec) + return func.sum(footprint).over(order_by=(*order_args)).label("aggregate_footprint") def acquire( @@ -196,28 +190,14 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - if spec.sort_by == "long_large_first": - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by( - models.Job.wall_time_min.desc(), - models.Job.num_nodes.desc(), - models.Job.node_packing_count.desc(), - ) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) - ) - else: - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - models.Job.wall_time_min.desc(), - ) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) - ) + order_args = _order_args(spec) + + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by(*order_args) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) locked_ids = db.execute(lock_ids_q).scalars().all() From 54ccb1b9440614c399176d38ceeb8fad1b42c026 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Mon, 31 Jul 2023 13:14:30 -0500 Subject: [PATCH 28/36] undo changes --- balsam/server/models/crud/sessions.py | 67 ++++++++++++++------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index e32d2bb0..5834b55f 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -129,30 +129,20 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li logger.debug(f"Acquired {len(acquired_jobs)} jobs") return acquired_jobs - -def _order_args(spec: schemas.SessionAcquire) -> Any: - if spec.sort_by == "long_large_first": - order_args = ( - models.Job.wall_time_min.desc(), - models.Job.num_nodes.desc(), - models.Job.node_packing_count.desc(), - models.Job.id.asc(), - ) - else: - order_args = ( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - models.Job.wall_time_min.desc(), - models.Job.id.asc(), - ) - return order_args - - -def _footprint_func(spec: schemas.SessionAcquire) -> Any: +def _footprint_func() -> Any: footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) - order_args = _order_args(spec) - return func.sum(footprint).over(order_by=(*order_args)).label("aggregate_footprint") - + return ( + func.sum(footprint) + .over( + order_by=( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), + models.Job.id.asc(), + ) + ) + .label("aggregate_footprint") + ) def acquire( db: Session, owner: schemas.UserOut, session_id: int, spec: schemas.SessionAcquire @@ -190,14 +180,29 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - order_args = _order_args(spec) - - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by(*order_args) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) - ) + + if spec.sort_by == "long_large_first": + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), + 
models.Job.node_packing_count.desc(), + ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) + else: + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), + ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) locked_ids = db.execute(lock_ids_q).scalars().all() From 14d39b3d7b3636f808f4cd854bfc94b89b116f78 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 15:13:26 -0500 Subject: [PATCH 29/36] added new window function for node footprint calc --- balsam/server/models/crud/sessions.py | 30 ++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index 5834b55f..3aec679c 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -114,6 +114,7 @@ def create(db: Session, owner: schemas.UserOut, session: schemas.SessionCreate) def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> List[Dict[str, Any]]: acquired_jobs = [{str(key): value for key, value in job.items()} for job in db.execute(job_q).mappings()] acquired_ids = [job["id"] for job in acquired_jobs] + # logger.info(f"*** in _acquire_jobs acquired_ids={acquired_ids}") stmt = update(models.Job.__table__).where(models.Job.id.in_(acquired_ids)).values(session_id=session.id) @@ -129,7 +130,8 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li logger.debug(f"Acquired {len(acquired_jobs)} jobs") return acquired_jobs -def _footprint_func() -> Any: + +def _footprint_func_nodes() -> Any: footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) return ( func.sum(footprint) @@ -144,6 +146,22 @@ def _footprint_func() -> Any: .label("aggregate_footprint") ) +def _footprint_func_walltime() -> Any: + footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) + return ( + func.sum(footprint) + .over( + order_by=( + models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), + models.Job.node_packing_count.desc(), + models.Job.id.asc(), + ) + ) + .label("aggregate_footprint") + ) + + def acquire( db: Session, owner: schemas.UserOut, session_id: int, spec: schemas.SessionAcquire ) -> List[Dict[str, Any]]: @@ -180,7 +198,7 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - + # logger.info(f"*** In session.acquire: spec.sort_by = {spec.sort_by}") if spec.sort_by == "long_large_first": lock_ids_q = ( job_q.with_only_columns([models.Job.id]) @@ -205,10 +223,16 @@ def acquire( ) locked_ids = db.execute(lock_ids_q).scalars().all() + # logger.info(f"*** locked_ids: {locked_ids}") + if spec.sort_by == "long_large_first": + subq = select(models.Job.__table__, _footprint_func_walltime()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore + else: + subq = select(models.Job.__table__, _footprint_func_nodes()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore - subq = select(models.Job.__table__, _footprint_func()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore + # logger.info(f"*** max_aggregate_nodes: {spec.max_aggregate_nodes}") cols = [c for c in subq.c if c.name not in ["aggregate_footprint", "session_id"]] job_q = select(cols).where(subq.c.aggregate_footprint <= 
spec.max_aggregate_nodes) + return _acquire_jobs(db, job_q, session) From 7ab9cf870d0b3cec3e9332e5245b18e572fc9562 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 15:37:41 -0500 Subject: [PATCH 30/36] lint fixes --- balsam/site/launcher/_mpi_mode.py | 1 - tests/server/test_sites.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index 7b430d40..3e484dfd 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -273,7 +273,6 @@ def main( scheduler_id = node_cls.get_scheduler_id() - job_source = SynchronousJobSource( client=site_config.client, site_id=site_config.site_id, diff --git a/tests/server/test_sites.py b/tests/server/test_sites.py index 5541bc45..f97d217b 100644 --- a/tests/server/test_sites.py +++ b/tests/server/test_sites.py @@ -12,7 +12,7 @@ def test_create_site(auth_client): name="thetalogin3.alcf.anl.gov", path="/projects/myProject/balsam-site", ) - assert type(posted_site["id"]) == int + assert type(posted_site["id"]) is int site_list = auth_client.get("/sites/")["results"] assert isinstance(site_list, list) assert len(site_list) == 1 From 0e224987ac59fa688464aefe29b380f3f77d0c17 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 15:40:52 -0500 Subject: [PATCH 31/36] lint fixes --- tests/server/test_sites.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/test_sites.py b/tests/server/test_sites.py index f97d217b..e55b3ace 100644 --- a/tests/server/test_sites.py +++ b/tests/server/test_sites.py @@ -12,7 +12,7 @@ def test_create_site(auth_client): name="thetalogin3.alcf.anl.gov", path="/projects/myProject/balsam-site", ) - assert type(posted_site["id"]) is int + assert isinstance(posted_site["id"], int) site_list = auth_client.get("/sites/")["results"] assert isinstance(site_list, list) assert len(site_list) == 1 From 063a48b284465621d83f0b9ed35c2746a742e672 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 15:45:42 -0500 Subject: [PATCH 32/36] lint fixes --- tests/server/test_auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/test_auth.py b/tests/server/test_auth.py index d600c60e..a9aedad2 100644 --- a/tests/server/test_auth.py +++ b/tests/server/test_auth.py @@ -12,7 +12,7 @@ def test_unauth_user_cannot_view_sites(anon_client): def test_register(anon_client): login_credentials = {"username": f"user{uuid4()}", "password": "foo"} resp = anon_client.post("/" + urls.PASSWORD_REGISTER, **login_credentials) - assert type(resp["id"]) == int + assert isinstance(resp["id"], int) assert resp["username"] == login_credentials["username"] From ff9dd8cda83ee1275eb0fdd6560526c555ab335d Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 16:21:15 -0500 Subject: [PATCH 33/36] lint fixes --- balsam/server/models/crud/sessions.py | 1 + 1 file changed, 1 insertion(+) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index 3aec679c..8862e6f3 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -146,6 +146,7 @@ def _footprint_func_nodes() -> Any: .label("aggregate_footprint") ) + def _footprint_func_walltime() -> Any: footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) return ( From 6a10eb72b8d421cce25a6166b90abf029d49596e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 4 Aug 2023 16:52:42 -0500 
Subject: [PATCH 34/36] polaris app_run cleanup --- balsam/platform/app_run/polaris.py | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index d18efb12..3bc86a65 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -22,26 +22,24 @@ def _build_cmdline(self) -> str: # 8 cpus per gpu. This code will not skip the appropriate number of cpus # in the rank binding assignments. if "cpu_bind" in self._launch_params.keys(): - cpu_bind = self._launch_params.get("cpu_bind", "none") + cpu_bind = self._launch_params.get("cpu_bind") + elif "--cpu-bind" in self._launch_params.keys(): + cpu_bind = self._launch_params.get("--cpu-bind") else: # Here we grab the cpu_ids assigned to the job in the NodeSpec object # If this is not set in NodeSpec (it is only set for single node jobs), # then we take the cpu_id list from the Polaris ComputeNode subclass, # assuming the job will have use of all the cpus in nodes assigned to it. - cpu_ids_ns = self._node_spec.cpu_ids[0] - if cpu_ids_ns: - cpu_ids = self._node_spec.cpu_ids[0] - if self._threads_per_core == 2: - polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - else: - polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + cpu_ids = self._node_spec.cpu_ids[0] + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + if not cpu_ids: cpu_ids = polaris_node.cpu_ids cpus_per_rank = self.get_cpus_per_rank() # PolarisNode reverses the order of the gpu_ids, so assigning the cpu-bind # in ascending cpu order is what we want here. - cpu_bind_list = ["verbose,list"] + cpu_bind_list = ["list"] for irank in range(self._ranks_per_node): cpu_bind_list.append(":") for i in range(cpus_per_rank): @@ -57,17 +55,10 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank] + len(polaris_node.cpu_ids)) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - # if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): - # gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - # gpu_ids = gpu_device.split(",") - # else: - # gpu_ids = [] - # logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] for k in self._launch_params.keys(): - if k != "cpu_bind": - launch_params.append("--" + k) + if k != "cpu_bind" and k != "--cpu-bind": launch_params.append(str(self._launch_params[k])) # The value of -d depends on the setting of cpu_bind. 
If cpu-bind=core, -d is the number of @@ -95,7 +86,6 @@ def _build_cmdline(self) -> str: ] return " ".join(str(arg) for arg in args) - # Overide default because sunspot does not use CUDA def _set_envs(self) -> None: envs = os.environ.copy() envs.update(self._envs) From 020ae447d1ade936cc71f4e4e243c80fab08ae09 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 4 Aug 2023 16:58:11 -0500 Subject: [PATCH 35/36] lint fix --- balsam/platform/app_run/polaris.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 3bc86a65..20f6ea22 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -65,7 +65,7 @@ def _build_cmdline(self) -> str: # physical cores per rank, otherwise it is the number of hardware threads per rank # https://docs.alcf.anl.gov/running-jobs/example-job-scripts/ depth = self._threads_per_rank - if "core" in cpu_bind: + if "core" == cpu_bind: depth = self.get_cpus_per_rank() nid_str = ",".join(map(str, node_ids)) From 273c95ed407177193f92252ae49680c86c140744 Mon Sep 17 00:00:00 2001 From: Christine Simpson <48525133+cms21@users.noreply.github.com> Date: Tue, 8 Aug 2023 10:14:07 -0500 Subject: [PATCH 36/36] Removing unnecessary logging from _mpi_mode.py --- balsam/site/launcher/_mpi_mode.py | 1 - 1 file changed, 1 deletion(-) diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index 3e484dfd..05f6e957 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -127,7 +127,6 @@ def acquire_jobs(self) -> List["Job"]: def launch_runs(self) -> None: acquired = self.acquire_jobs() acquired.extend(self.job_stash) - logger.info(f"acquired jobs: {acquired}") self.job_stash = [] for job in acquired: assert job.id is not None
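
For reference, a minimal standalone sketch of the --cpu-bind string and -d depth selection that PATCH 34/36 and PATCH 35/36 settle on. The helper names (build_cpu_bind, pick_depth) and the example values are illustrative only, not Balsam APIs; the sketch assumes 32 physical cores per node, listed in reverse order as the PolarisRun comment above describes, and assumes the second hardware thread of core c is c plus the number of physical cores.

# Illustrative sketch (not Balsam code): build an mpiexec --cpu-bind value of the
# form "list:<rank0 cpus>:<rank1 cpus>:..." the way the PolarisRun patches above do.
from typing import List


def build_cpu_bind(cpu_ids: List[int], ranks_per_node: int, cpus_per_rank: int, threads_per_core: int) -> str:
    n_physical = len(cpu_ids)  # assumption: the hardware-thread sibling of core c is c + n_physical
    parts = ["list"]
    for irank in range(ranks_per_node):
        parts.append(":")
        for i in range(cpus_per_rank):
            if i > 0:
                parts.append(",")
            core = cpu_ids[i + cpus_per_rank * irank]
            parts.append(str(core))
            if threads_per_core == 2:
                # also bind the second hardware thread of the same physical core
                parts.append("," + str(core + n_physical))
    return "".join(parts)


def pick_depth(cpu_bind: str, threads_per_rank: int, cpus_per_rank: int) -> int:
    # PATCH 35/36: -d counts physical cores only when cpu_bind is exactly "core",
    # otherwise it is the number of hardware threads per rank
    return cpus_per_rank if cpu_bind == "core" else threads_per_rank


if __name__ == "__main__":
    # Hypothetical node: 32 physical cores in reverse order, so rank 0 is bound to
    # the cores nearest its reverse-ordered GPU, per the comment in PATCH 34/36.
    cores = list(range(31, -1, -1))
    print(build_cpu_bind(cores, ranks_per_node=4, cpus_per_rank=8, threads_per_core=1))
    # list:31,30,29,28,27,26,25,24:23,22,21,20,19,18,17,16:15,...,8:7,...,0
    print(pick_depth("core", threads_per_rank=16, cpus_per_rank=8))  # 8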
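
Similarly, the job-acquisition ordering and aggregate-footprint cutoff that PATCH 27/36 through PATCH 29/36 iterate on can be pictured with a plain-Python sketch. The Job tuple, the helper names (order_jobs, acquire_within_footprint), and the sample jobs below are invented for illustration; the server performs the equivalent selection with the SQL window functions shown in the sessions.py diffs above.

# Illustrative sketch (not the Balsam server code): the window-function queries in
# sessions.py keep jobs, in sort order, while the running sum of
# num_nodes / node_packing_count stays within max_aggregate_nodes.
from typing import List, NamedTuple


class Job(NamedTuple):
    id: int
    num_nodes: int
    node_packing_count: int
    wall_time_min: int


def order_jobs(jobs: List[Job], sort_by: str) -> List[Job]:
    if sort_by == "long_large_first":
        # wall_time_min desc, num_nodes desc, node_packing_count desc, id asc
        return sorted(jobs, key=lambda j: (-j.wall_time_min, -j.num_nodes, -j.node_packing_count, j.id))
    # default: num_nodes asc, node_packing_count desc, wall_time_min desc, id asc
    return sorted(jobs, key=lambda j: (j.num_nodes, -j.node_packing_count, -j.wall_time_min, j.id))


def acquire_within_footprint(jobs: List[Job], sort_by: str, max_aggregate_nodes: float) -> List[Job]:
    acquired: List[Job] = []
    aggregate_footprint = 0.0
    for job in order_jobs(jobs, sort_by):
        aggregate_footprint += job.num_nodes / job.node_packing_count
        if aggregate_footprint > max_aggregate_nodes:
            break  # the running sum only grows, so nothing later can fit either
        acquired.append(job)
    return acquired


if __name__ == "__main__":
    jobs = [Job(1, 1, 4, 30), Job(2, 2, 1, 120), Job(3, 1, 1, 60)]
    print([j.id for j in acquire_within_footprint(jobs, "long_large_first", 2.5)])  # [2]
    print([j.id for j in acquire_within_footprint(jobs, "default", 2.5)])           # [1, 3]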