diff --git a/devito/core/__init__.py b/devito/core/__init__.py index c8097df67b..342438d644 100644 --- a/devito/core/__init__.py +++ b/devito/core/__init__.py @@ -4,6 +4,8 @@ backend as well) are used to run Devito on standard CPU architectures. """ +from devito.dle import (BasicRewriter, AdvancedRewriter, AdvancedRewriterSafeMath, + SpeculativeRewriter, init_dle) from devito.parameters import Parameters, add_sub_configuration core_configuration = Parameters('core') @@ -15,6 +17,13 @@ add_sub_configuration(core_configuration, env_vars_mapper) +# Initialize the DLE +modes = {'basic': BasicRewriter, + 'advanced': AdvancedRewriter, + 'advanced-safemath': AdvancedRewriterSafeMath, + 'speculative': SpeculativeRewriter} +init_dle(modes) + # The following used by backends.backendSelector from devito.function import (Constant, Function, TimeFunction, SparseFunction, # noqa SparseTimeFunction) diff --git a/devito/dle/__init__.py b/devito/dle/__init__.py index bf798e9329..2018d73a2c 100644 --- a/devito/dle/__init__.py +++ b/devito/dle/__init__.py @@ -1,2 +1,3 @@ from devito.dle.blocking_utils import * # noqa from devito.dle.transformer import * # noqa +from devito.dle.backends import * # noqa diff --git a/devito/dle/backends/__init__.py b/devito/dle/backends/__init__.py index 6e6bf9fcbd..3c93c16b5f 100644 --- a/devito/dle/backends/__init__.py +++ b/devito/dle/backends/__init__.py @@ -1,5 +1,6 @@ from devito.dle.backends.common import * # noqa from devito.dle.backends.utils import * # noqa from devito.dle.backends.basic import BasicRewriter # noqa -from devito.dle.backends.advanced import (DevitoRewriter, DevitoSpeculativeRewriter, # noqa - DevitoRewriterSafeMath, DevitoCustomRewriter) # noqa +from devito.dle.backends.parallelizer import Ompizer # noqa +from devito.dle.backends.advanced import (AdvancedRewriter, SpeculativeRewriter, # noqa + AdvancedRewriterSafeMath, CustomRewriter) # noqa diff --git a/devito/dle/backends/advanced.py b/devito/dle/backends/advanced.py index 87a5cef973..344cfcbe3b 100644 --- a/devito/dle/backends/advanced.py +++ b/devito/dle/backends/advanced.py @@ -7,30 +7,30 @@ import cgen import numpy as np -import psutil from devito.cgen_utils import ccode from devito.dimension import Dimension from devito.dle import fold_blockable_tree, unfold_blocked_tree -from devito.dle.backends import (BasicRewriter, BlockingArg, dle_pass, omplang, +from devito.dle.backends import (BasicRewriter, BlockingArg, Ompizer, dle_pass, simdinfo, get_simd_flag, get_simd_items) from devito.exceptions import DLEException -from devito.ir.iet import (Block, Expression, Iteration, List, - PARALLEL, ELEMENTAL, REMAINDER, tagger, - FindNodes, FindSymbols, IsPerfectIteration, Transformer, - compose_nodes, retrieve_iteration_tree, filter_iterations) +from devito.ir.iet import (Iteration, List, PARALLEL, ELEMENTAL, REMAINDER, tagger, + FindSymbols, IsPerfectIteration, Transformer, compose_nodes, + retrieve_iteration_tree) from devito.logger import dle_warning from devito.tools import as_tuple -class DevitoRewriter(BasicRewriter): +class AdvancedRewriter(BasicRewriter): + + _parallelizer = Ompizer def _pipeline(self, state): self._avoid_denormals(state) self._loop_blocking(state) self._simdize(state) if self.params['openmp'] is True: - self._ompize(state) + self._parallelize(state) self._create_elemental_functions(state) self._minimize_remainders(state) @@ -196,11 +196,11 @@ def _simdize(self, nodes, state): except KeyError: aligned = [] if aligned: - simd = omplang['simd-for-aligned'] + simd = Ompizer.lang['simd-for-aligned'] simd = as_tuple(simd(','.join([j.name for j in aligned]), simdinfo[get_simd_flag()])) else: - simd = as_tuple(omplang['simd-for']) + simd = as_tuple(Ompizer.lang['simd-for']) mapper[i] = i._rebuild(pragmas=i.pragmas + ignore_deps + simd) processed = Transformer(mapper).visit(nodes) @@ -208,70 +208,13 @@ def _simdize(self, nodes, state): return processed, {} @dle_pass - def _ompize(self, nodes, state): + def _parallelize(self, iet, state): """ Add OpenMP pragmas to the Iteration/Expression tree to emit parallel code """ - # Group by outer loop so that we can embed within the same parallel region - was_tagged = False - groups = OrderedDict() - for tree in retrieve_iteration_tree(nodes): - # Determine the number of consecutive parallelizable Iterations - key = lambda i: i.is_ParallelRelaxed and\ - not (i.is_Elementizable or i.is_Vectorizable) - candidates = filter_iterations(tree, key=key, stop='asap') - if not candidates: - was_tagged = False - continue - # Consecutive tagged Iteration go in the same group - is_tagged = any(i.tag is not None for i in tree) - key = len(groups) - (is_tagged & was_tagged) - handle = groups.setdefault(key, OrderedDict()) - handle[candidates[0]] = candidates - was_tagged = is_tagged - - # Handle parallelizable loops - mapper = OrderedDict() - for group in groups.values(): - private = [] - for root, tree in group.items(): - # Heuristic: if at least two parallel loops are available and the - # physical core count is greater than self.thresholds['collapse'], - # then omp-collapse the loops - nparallel = len(tree) - if psutil.cpu_count(logical=False) < self.thresholds['collapse'] or\ - nparallel < 2: - parallel = omplang['for'] - else: - parallel = omplang['collapse'](nparallel) - - # Introduce the `omp parallel` pragma - if root.is_ParallelAtomic: - # Introduce the `omp atomic` pragmas - exprs = FindNodes(Expression).visit(root) - subs = {i: List(header=omplang['atomic'], body=i) - for i in exprs if i.is_increment} - handle = Transformer(subs).visit(root) - mapper[root] = handle._rebuild(pragmas=root.pragmas + (parallel,)) - else: - mapper[root] = root._rebuild(pragmas=root.pragmas + (parallel,)) - - # Track the thread-private and thread-shared variables - private.extend([i for i in FindSymbols('symbolics').visit(root) - if i.is_Array and i._mem_stack]) - - # Build the parallel region - private = sorted(set([i.name for i in private])) - private = ('private(%s)' % ','.join(private)) if private else '' - rebuilt = [v for k, v in mapper.items() if k in group] - par_region = Block(header=omplang['par-region'](private), body=rebuilt) - for k, v in list(mapper.items()): - if isinstance(v, Iteration): - mapper[k] = None if v.is_Remainder else par_region - - processed = Transformer(mapper).visit(nodes) - - return processed, {} + def key(i): + return i.is_ParallelRelaxed and not (i.is_Elementizable or i.is_Vectorizable) + return self._parallelizer(key).make_omp_parallel_iet(iet), {} @dle_pass def _minimize_remainders(self, nodes, state): @@ -343,10 +286,10 @@ def _minimize_remainders(self, nodes, state): return processed, {} -class DevitoRewriterSafeMath(DevitoRewriter): +class AdvancedRewriterSafeMath(AdvancedRewriter): """ - This Rewriter is slightly less aggressive than :class:`DevitoRewriter`, as it + This Rewriter is slightly less aggressive than :class:`AdvancedRewriter`, as it doesn't drop denormal numbers, which may sometimes harm the numerical precision. """ @@ -354,12 +297,12 @@ def _pipeline(self, state): self._loop_blocking(state) self._simdize(state) if self.params['openmp'] is True: - self._ompize(state) + self._parallelize(state) self._create_elemental_functions(state) self._minimize_remainders(state) -class DevitoSpeculativeRewriter(DevitoRewriter): +class SpeculativeRewriter(AdvancedRewriter): def _pipeline(self, state): self._avoid_denormals(state) @@ -367,7 +310,7 @@ def _pipeline(self, state): self._simdize(state) self._nontemporal_stores(state) if self.params['openmp'] is True: - self._ompize(state) + self._parallelize(state) self._create_elemental_functions(state) self._minimize_remainders(state) @@ -400,14 +343,14 @@ def _nontemporal_stores(self, nodes, state): return processed, {'flags': 'ntstores'} -class DevitoCustomRewriter(DevitoSpeculativeRewriter): +class CustomRewriter(SpeculativeRewriter): passes_mapper = { - 'denormals': DevitoSpeculativeRewriter._avoid_denormals, - 'blocking': DevitoSpeculativeRewriter._loop_blocking, - 'openmp': DevitoSpeculativeRewriter._ompize, - 'simd': DevitoSpeculativeRewriter._simdize, - 'split': DevitoSpeculativeRewriter._create_elemental_functions + 'denormals': SpeculativeRewriter._avoid_denormals, + 'blocking': SpeculativeRewriter._loop_blocking, + 'openmp': SpeculativeRewriter._parallelize, + 'simd': SpeculativeRewriter._simdize, + 'split': SpeculativeRewriter._create_elemental_functions } def __init__(self, nodes, passes, params): @@ -415,11 +358,11 @@ def __init__(self, nodes, passes, params): passes = passes.split(',') except AttributeError: # Already in tuple format - if not all(i in DevitoCustomRewriter.passes_mapper for i in passes): + if not all(i in CustomRewriter.passes_mapper for i in passes): raise DLEException self.passes = passes - super(DevitoCustomRewriter, self).__init__(nodes, params) + super(CustomRewriter, self).__init__(nodes, params) def _pipeline(self, state): for i in self.passes: - DevitoCustomRewriter.passes_mapper[i](self, state) + CustomRewriter.passes_mapper[i](self, state) diff --git a/devito/dle/backends/common.py b/devito/dle/backends/common.py index 3b5f6dd914..6360b48da9 100644 --- a/devito/dle/backends/common.py +++ b/devito/dle/backends/common.py @@ -92,13 +92,6 @@ class AbstractRewriter(object): __metaclass__ = abc.ABCMeta - """ - Bag of thresholds, to be used to trigger or prevent certain transformations. - """ - thresholds = { - 'collapse': 32, # Available physical cores - } - def __init__(self, nodes, params): self.nodes = nodes self.params = params diff --git a/devito/dle/backends/parallelizer.py b/devito/dle/backends/parallelizer.py new file mode 100644 index 0000000000..c5103f0164 --- /dev/null +++ b/devito/dle/backends/parallelizer.py @@ -0,0 +1,104 @@ +from collections import OrderedDict + +import cgen as c +import psutil + +from devito.ir.iet import (FindSymbols, FindNodes, Transformer, Block, Expression, + List, Iteration, retrieve_iteration_tree, filter_iterations) + + +class Ompizer(object): + + COLLAPSE = 32 + """Use a collapse clause if the number of available physical cores is + greater than this threshold.""" + + lang = { + 'for': c.Pragma('omp for schedule(static)'), + 'collapse': lambda i: c.Pragma('omp for collapse(%d) schedule(static)' % i), + 'par-region': lambda i: c.Pragma('omp parallel %s' % i), + 'par-for': c.Pragma('omp parallel for schedule(static)'), + 'simd-for': c.Pragma('omp simd'), + 'simd-for-aligned': lambda i, j: c.Pragma('omp simd aligned(%s:%d)' % (i, j)), + 'atomic': c.Pragma('omp atomic update') + } + """ + Shortcuts for the OpenMP language. + """ + + def __init__(self, key): + """ + :param key: A function returning True if ``v`` can be parallelized, + False otherwise. + """ + self.key = key + + def _make_omp_parallel_tree(self, root, candidates): + """ + Return a mapper to parallelize the :class:`Iteration`s within /root/. + """ + mapper = OrderedDict() + + # Heuristic: if at least two parallel loops are available and the + # physical core count is greater than COLLAPSE, then omp-collapse them + nparallel = len(candidates) + if psutil.cpu_count(logical=False) < Ompizer.COLLAPSE or\ + nparallel < 2: + parallel = self.lang['for'] + else: + parallel = self.lang['collapse'](nparallel) + + # Introduce the `omp parallel` pragma + if root.is_ParallelAtomic: + # Introduce the `omp atomic` pragmas + exprs = FindNodes(Expression).visit(root) + subs = {i: List(header=self.lang['atomic'], body=i) + for i in exprs if i.is_increment} + handle = Transformer(subs).visit(root) + mapper[root] = handle._rebuild(pragmas=root.pragmas + (parallel,)) + else: + mapper[root] = root._rebuild(pragmas=root.pragmas + (parallel,)) + + return mapper + + def make_omp_parallel_iet(self, iet): + """ + Transform ``iet`` by decorating its parallel :class:`Iteration`s with + suitable ``#pragma omp ...`` triggering thread-level parallelism. + """ + # Group sequences of loops that should go within the same parallel region + was_tagged = False + groups = OrderedDict() + for tree in retrieve_iteration_tree(iet): + # Determine the number of consecutive parallelizable Iterations + candidates = filter_iterations(tree, key=self.key, stop='asap') + if not candidates: + was_tagged = False + continue + # Consecutive tagged Iteration go in the same group + is_tagged = any(i.tag is not None for i in tree) + key = len(groups) - (is_tagged & was_tagged) + handle = groups.setdefault(key, OrderedDict()) + handle[candidates[0]] = candidates + was_tagged = is_tagged + + mapper = OrderedDict() + for group in groups.values(): + private = [] + for root, candidates in group.items(): + mapper.update(self._make_omp_parallel_tree(root, candidates)) + + # Track the thread-private and thread-shared variables + private.extend([i for i in FindSymbols('symbolics').visit(root) + if i.is_Array and i._mem_stack]) + + # Build the parallel region + private = sorted(set([i.name for i in private])) + private = ('private(%s)' % ','.join(private)) if private else '' + rebuilt = [v for k, v in mapper.items() if k in group] + par_region = Block(header=self.lang['par-region'](private), body=rebuilt) + for k, v in list(mapper.items()): + if isinstance(v, Iteration): + mapper[k] = None if v.is_Remainder else par_region + + return Transformer(mapper).visit(iet) diff --git a/devito/dle/backends/utils.py b/devito/dle/backends/utils.py index 16a13cec3e..a1f34f4312 100644 --- a/devito/dle/backends/utils.py +++ b/devito/dle/backends/utils.py @@ -3,19 +3,6 @@ import cgen as c -""" -A dictionary to quickly access standard OpenMP pragmas -""" -omplang = { - 'for': c.Pragma('omp for schedule(static)'), - 'collapse': lambda i: c.Pragma('omp for collapse(%d) schedule(static)' % i), - 'par-region': lambda i: c.Pragma('omp parallel %s' % i), - 'par-for': c.Pragma('omp parallel for schedule(static)'), - 'simd-for': c.Pragma('omp simd'), - 'simd-for-aligned': lambda i, j: c.Pragma('omp simd aligned(%s:%d)' % (i, j)), - 'atomic': c.Pragma('omp atomic update') -} - """ Compiler-specific language """ diff --git a/devito/dle/transformer.py b/devito/dle/transformer.py index a3d4cbc0a4..4d6b2e3876 100644 --- a/devito/dle/transformer.py +++ b/devito/dle/transformer.py @@ -1,35 +1,41 @@ from devito.ir.iet import Node -from devito.dle.backends import (State, BasicRewriter, DevitoCustomRewriter, - DevitoRewriter, DevitoRewriterSafeMath, - DevitoSpeculativeRewriter) +from devito.dle.backends import State, CustomRewriter from devito.exceptions import DLEException from devito.logger import dle_warning from devito.parameters import configuration -__all__ = ['transform', 'modes', 'default_options'] +__all__ = ['init_dle', 'transform'] -modes = { - 'basic': BasicRewriter, - 'advanced': DevitoRewriter, - 'advanced-safemath': DevitoRewriterSafeMath, - 'speculative': DevitoSpeculativeRewriter +default_modes = { + 'basic': None, + 'advanced': None, + 'advanced-safemath': None, + 'speculative': None } -"""The DLE transformation modes.""" +"""The DLE transformation modes. +This dictionary may be modified at backend-initialization time.""" default_options = { 'blockinner': False, 'blockshape': None, 'blockalways': False } -"""Default values for the various optimization options.""" +"""Default values for the supported optimization options. +This dictionary may be modified at backend-initialization time.""" -configuration.add('dle', 'advanced', list(modes)) +configuration.add('dle', 'advanced', list(default_modes)) configuration.add('dle_options', ';'.join('%s:%s' % (k, v) for k, v in default_options.items()), list(default_options)) +def init_dle(backend_modes): + global default_modes + for i in list(default_modes): + default_modes[i] = backend_modes[i] + + def transform(node, mode='basic', options=None): """ Transform Iteration/Expression trees to generate highly optimized C code. @@ -82,13 +88,13 @@ def transform(node, mode='basic', options=None): # Process the Iteration/Expression tree through the DLE if mode is None or mode == 'noop': return State(node) - elif mode not in modes: + elif mode not in default_modes: try: - rewriter = DevitoCustomRewriter(node, mode, params) + rewriter = CustomRewriter(node, mode, params) return rewriter.run() except DLEException: dle_warning("Unknown transformer mode(s) %s" % mode) return State(node) else: - rewriter = modes[mode](node, params) + rewriter = default_modes[mode](node, params) return rewriter.run() diff --git a/devito/ir/support/utils.py b/devito/ir/support/utils.py index a1726c6b98..3dead3f007 100644 --- a/devito/ir/support/utils.py +++ b/devito/ir/support/utils.py @@ -209,12 +209,15 @@ def group_expressions(exprs): # Partion based on data dependences mapper = OrderedDict() ngroups = 0 - for i, e1 in enumerate(exprs): + for e1 in exprs: if e1 in mapper: + # Optimization: we know already that a group for `e1` has been found continue found = False - for e2 in exprs[i+1:]: - if Scope([e1, e2]).has_dep: + for e2 in exprs: + if e1 is e2: + continue + elif Scope([e1, e2]).has_dep: v = mapper.get(e1, mapper.get(e2)) if v is None: ngroups += 1 diff --git a/devito/yask/__init__.py b/devito/yask/__init__.py index bb18265475..2f72c0b570 100644 --- a/devito/yask/__init__.py +++ b/devito/yask/__init__.py @@ -6,12 +6,14 @@ from collections import OrderedDict import os -from devito import configuration +from devito.dle import BasicRewriter, init_dle from devito.exceptions import InvalidOperator from devito.logger import yask as log -from devito.parameters import Parameters, add_sub_configuration +from devito.parameters import Parameters, configuration, add_sub_configuration from devito.tools import ctypes_pointer, infer_cpu +from devito.yask.dle import YaskRewriter + def exit(emsg): """ @@ -107,6 +109,12 @@ def switch_cpu(develop_mode): add_sub_configuration(yask_configuration, env_vars_mapper) +# Initialize the DLE +modes = {'basic': BasicRewriter, + 'advanced': YaskRewriter, + 'advanced-safemath': YaskRewriter, + 'speculative': YaskRewriter} +init_dle(modes) # The following used by backends.backendSelector from devito.yask.function import Constant, Function, TimeFunction # noqa diff --git a/devito/yask/dle.py b/devito/yask/dle.py new file mode 100644 index 0000000000..79a945cb2d --- /dev/null +++ b/devito/yask/dle.py @@ -0,0 +1,28 @@ +from devito.dle.backends import AdvancedRewriter, Ompizer, dle_pass + +__all__ = ['YaskRewriter'] + + +class YaskOmpizer(Ompizer): + # TODO: will need to specialize `_make_omp_parallel_tree` as soon as the + # necessary APIs will be ready in YASK (e.g., atomic incs required) + + def key(self, v): + return v.is_Parallel and not (v.is_Elementizable or v.is_Vectorizable) + + +class YaskRewriter(AdvancedRewriter): + + _parallelizer = YaskOmpizer + + def _pipeline(self, state): + self._avoid_denormals(state) + if self.params['openmp'] is True: + self._parallelize(state) + + @dle_pass + def _parallelize(self, iet, state): + def key(i): + # TODO: ParallelRelaxed not supported yet (see TODO above) + return i.is_Parallel and not (i.is_Elementizable or i.is_Vectorizable) + return self._parallelizer(key).make_omp_parallel_iet(iet), {} diff --git a/devito/yask/operator.py b/devito/yask/operator.py index 9c8f1b3f44..2797ddeb87 100644 --- a/devito/yask/operator.py +++ b/devito/yask/operator.py @@ -35,7 +35,6 @@ class Operator(OperatorRunnable): _default_includes = OperatorRunnable._default_includes + ['yask_kernel_api.hpp'] def __init__(self, expressions, **kwargs): - kwargs['dle'] = ('denormals',) + (('openmp',) if configuration['openmp'] else ()) super(Operator, self).__init__(expressions, **kwargs) # Each YASK Operator needs to have its own compiler (hence the copy() # below) because Operator-specific shared object will be added to the diff --git a/tests/test_autotuner.py b/tests/test_autotuner.py index 32a96f01c7..20e1248720 100644 --- a/tests/test_autotuner.py +++ b/tests/test_autotuner.py @@ -15,7 +15,6 @@ from devito import Grid, Function, TimeFunction, Eq, Operator, configuration, silencio from devito.logger import logger, logging -from devito.core.autotuning import options @silencio(log_level='DEBUG') @@ -66,9 +65,10 @@ def test_at_is_actually_working(shape, expected): def test_timesteps_per_at_run(): """ Check that each autotuning run (ie with a given block shape) takes - ``autotuning.options['at_squeezer']`` timesteps, for an operator + ``autotuning.core.options['at_squeezer']`` timesteps, for an operator performing the increment ``a[t + timeorder, ...] = f(a[t, ...], ...)``. """ + from devito.core.autotuning import options buffer = StringIO() temporary_handler = logging.StreamHandler(buffer)