Merge pull request #61 from trustimaging/reuse-head-node
Add support for worker creation in head node
ccuetom authored Oct 26, 2023
2 parents 5b9c386 + 6400825 commit f2dd959
Showing 17 changed files with 194 additions and 132 deletions.
5 changes: 4 additions & 1 deletion mosaic/__init__.py
@@ -28,7 +28,7 @@ def init(runtime_type='head', runtime_indices=(),
parent_id=None, parent_address=None, parent_port=None,
monitor_address=None, monitor_port=None,
num_workers=1, num_threads=None,
mode='local', monitor_strategy='round-robin',
mode='local', reuse_head=False, monitor_strategy='round-robin',
log_level='perf', profile=False, node_list=None,
asyncio_loop=None, wait=False,
**kwargs):
@@ -64,6 +64,8 @@ def init(runtime_type='head', runtime_indices=(),
available cores over ``num_workers``.
mode : str, optional
Mode of the runtime, defaults to ``local``.
reuse_head : bool, optional
Whether to set up workers in the head node, defaults to False.
monitor_strategy : str, optional
Strategy used by the monitor to allocate tessera, defaults to round robin.
log_level : str, optional
@@ -93,6 +95,7 @@ def init(runtime_type='head', runtime_indices=(),
runtime_config = {
'runtime_indices': runtime_indices,
'mode': mode,
'reuse_head': reuse_head,
'monitor_strategy': monitor_strategy,
'num_workers': num_workers,
'num_threads': num_threads,
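With the new keyword in place, enabling head-node workers is a matter of passing reuse_head to mosaic.init. A minimal usage sketch; only reuse_head and the listed parameter names come from this diff, the concrete values are illustrative:

import mosaic

# Start a cluster-mode runtime and also place workers on the head node.
# reuse_head is the new option added by this commit; it defaults to False.
mosaic.init(mode='cluster',
            num_workers=4,
            num_threads=8,
            reuse_head=True,
            log_level='perf')
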
5 changes: 3 additions & 2 deletions mosaic/cli/clusters/pbs.py
@@ -5,13 +5,14 @@
__all__ = ['node_list', 'submission_script']


def node_list(host_name):
def node_list(host_name, reuse_head):
"""
Attempt to find a node list for PBS clusters.
Parameters
----------
host_name
reuse_head
Returns
-------
@@ -29,7 +30,7 @@ def node_list(host_name):
for line in lines:
line = line.strip().split(' ')

if line[0] != host_name:
if line[0] != host_name or reuse_head:
pbs_list.append(line[0])

return pbs_list
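The PBS helper (and the SGE helper below) now keeps a node even when it matches the head's hostname, provided reuse_head is set. A self-contained sketch of that filtering rule; the sample node lines are invented:

def filter_nodes(lines, host_name, reuse_head):
    # Drop the head node only when it is not being reused for workers.
    nodes = []
    for line in lines:
        name = line.strip().split(' ')[0]
        if name != host_name or reuse_head:
            nodes.append(name)
    return nodes

print(filter_nodes(['head01 16', 'node01 16', 'node02 16'], 'head01', False))
# ['node01', 'node02']
print(filter_nodes(['head01 16', 'node01 16', 'node02 16'], 'head01', True))
# ['head01', 'node01', 'node02']
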
5 changes: 3 additions & 2 deletions mosaic/cli/clusters/sge.py
@@ -5,13 +5,14 @@
__all__ = ['node_list', 'submission_script']


def node_list(host_name):
def node_list(host_name, reuse_head):
"""
Attempt to find a node list for SGE clusters.
Parameters
----------
host_name
reuse_head
Returns
-------
@@ -29,7 +30,7 @@ def node_list(host_name):
for line in lines:
line = line.strip().split(' ')

if line[0] != host_name:
if line[0] != host_name or reuse_head:
sge_list.append(line[0])

return sge_list
12 changes: 7 additions & 5 deletions mosaic/cli/clusters/slurm.py
@@ -7,13 +7,14 @@
__all__ = ['node_list', 'submission_script']


def node_list(host_name):
def node_list(host_name, reuse_head):
"""
Attempt to find a node list for SLURM clusters.
Parameters
----------
host_name
reuse_head
Returns
-------
@@ -25,10 +26,11 @@ def node_list(host_name):
return

slurm_list = expand_hostlist(slurm_nodes)
try:
slurm_list.remove(host_name)
except ValueError:
slurm_list.remove(host_name.split('.')[0])
if not reuse_head:
try:
slurm_list.remove(host_name)
except ValueError:
slurm_list.remove(host_name.split('.')[0])

return slurm_list

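For SLURM the node list comes from expanding SLURM_NODELIST, and the head node is now only removed when reuse_head is False; the ValueError fallback covers the case where the reported hostname is fully qualified but the node list uses short names. A small sketch of just that removal step, with invented node names:

def strip_head(slurm_list, host_name, reuse_head):
    # Mirrors the guarded removal above; falls back to the short hostname.
    if not reuse_head:
        try:
            slurm_list.remove(host_name)
        except ValueError:
            slurm_list.remove(host_name.split('.')[0])
    return slurm_list

print(strip_head(['node01', 'node02', 'node03'], 'node01.cluster.local', False))
# ['node02', 'node03']
print(strip_head(['node01', 'node02', 'node03'], 'node01.cluster.local', True))
# ['node01', 'node02', 'node03']
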
10 changes: 7 additions & 3 deletions mosaic/cli/mrun.py
@@ -43,6 +43,8 @@
# cluster options
@click.option('--local/--cluster', '-l/-c', default=False, required=True, show_default=True,
help='whether to run mosaic locally or in a cluster system')
@click.option('--reuse-head/--free-head', '-rh/-fh', default=False, required=True, show_default=True,
help='whether to create workers in the head node')
# log level
@click.option('--perf', 'log_level', flag_value='perf', default='perf', show_default=True,
help='set log level to PERF')
@@ -60,6 +62,7 @@ def go(cmd=None, **kwargs):
runtime_type = kwargs.get('runtime_type', None)
runtime_indices = kwargs.get('indices', None)
local = kwargs.get('local', False)
reuse_head = kwargs.get('reuse_head', False)

if runtime_indices is not None:
runtime_indices = tuple(runtime_indices.split(':'))
@@ -82,9 +85,9 @@

host_name = get_hostname()

sge_nodes = clusters.sge.node_list(host_name)
pbs_nodes = clusters.pbs.node_list(host_name)
slurm_nodes = clusters.slurm.node_list(host_name)
sge_nodes = clusters.sge.node_list(host_name, reuse_head)
pbs_nodes = clusters.pbs.node_list(host_name, reuse_head)
slurm_nodes = clusters.slurm.node_list(host_name, reuse_head)

if sge_nodes is not None:
node_list = sge_nodes
@@ -111,6 +114,7 @@
'num_workers': num_workers,
'num_threads': num_threads,
'mode': 'local' if local is True else 'cluster',
'reuse_head': reuse_head,
'log_level': log_level,
'profile': profile,
'node_list': node_list,
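On the CLI side, --reuse-head/--free-head is a click boolean flag pair that collapses into a single reuse_head argument. A stand-alone sketch of the same pattern; the command body is illustrative, mrun itself wires the value into its runtime config as shown above:

import click

@click.command()
@click.option('--reuse-head/--free-head', '-rh/-fh', default=False, show_default=True,
              help='whether to create workers in the head node')
def demo(reuse_head):
    # click hands the pair over as one boolean: --reuse-head -> True, --free-head -> False
    click.echo('reuse_head=%s' % reuse_head)

if __name__ == '__main__':
    demo()
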
10 changes: 0 additions & 10 deletions mosaic/runtime/head.py
@@ -1,13 +1,11 @@

import psutil
import asyncio

import mosaic
from .runtime import Runtime, RuntimeProxy
from ..utils import LoggerManager
from ..utils import subprocess
from ..profile import profiler, global_profiler
from ..utils.utils import cpu_count


__all__ = ['Head']
@@ -36,14 +34,6 @@ async def init(self, **kwargs):
-------
"""
if self.mode == 'cluster':
num_cpus = cpu_count()

monitor_cpus = max(1, min(int(num_cpus // 8), 8))
warehouse_cpus = max(1, min(int(num_cpus // 8), 8))
available_cpus = list(range(num_cpus))
psutil.Process().cpu_affinity(available_cpus[:-(monitor_cpus+warehouse_cpus)])

await super().init(**kwargs)

# Start monitor if necessary and handshake in reverse
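The deleted block used psutil CPU affinity to keep the head process off the cores reserved for the monitor and warehouse; the commit removes it, presumably so worker placement on the head node is not constrained. For reference, a stand-alone sketch of the reservation pattern being dropped (os.cpu_count stands in for mosaic's cpu_count helper, and cpu_affinity is only available on platforms that support it):

import os
import psutil

num_cpus = os.cpu_count() or 1
monitor_cpus = max(1, min(num_cpus // 8, 8))
warehouse_cpus = max(1, min(num_cpus // 8, 8))
available_cpus = list(range(num_cpus))

# Pin the current process to everything except the tail block reserved
# for the monitor and warehouse processes.
psutil.Process().cpu_affinity(available_cpus[:-(monitor_cpus + warehouse_cpus)])
print(psutil.Process().cpu_affinity())
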
10 changes: 2 additions & 8 deletions mosaic/runtime/monitor.py
@@ -96,13 +96,6 @@ async def init(self, **kwargs):
-------
"""
if self.mode == 'cluster':
num_cpus = cpu_count()

monitor_cpus = max(1, min(int(num_cpus // 8), 8))
available_cpus = list(range(num_cpus))
psutil.Process().cpu_affinity(available_cpus[-monitor_cpus:])

await super().init(**kwargs)

# Start local cluster
@@ -195,6 +188,7 @@ async def init_cluster(self, **kwargs):
ssh_flags = os.environ.get('SSH_FLAGS', '')
ssh_commands = os.environ.get('SSH_COMMANDS', None)
ssh_commands = ssh_commands + ';' if ssh_commands else ''
reuse_head = '--reuse-head' if self.reuse_head else '--free-head'

in_slurm = os.environ.get('SLURM_NODELIST', None) is not None

@@ -207,7 +201,7 @@ async def init_cluster(self, **kwargs):
f'mrun --node -i {node_index} '
f'--monitor-address {runtime_address} --monitor-port {runtime_port} '
f'-n {num_nodes} -nw {num_workers} -nth {num_threads} '
f'--cluster --{log_level}')
f'--cluster --{log_level} {reuse_head}')

if in_slurm:
cpu_mask = _cpu_mask(1, 1, num_cpus)
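init_cluster forwards the choice to each remote node by appending --reuse-head or --free-head to the mrun command it launches over SSH. Roughly, the command string ends up looking like this; all values here are placeholders, only the flag handling mirrors the diff:

reuse_head = True
node_index, num_nodes, num_workers, num_threads = 0, 4, 2, 8
runtime_address, runtime_port = '10.0.0.1', 3000
log_level = 'perf'

reuse_head_flag = '--reuse-head' if reuse_head else '--free-head'
remote_cmd = (f'mrun --node -i {node_index} '
              f'--monitor-address {runtime_address} --monitor-port {runtime_port} '
              f'-n {num_nodes} -nw {num_workers} -nth {num_threads} '
              f'--cluster --{log_level} {reuse_head_flag}')
print(remote_cmd)
# mrun --node -i 0 --monitor-address 10.0.0.1 --monitor-port 3000 -n 4 -nw 2 -nth 8 --cluster --perf --reuse-head
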
111 changes: 54 additions & 57 deletions mosaic/runtime/node.py
@@ -48,13 +48,6 @@ async def init(self, **kwargs):
-------
"""
if self.mode == 'cluster':
num_cpus = cpu_count()

# Node process is pinned to first CPU
if num_cpus > 1:
psutil.Process().cpu_affinity([num_cpus-1])

await super().init(**kwargs)

# Start local workers
@@ -82,57 +75,60 @@ async def init_workers(self, **kwargs):
num_threads = num_threads or num_cpus // num_workers
self._num_threads = num_threads

if num_workers*num_threads > num_cpus:
raise ValueError('Requested number of CPUs per node (%d - num_workers*num_threads) '
'is greater than the number of available CPUs (%d)' % (num_workers*num_threads, num_cpus))

# Find all available NUMA nodes and CPUs per node
try:
import numa
numa_available = numa.info.numa_available()
except Exception:
numa_available = False

if numa_available:
available_cpus = numa.info.numa_hardware_info()['node_cpu_info']
else:
available_cpus = {worker_index: list(range(num_threads*worker_index,
num_threads*(worker_index+1)))
for worker_index in range(self._num_workers)}

# Eliminate cores corresponding to hyperthreading
for node_index, node_cpus in available_cpus.items():
node_cpus = [each for each in node_cpus if each < num_cpus]
available_cpus[node_index] = node_cpus

node_ids = list(available_cpus.keys())
num_nodes = len(available_cpus)
num_cpus_per_node = min([len(cpus) for cpus in available_cpus.values()])

# Distribute cores across workers
worker_cpus = {}
if self.mode == 'cluster':
if num_workers*num_threads > num_cpus:
raise ValueError('Requested number of CPUs per node (%d - num_workers*num_threads) '
'is greater than the number of available CPUs (%d)' % (num_workers*num_threads, num_cpus))

# Find all available NUMA nodes and CPUs per node
try:
import numa
numa_available = numa.info.numa_available()
except Exception:
numa_available = False

if numa_available:
available_cpus = numa.info.numa_hardware_info()['node_cpu_info']
else:
available_cpus = {worker_index: list(range(num_threads*worker_index,
num_threads*(worker_index+1)))
for worker_index in range(self._num_workers)}

# Eliminate cores corresponding to hyperthreading
for node_index, node_cpus in available_cpus.items():
node_cpus = [each for each in node_cpus if each < num_cpus]
available_cpus[node_index] = node_cpus

node_ids = list(available_cpus.keys())
num_nodes = len(available_cpus)
num_cpus_per_node = min([len(cpus) for cpus in available_cpus.values()])

# Distribute cores across workers
if num_nodes >= self._num_workers:
nodes_per_worker = num_nodes // self._num_workers
for worker_index in range(self._num_workers):
node_s = worker_index*nodes_per_worker
node_e = min((worker_index+1)*nodes_per_worker, num_nodes)
worker_cpus[worker_index] = sum([available_cpus[node_index]
for node_index in node_ids[node_s:node_e]], [])

else:
workers_per_node = self._num_workers // num_nodes
cpus_per_worker = num_cpus_per_node // workers_per_node
for node_index, node_cpus in available_cpus.items():
worker_s = node_index*workers_per_node
worker_e = min((node_index+1)*workers_per_node, self._num_workers)
worker_chunk = {}
for worker_index in range(worker_s, worker_e):
cpu_s = worker_index*cpus_per_worker
cpu_e = min((worker_index+1)*cpus_per_worker, len(node_cpus))
worker_chunk[worker_index] = node_cpus[cpu_s:cpu_e]
worker_cpus.update(worker_chunk)
worker_nodes = {}
if num_nodes >= self._num_workers:
nodes_per_worker = num_nodes // self._num_workers
for worker_index in range(self._num_workers):
node_s = worker_index*nodes_per_worker
node_e = min((worker_index+1)*nodes_per_worker, num_nodes)
worker_cpus[worker_index] = sum([available_cpus[node_index]
for node_index in node_ids[node_s:node_e]], [])
worker_nodes[worker_index] = node_ids[node_s:node_e]

else:
workers_per_node = self._num_workers // num_nodes
cpus_per_worker = num_cpus_per_node // workers_per_node
for node_index, node_cpus in available_cpus.items():
worker_s = node_index*workers_per_node
worker_e = min((node_index+1)*workers_per_node, self._num_workers)
worker_chunk = {}
for worker_index in range(worker_s, worker_e):
cpu_s = worker_index*cpus_per_worker
cpu_e = min((worker_index+1)*cpus_per_worker, len(node_cpus))
worker_chunk[worker_index] = node_cpus[cpu_s:cpu_e]
worker_nodes[worker_index] = [node_index]
worker_cpus.update(worker_chunk)

# Initialise workers
for worker_index in range(self._num_workers):
indices = self.indices + (worker_index,)

@@ -147,7 +143,8 @@ def start_worker(*args, **extra_kwargs):
worker_proxy = RuntimeProxy(name='worker', indices=indices)
worker_subprocess = subprocess(start_worker)(name=worker_proxy.uid,
daemon=False,
cpu_affinity=worker_cpus.get(worker_index, None))
cpu_affinity=worker_cpus.get(worker_index, None),
mem_affinity=worker_nodes.get(worker_index, None))
worker_subprocess.start_process()
worker_proxy.subprocess = worker_subprocess

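The placement logic above now only runs in cluster mode, and it additionally records which NUMA node(s) back each worker so the result can be passed as mem_affinity alongside cpu_affinity. A condensed, self-contained sketch of the two distribution branches; the NUMA layouts in the demo calls are invented:

def distribute_workers(available_cpus, num_workers):
    # available_cpus maps NUMA node index -> list of CPU ids on that node.
    node_ids = list(available_cpus.keys())
    num_nodes = len(available_cpus)
    num_cpus_per_node = min(len(cpus) for cpus in available_cpus.values())

    worker_cpus = {}
    worker_nodes = {}
    if num_nodes >= num_workers:
        # Enough NUMA nodes: give each worker one or more whole nodes.
        nodes_per_worker = num_nodes // num_workers
        for worker_index in range(num_workers):
            node_s = worker_index*nodes_per_worker
            node_e = min((worker_index+1)*nodes_per_worker, num_nodes)
            worker_cpus[worker_index] = sum([available_cpus[node_index]
                                             for node_index in node_ids[node_s:node_e]], [])
            worker_nodes[worker_index] = node_ids[node_s:node_e]
    else:
        # More workers than NUMA nodes: split each node between its workers.
        workers_per_node = num_workers // num_nodes
        cpus_per_worker = num_cpus_per_node // workers_per_node
        for node_index, node_cpus in available_cpus.items():
            worker_s = node_index*workers_per_node
            worker_e = min((node_index+1)*workers_per_node, num_workers)
            for worker_index in range(worker_s, worker_e):
                cpu_s = worker_index*cpus_per_worker
                cpu_e = min((worker_index+1)*cpus_per_worker, len(node_cpus))
                worker_cpus[worker_index] = node_cpus[cpu_s:cpu_e]
                worker_nodes[worker_index] = [node_index]
    return worker_cpus, worker_nodes

# Two NUMA nodes, two workers: each worker gets a whole node.
print(distribute_workers({0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}, 2))
# ({0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}, {0: [0], 1: [1]})

# One NUMA node, four workers: the node's CPUs are split between them.
print(distribute_workers({0: [0, 1, 2, 3, 4, 5, 6, 7]}, 4))
# ({0: [0, 1], 1: [2, 3], 2: [4, 5], 3: [6, 7]}, {0: [0], 1: [0], 2: [0], 3: [0]})
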
1 change: 1 addition & 0 deletions mosaic/runtime/runtime.py
@@ -144,6 +144,7 @@ def __init__(self, **kwargs):
super().__init__(name=self.__class__.__name__.lower(), indices=runtime_indices)

self.mode = kwargs.get('mode', 'local')
self.reuse_head = kwargs.get('reuse_head', False)

self._comms = None
self._head = None
9 changes: 0 additions & 9 deletions mosaic/runtime/warehouse.py
@@ -1,13 +1,11 @@

import copy
import psutil
import asyncio

from .runtime import Runtime
from .utils import WarehouseObject
from ..utils import LoggerManager
from ..profile import global_profiler
from ..utils.utils import cpu_count


__all__ = ['Warehouse']
@@ -33,13 +31,6 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)

async def init(self, **kwargs):
if self.mode == 'cluster':
num_cpus = cpu_count()

warehouse_cpus = max(1, min(int(num_cpus // 8), 8))
available_cpus = list(range(num_cpus))
psutil.Process().cpu_affinity(available_cpus[-2*warehouse_cpus:-warehouse_cpus])

await super().init(**kwargs)

def set_logger(self):