Skip to content

Commit

Permalink
Merge pull request #72 from trustimaging/libnuma
Browse files Browse the repository at this point in the history
Try to prevent OOB in numa
  • Loading branch information
ccuetom authored Dec 18, 2023
2 parents 853e844 + ef6d76f commit 26b9205
Show file tree
Hide file tree
Showing 32 changed files with 402 additions and 144 deletions.
2 changes: 2 additions & 0 deletions mosaic/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,3 +1174,5 @@ class MemoryOverflowError(Exception):


types.awaitable_types += (TaskProxy, TaskOutput, TaskDone)
types.remote_types += (Task,)
types.proxy_types += (TaskProxy,)
5 changes: 5 additions & 0 deletions mosaic/core/tessera.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cached_property import cached_property

import mosaic
from .. import types
from .task import TaskProxy
from .base import Base, CMDBase, RemoteBase, ProxyBase, RuntimeDisconnectedError
from ..types import WarehouseObject
Expand Down Expand Up @@ -1347,3 +1348,7 @@ def local_parameter(_, *args, uid=None, **kwargs):
'or with configuration options within brackets.')

return tessera_wrapper


types.remote_types += (Tessera,)
types.proxy_types += (TesseraProxy, ArrayProxy)
69 changes: 38 additions & 31 deletions mosaic/runtime/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,45 +91,52 @@ async def init_workers(self, **kwargs):

if numa_available:
available_cpus = numa.info.numa_hardware_info()['node_cpu_info']
allowed_cpus = numa.schedule.get_allowed_cpus_num()

else:
available_cpus = {worker_index: list(range(num_threads*worker_index,
num_threads*(worker_index+1)))
for worker_index in range(self._num_workers)}
allowed_cpus = sum([len(c) for c in available_cpus.values()])

# Eliminate cores corresponding to hyperthreading
for node_index, node_cpus in available_cpus.items():
node_cpus = [each for each in node_cpus if each < num_cpus]
available_cpus[node_index] = node_cpus

node_ids = list(available_cpus.keys())
num_nodes = len(available_cpus)
num_cpus_per_node = min([len(cpus) for cpus in available_cpus.values()])

# Distribute cores across workers
total_cpus = sum([len(c) for c in available_cpus.values()])
worker_cpus = {}
worker_nodes = {}
if num_nodes >= self._num_workers:
nodes_per_worker = num_nodes // self._num_workers
for worker_index in range(self._num_workers):
node_s = worker_index*nodes_per_worker
node_e = min((worker_index+1)*nodes_per_worker, num_nodes)
worker_cpus[worker_index] = sum([available_cpus[node_index]
for node_index in node_ids[node_s:node_e]], [])
worker_nodes[worker_index] = node_ids[node_s:node_e]

else:
workers_per_node = self._num_workers // num_nodes
cpus_per_worker = num_cpus_per_node // workers_per_node
if total_cpus <= allowed_cpus:
# Eliminate cores corresponding to hyperthreading
for node_index, node_cpus in available_cpus.items():
worker_s = node_index*workers_per_node
worker_e = min((node_index+1)*workers_per_node, self._num_workers)
worker_chunk = {}
for worker_index in range(worker_s, worker_e):
cpu_s = worker_index*cpus_per_worker
cpu_e = min((worker_index+1)*cpus_per_worker, len(node_cpus))
worker_chunk[worker_index] = node_cpus[cpu_s:cpu_e]
worker_nodes[worker_index] = [node_index]
worker_cpus.update(worker_chunk)
node_cpus = [each for each in node_cpus if each < num_cpus]
available_cpus[node_index] = node_cpus

node_ids = list(available_cpus.keys())
num_nodes = len(available_cpus)
num_cpus_per_node = min([len(cpus) for cpus in available_cpus.values()])

# Distribute cores across workers
worker_cpus = {}
worker_nodes = {}
if num_nodes >= self._num_workers:
nodes_per_worker = num_nodes // self._num_workers
for worker_index in range(self._num_workers):
node_s = worker_index*nodes_per_worker
node_e = min((worker_index+1)*nodes_per_worker, num_nodes)
worker_cpus[worker_index] = sum([available_cpus[node_index]
for node_index in node_ids[node_s:node_e]], [])
worker_nodes[worker_index] = node_ids[node_s:node_e]

else:
workers_per_node = self._num_workers // num_nodes
cpus_per_worker = num_cpus_per_node // workers_per_node
for node_index, node_cpus in available_cpus.items():
worker_s = node_index*workers_per_node
worker_e = min((node_index+1)*workers_per_node, self._num_workers)
worker_chunk = {}
for worker_index in range(worker_s, worker_e):
cpu_s = worker_index*cpus_per_worker
cpu_e = min((worker_index+1)*cpus_per_worker, len(node_cpus))
worker_chunk[worker_index] = node_cpus[cpu_s:cpu_e]
worker_nodes[worker_index] = [node_index]
worker_cpus.update(worker_chunk)

