From b55c1cfca58cdf055bfbe6ad5ca29c133efe8005 Mon Sep 17 00:00:00 2001
From: Moritz Gunz
Date: Thu, 24 Oct 2024 09:46:18 -0400
Subject: [PATCH 1/3] Support multi-node trainings in `ReturnnTrainingJob`

---
 returnn/training.py | 87 +++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 81 insertions(+), 6 deletions(-)

diff --git a/returnn/training.py b/returnn/training.py
index 03e17127..505697c3 100644
--- a/returnn/training.py
+++ b/returnn/training.py
@@ -16,6 +16,7 @@
 import sys
 import os
 import shutil
+import socket
 import subprocess as sp
 import numpy as np
 from typing import Dict, Sequence, Iterable, List, Optional, Union
@@ -242,7 +243,7 @@ def __init__(
             self.rqmt["gpu"] *= self.horovod_num_processes // (self.multi_node_slots or 1)
             self.rqmt["mem"] *= self.horovod_num_processes // (self.multi_node_slots or 1)
 
-    def _get_run_cmd(self):
+    def _get_run_cmd(self, rdzv_node_addr: Optional[str] = None):
         run_cmd = [
             self.returnn_python_exe.get_path(),
             self.returnn_root.join_right("rnn.py").get_path(),
@@ -255,11 +256,19 @@ def _get_run_cmd(self):
                 # Instead of using the torchrun binary, directly execute the corresponding Python module
                 # and directly use the correct Python environment.
                 prefix = [self.returnn_python_exe.get_path(), "-mtorch.distributed.run"]
-                if (self.multi_node_slots or 1) == 1:
+                mn_slots = self.multi_node_slots or 1
+                if mn_slots == 1:
                     prefix += ["--standalone"]
+                elif mn_slots > 1:
+                    rdzv_id = self.job_id().replace("/", "_")
+                    prefix += [
+                        f'--rdzv-id="{rdzv_id}"',
+                        "--rdzv-backend=c10d",
+                        f'--rdzv-endpoint={rdzv_node_addr or "$RDZV_NODE_ADDR"}',
+                    ]
                 prefix += [
-                    f"--nnodes={self.multi_node_slots or 1}",
-                    f"--nproc-per-node={self.horovod_num_processes}",
+                    f"--nnodes={mn_slots}",
+                    f"--nproc-per-node={self.horovod_num_processes // mn_slots}",
                 ]
                 run_cmd = prefix + run_cmd[1:]
             elif self.distributed_launch_cmd == "mpirun":
@@ -379,12 +388,78 @@ def run(self):
                 print(f.read())
         except Exception as exc:
             print("Cannot read:", exc)
-        sys.stdout.flush()
 
         env = os.environ.copy()
         env["OMP_NUM_THREADS"] = str(self.rqmt["cpu"])
         env["MKL_NUM_THREADS"] = str(self.rqmt["cpu"])
-        sp.check_call(self._get_run_cmd(), env=env)
+
+        rdzv_addr = None
+        if (self.multi_node_slots or 1) > 1 and self.distributed_launch_cmd == "torchrun":
+            import hashlib
+            import psutil
+
+            assert "SLURM_JOB_NODELIST" in os.environ, (
+                "multi-node multi-GPU training w/ torchrun currently depends on SLURM environment variables "
+                "to determine the master node"
+            )
+
+            partaking_nodes = os.environ["SLURM_JOB_NODELIST"]
+            nodes = sorted(
+                node_name.strip()
+                for node_name in sp.check_output(["scontrol", "show", "hostnames", partaking_nodes])
+                .decode("utf-8")
+                .splitlines()
+            )
+
+            cur_hostname = socket.gethostname()
+            assert cur_hostname in nodes
+
+            # Declare the first node as the rendezvous (RDZV) node; it is arbitrary which one
+            # it is, as long as the network connectivity on that node is good.
+            #
+            # For training on a well-known set of nodes, the master node can be set
+            # statically, but if you're in a cluster and you don't know the nodes
+            # your training will be running on, this needs to be determined
+            # dynamically after the scheduler has placed your job.
+            rdzv_node = nodes[0]
+            rdzv_host = socket.getfqdn(rdzv_node)
+            if rdzv_host == rdzv_node:
+                rdzv_host = socket.gethostbyname(rdzv_node)
+
+            # 29400 is torchrun's default RDZV port. Also incorporate the job ID to
+            # reduce the chance of port conflicts.
+            rdzv_port = 29400 + (int(hashlib.sha256(self.job_id().encode("utf-8")).hexdigest(), 16) % 1024)
+            rdzv_addr = f"{rdzv_host}:{rdzv_port}"
+            print(f"Nodes {partaking_nodes} in job, running RDZV server on {rdzv_node} @ {rdzv_host} ({rdzv_addr}).")
+
+            # Gloo and NCCL cannot be trusted to find suitable network interfaces on their own.
+            #
+            # torchrun docs say all nodes need to specify the exact same number of network interfaces.
+            # Therefore, for now, we specify just one.
+            up_ifaces = []
+            net_if = psutil.net_if_addrs()
+            for ifname, sni_c_addrs in net_if.items():
+                if any(ifname.startswith(prefix) for prefix in ["lo", "docker"]) or any(
+                    addr.address == "127.0.0.1" for addr in sni_c_addrs
+                ):
+                    # skip loopback and docker interfaces
+                    continue
+                if any(
+                    addr.family in [socket.AF_INET, socket.AF_INET6] and addr.broadcast is not None
+                    for addr in sni_c_addrs
+                ):
+                    up_ifaces.append(ifname)
+            if not up_ifaces:
+                raise ValueError(f"Could not find any UP network interface in {net_if}.")
+            iface_to_use = up_ifaces[0]
+            if len(up_ifaces) > 1:
+                print(f"Found more than one UP interface: {', '.join(up_ifaces)}, using the first one found.")
+            print(f"Using {iface_to_use} as GLOO_SOCKET_IFNAME and NCCL_SOCKET_IFNAME.")
+            env["GLOO_SOCKET_IFNAME"] = iface_to_use
+            env["NCCL_SOCKET_IFNAME"] = iface_to_use
+
+        sys.stdout.flush()
+        sp.check_call(self._get_run_cmd(rdzv_node_addr=rdzv_addr), env=env)
 
         lrf = self.returnn_config.get("learning_rate_file", "learning_rates")
         self._relink(lrf, self.out_learning_rates.get_path())

From d049f01a8e852805a78e3b8e06e6de5c9d72204a Mon Sep 17 00:00:00 2001
From: Moritz Gunz
Date: Mon, 4 Nov 2024 08:23:37 -0500
Subject: [PATCH 2/3] use star

---
 returnn/training.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/returnn/training.py b/returnn/training.py
index 505697c3..377b889d 100644
--- a/returnn/training.py
+++ b/returnn/training.py
@@ -243,7 +243,7 @@ def __init__(
             self.rqmt["gpu"] *= self.horovod_num_processes // (self.multi_node_slots or 1)
             self.rqmt["mem"] *= self.horovod_num_processes // (self.multi_node_slots or 1)
 
-    def _get_run_cmd(self, rdzv_node_addr: Optional[str] = None):
+    def _get_run_cmd(self, *, rdzv_node_addr: Optional[str] = None):
         run_cmd = [
             self.returnn_python_exe.get_path(),
             self.returnn_root.join_right("rnn.py").get_path(),

From c6b4190a1e5631eb22fac141c789bf97eccbf3aa Mon Sep 17 00:00:00 2001
From: Moritz Gunz
Date: Mon, 4 Nov 2024 08:23:47 -0500
Subject: [PATCH 3/3] clarify var name

---
 returnn/training.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/returnn/training.py b/returnn/training.py
index 377b889d..b917a633 100644
--- a/returnn/training.py
+++ b/returnn/training.py
@@ -256,19 +256,19 @@ def _get_run_cmd(self, *, rdzv_node_addr: Optional[str] = None):
                 # Instead of using the torchrun binary, directly execute the corresponding Python module
                 # and directly use the correct Python environment.
                 prefix = [self.returnn_python_exe.get_path(), "-mtorch.distributed.run"]
-                mn_slots = self.multi_node_slots or 1
-                if mn_slots == 1:
+                multi_node_slots = self.multi_node_slots or 1
+                if multi_node_slots == 1:
                     prefix += ["--standalone"]
-                elif mn_slots > 1:
+                elif multi_node_slots > 1:
                     rdzv_id = self.job_id().replace("/", "_")
                     prefix += [
                         f'--rdzv-id="{rdzv_id}"',
                         "--rdzv-backend=c10d",
                         f'--rdzv-endpoint={rdzv_node_addr or "$RDZV_NODE_ADDR"}',
                     ]
                 prefix += [
-                    f"--nnodes={mn_slots}",
-                    f"--nproc-per-node={self.horovod_num_processes // mn_slots}",
+                    f"--nnodes={multi_node_slots}",
+                    f"--nproc-per-node={self.horovod_num_processes // multi_node_slots}",
                 ]
                 run_cmd = prefix + run_cmd[1:]
             elif self.distributed_launch_cmd == "mpirun":
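
The series boils down to two pieces: every node derives the same rendezvous endpoint from the SLURM allocation, and every node then launches torchrun against that endpoint. The standalone sketch below illustrates the endpoint derivation; it assumes a SLURM allocation, and the names `derive_rdzv_endpoint` and `job_id` are illustrative only and do not exist in the patched code.

# Standalone sketch of the rendezvous-endpoint derivation, assuming a SLURM allocation.
# `derive_rdzv_endpoint` and `job_id` are illustrative names, not part of the patch.
import hashlib
import os
import socket
import subprocess as sp


def derive_rdzv_endpoint(job_id: str) -> str:
    """Return "host:port" for torchrun's --rdzv-endpoint; identical on every node of the job."""
    # Expand e.g. "gpu-node[01-04]" into individual host names and sort them,
    # so that all nodes deterministically agree on the same rendezvous node.
    nodelist = os.environ["SLURM_JOB_NODELIST"]
    nodes = sorted(sp.check_output(["scontrol", "show", "hostnames", nodelist]).decode("utf-8").split())
    rdzv_node = nodes[0]
    rdzv_host = socket.getfqdn(rdzv_node)
    if rdzv_host == rdzv_node:
        # getfqdn() could not resolve a fully qualified name, fall back to the IP address
        rdzv_host = socket.gethostbyname(rdzv_node)
    # 29400 is torchrun's default rendezvous port; offset it by a hash of the job ID
    # so that two jobs sharing the rendezvous node are unlikely to collide.
    rdzv_port = 29400 + int(hashlib.sha256(job_id.encode("utf-8")).hexdigest(), 16) % 1024
    return f"{rdzv_host}:{rdzv_port}"


# Every node then runs the same launch command, e.g. for 2 nodes with 4 GPUs each:
#   python -m torch.distributed.run --rdzv-id=<job_id> --rdzv-backend=c10d \
#       --rdzv-endpoint=<result of derive_rdzv_endpoint> --nnodes=2 --nproc-per-node=4 \
#       rnn.py returnn.config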