# Initialise workers
for worker_index in range(self._num_workers):
Expand Down
6 changes: 3 additions & 3 deletions mosaic/runtime/warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ def set_profiler(self):
global_profiler.set_remote('monitor')
super().set_profiler()

async def put(self, obj, uid=None, publish=False):
async def put(self, obj, uid=None, publish=False, **kwargs):
return await self.put_remote(self.uid, obj, uid=uid)

async def get(self, uid):
async def get(self, uid, **kwargs):
return await self.get_remote(self.uid, uid=uid)

async def drop(self, uid):
async def drop(self, uid, **kwargs):
return await self.drop_remote(self.uid, uid=uid)

async def put_remote(self, sender_id, obj, uid=None, publish=False):
Expand Down
2 changes: 2 additions & 0 deletions mosaic/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
from .warehouse_object import *

awaitable_types = (WarehouseObject,)
remote_types = ()
proxy_types = ()
4 changes: 2 additions & 2 deletions mosaic/utils/change_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def camel_case(name):
String in camelcase
"""
name = re.sub(r"^[\-_\.]", '', str(name))
name = re.sub(r"^[\_\.]", '', str(name))
if not name:
return name
return uppercase(name[0]) + re.sub(r"[\-_\.\s]([a-zA-Z0-9])", lambda matched: uppercase(matched.group(1)), name[1:])
return uppercase(name[0]) + re.sub(r"[\_\.\s]([a-zA-Z0-9])", lambda matched: uppercase(matched.group(1)), name[1:])


def lowercase(name):
Expand Down
28 changes: 19 additions & 9 deletions mosaic/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import asyncio
import traceback
import threading
import numpy as np

import mosaic

Expand All @@ -28,11 +27,15 @@ def sizeof(obj, seen=None):
Size in bytes.
"""
if isinstance(obj, asyncio.Future):
ignore = (asyncio.Future,) + mosaic.types.remote_types
if isinstance(obj, ignore):
return 0
if isinstance(obj, np.ndarray):
size = obj.nbytes
else:
try:
if hasattr(obj, 'nbytes') and isinstance(obj.nbytes, int):
size = obj.nbytes
else:
size = sys.getsizeof(obj)
except Exception:
size = sys.getsizeof(obj)
if seen is None:
seen = set()
Expand All @@ -49,8 +52,12 @@ def sizeof(obj, seen=None):
size += sum([sizeof(k, seen) for k in obj.keys()])
elif hasattr(obj, '__dict__'):
size += sizeof(obj.__dict__, seen)

elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
size += sum([sizeof(i, seen) for i in obj])
try:
size += sum([sizeof(i, seen) for i in obj])
except TypeError:
pass
except RuntimeError:
pass

Expand Down Expand Up @@ -83,9 +90,12 @@ async def remote_sizeof(obj, seen=None, pending=False):
if pending:
size = 0
else:
if isinstance(obj, np.ndarray):
size = obj.nbytes
else:
try:
if hasattr(obj, 'nbytes') and isinstance(obj.nbytes, int):
size = obj.nbytes
else:
size = sys.getsizeof(obj)
except Exception:
size = sys.getsizeof(obj)
if seen is None:
seen = set()
Expand Down
20 changes: 14 additions & 6 deletions stride/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,14 @@ async def loop(worker, shot_id):
else:
raise ValueError('Unknown platform %s' % platform)

# run PDE
traces = await pde(wavelets, *published_args,
problem=sub_problem,
runtime=worker, **_kwargs).result()

logger.perf('Shot %d retrieved' % sub_problem.shot_id)

# save data
shot = problem.acquisitions.get(shot_id)
shot.observed.data[:] = traces.data

Expand Down Expand Up @@ -298,26 +300,31 @@ async def loop(worker, shot_id):

# pre-process wavelets and observed traces
wavelets = process_wavelets(wavelets,
problem=sub_problem, runtime=worker, **_kwargs)
iteration=iteration, problem=sub_problem,
runtime=worker, **_kwargs)
await wavelets.init_future
observed = process_observed(observed,
problem=sub_problem, runtime=worker, **_kwargs)
iteration=iteration, problem=sub_problem,
runtime=worker, **_kwargs)
await observed.init_future

# run PDE
modelled = pde(wavelets, *published_args,
problem=sub_problem, runtime=worker, **_kwargs)
iteration=iteration, problem=sub_problem,
runtime=worker, **_kwargs)
await modelled.init_future

# post-process modelled and observed traces
traces = process_traces(modelled, observed,
scale_to=sub_problem.shot.observed,
problem=sub_problem, runtime=worker, **_kwargs)
iteration=iteration, problem=sub_problem,
runtime=worker, **_kwargs)
await traces.init_future

# calculate loss
fun = await loss(traces.outputs[0], traces.outputs[1],
problem=sub_problem, runtime=worker, **_kwargs).result()
iteration=iteration, problem=sub_problem,
runtime=worker, **_kwargs).result()

iteration.add_fun(fun)
logger.perf('Functional value for shot %d: %s' % (shot_id, fun))
Expand All @@ -332,7 +339,8 @@ async def loop(worker, shot_id):

await loop

await optimiser.step()
await optimiser.step(iteration=iteration, problem=problem,
**kwargs)

if dump:
optimiser.variable.dump(path=problem.output_folder,
Expand Down
4 changes: 2 additions & 2 deletions stride/optimisation/loss/l2_distance.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ async def forward(self, modelled, observed, **kwargs):
async def adjoint(self, d_fun, modelled, observed, **kwargs):
grad_modelled = None
if modelled.needs_grad:
grad_modelled = +np.asarray(d_fun) * self.residual
grad_modelled = +np.asarray(d_fun) * self.residual.copy(name='modelledresidual')

grad_observed = None
if observed.needs_grad:
grad_observed = -np.asarray(d_fun) * self.residual
grad_observed = -np.asarray(d_fun) * self.residual.copy(name='observedresidual')

self.residual = None

Expand Down
26 changes: 20 additions & 6 deletions stride/optimisation/optimisers/optimiser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@

import mosaic


__all__ = ['LocalOptimiser']
from ..pipelines import ProcessGlobalGradient, ProcessModelIteration


async def noop(arg, *args, **kwargs):
return arg
__all__ = ['LocalOptimiser']


class LocalOptimiser(ABC):
Expand All @@ -35,8 +33,10 @@ def __init__(self, variable, **kwargs):
raise ValueError('To be optimised, a variable needs to be set with "needs_grad=True"')

self.variable = variable
self._process_grad = kwargs.pop('process_grad', noop)
self._process_model = kwargs.pop('process_model', noop)
self.dump_grad = kwargs.pop('dump_grad', False)
self.dump_prec = kwargs.pop('dump_prec', False)
self._process_grad = kwargs.pop('process_grad', ProcessGlobalGradient(**kwargs))
self._process_model = kwargs.pop('process_model', ProcessModelIteration(**kwargs))

def clear_grad(self):
"""
Expand Down Expand Up @@ -93,6 +93,20 @@ async def pre_process(self, grad=None, processed_grad=None, **kwargs):
if hasattr(self.variable, 'is_proxy') and self.variable.is_proxy:
await self.variable.pull(attr='grad')

problem = kwargs.pop('problem', None)
iteration = kwargs.pop('iteration', None)
dump_grad = kwargs.pop('dump_grad', self.dump_grad)
dump_prec = kwargs.pop('dump_prec', self.dump_prec)
if dump_grad and problem is not None:
self.variable.grad.dump(path=problem.output_folder,
project_name=problem.name,
version=iteration.abs_id+1)

if dump_prec and self.variable.grad.prec is not None and problem is not None:
self.variable.grad.prec.dump(path=problem.output_folder,
project_name=problem.name,
version=iteration.abs_id+1)

grad = self.variable.process_grad(**kwargs)

min_dir = np.min(grad.data)
Expand Down
7 changes: 7 additions & 0 deletions stride/optimisation/pipelines/default_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def __init__(self, steps=None, no_grad=False, **kwargs):
if kwargs.pop('filter_traces', True):
steps.append('filter_traces')

if kwargs.pop('resonance_filter', False):
steps.append(('resonance_filter', False))

super().__init__(steps, no_grad=no_grad, **kwargs)


Expand All @@ -46,6 +49,7 @@ class ProcessObserved(ProcessWavelets):

def __init__(self, steps=None, no_grad=False, **kwargs):
steps = steps or []
kwargs['resonance_filter'] = False
super().__init__(steps, no_grad=no_grad, **kwargs)


Expand Down Expand Up @@ -77,6 +81,9 @@ def __init__(self, steps=None, no_grad=False, **kwargs):
if kwargs.pop('filter_traces', True):
steps.append('filter_traces')

if kwargs.pop('time_weighting', True):
steps.append(('time_weighting', False))

if kwargs.pop('agc', False):
steps.append(('agc', False))

Expand Down
Loading

0 comments on commit 26b9205

Please sign in to comment.