Skip to content

Commit

Permalink
Merge pull request #539 from opesci/dle-driver
Browse files Browse the repository at this point in the history
Give YASK more freedom to drive the DLE
  • Loading branch information
FabioLuporini authored Apr 12, 2018
2 parents 04304cd + 5b23914 commit e6f1de9
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 130 deletions.
9 changes: 9 additions & 0 deletions devito/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
backend as well) are used to run Devito on standard CPU architectures.
"""

from devito.dle import (BasicRewriter, AdvancedRewriter, AdvancedRewriterSafeMath,
SpeculativeRewriter, init_dle)
from devito.parameters import Parameters, add_sub_configuration

core_configuration = Parameters('core')
Expand All @@ -15,6 +17,13 @@

add_sub_configuration(core_configuration, env_vars_mapper)

# Initialize the DLE
modes = {'basic': BasicRewriter,
'advanced': AdvancedRewriter,
'advanced-safemath': AdvancedRewriterSafeMath,
'speculative': SpeculativeRewriter}
init_dle(modes)

# The following used by backends.backendSelector
from devito.function import (Constant, Function, TimeFunction, SparseFunction, # noqa
SparseTimeFunction)
Expand Down
1 change: 1 addition & 0 deletions devito/dle/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from devito.dle.blocking_utils import * # noqa
from devito.dle.transformer import * # noqa
from devito.dle.backends import * # noqa
5 changes: 3 additions & 2 deletions devito/dle/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from devito.dle.backends.common import * # noqa
from devito.dle.backends.utils import * # noqa
from devito.dle.backends.basic import BasicRewriter # noqa
from devito.dle.backends.advanced import (DevitoRewriter, DevitoSpeculativeRewriter, # noqa
DevitoRewriterSafeMath, DevitoCustomRewriter) # noqa
from devito.dle.backends.parallelizer import Ompizer # noqa
from devito.dle.backends.advanced import (AdvancedRewriter, SpeculativeRewriter, # noqa
AdvancedRewriterSafeMath, CustomRewriter) # noqa
113 changes: 28 additions & 85 deletions devito/dle/backends/advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,30 @@

import cgen
import numpy as np
import psutil

from devito.cgen_utils import ccode
from devito.dimension import Dimension
from devito.dle import fold_blockable_tree, unfold_blocked_tree
from devito.dle.backends import (BasicRewriter, BlockingArg, dle_pass, omplang,
from devito.dle.backends import (BasicRewriter, BlockingArg, Ompizer, dle_pass,
simdinfo, get_simd_flag, get_simd_items)
from devito.exceptions import DLEException
from devito.ir.iet import (Block, Expression, Iteration, List,
PARALLEL, ELEMENTAL, REMAINDER, tagger,
FindNodes, FindSymbols, IsPerfectIteration, Transformer,
compose_nodes, retrieve_iteration_tree, filter_iterations)
from devito.ir.iet import (Iteration, List, PARALLEL, ELEMENTAL, REMAINDER, tagger,
FindSymbols, IsPerfectIteration, Transformer, compose_nodes,
retrieve_iteration_tree)
from devito.logger import dle_warning
from devito.tools import as_tuple


class DevitoRewriter(BasicRewriter):
class AdvancedRewriter(BasicRewriter):

_parallelizer = Ompizer

def _pipeline(self, state):
self._avoid_denormals(state)
self._loop_blocking(state)
self._simdize(state)
if self.params['openmp'] is True:
self._ompize(state)
self._parallelize(state)
self._create_elemental_functions(state)
self._minimize_remainders(state)

Expand Down Expand Up @@ -196,82 +196,25 @@ def _simdize(self, nodes, state):
except KeyError:
aligned = []
if aligned:
simd = omplang['simd-for-aligned']
simd = Ompizer.lang['simd-for-aligned']
simd = as_tuple(simd(','.join([j.name for j in aligned]),
simdinfo[get_simd_flag()]))
else:
simd = as_tuple(omplang['simd-for'])
simd = as_tuple(Ompizer.lang['simd-for'])
mapper[i] = i._rebuild(pragmas=i.pragmas + ignore_deps + simd)

processed = Transformer(mapper).visit(nodes)

return processed, {}

@dle_pass
def _ompize(self, nodes, state):
def _parallelize(self, iet, state):
"""
Add OpenMP pragmas to the Iteration/Expression tree to emit parallel code
"""
# Group by outer loop so that we can embed within the same parallel region
was_tagged = False
groups = OrderedDict()
for tree in retrieve_iteration_tree(nodes):
# Determine the number of consecutive parallelizable Iterations
key = lambda i: i.is_ParallelRelaxed and\
not (i.is_Elementizable or i.is_Vectorizable)
candidates = filter_iterations(tree, key=key, stop='asap')
if not candidates:
was_tagged = False
continue
# Consecutive tagged Iteration go in the same group
is_tagged = any(i.tag is not None for i in tree)
key = len(groups) - (is_tagged & was_tagged)
handle = groups.setdefault(key, OrderedDict())
handle[candidates[0]] = candidates
was_tagged = is_tagged

# Handle parallelizable loops
mapper = OrderedDict()
for group in groups.values():
private = []
for root, tree in group.items():
# Heuristic: if at least two parallel loops are available and the
# physical core count is greater than self.thresholds['collapse'],
# then omp-collapse the loops
nparallel = len(tree)
if psutil.cpu_count(logical=False) < self.thresholds['collapse'] or\
nparallel < 2:
parallel = omplang['for']
else:
parallel = omplang['collapse'](nparallel)

# Introduce the `omp parallel` pragma
if root.is_ParallelAtomic:
# Introduce the `omp atomic` pragmas
exprs = FindNodes(Expression).visit(root)
subs = {i: List(header=omplang['atomic'], body=i)
for i in exprs if i.is_increment}
handle = Transformer(subs).visit(root)
mapper[root] = handle._rebuild(pragmas=root.pragmas + (parallel,))
else:
mapper[root] = root._rebuild(pragmas=root.pragmas + (parallel,))

# Track the thread-private and thread-shared variables
private.extend([i for i in FindSymbols('symbolics').visit(root)
if i.is_Array and i._mem_stack])

# Build the parallel region
private = sorted(set([i.name for i in private]))
private = ('private(%s)' % ','.join(private)) if private else ''
rebuilt = [v for k, v in mapper.items() if k in group]
par_region = Block(header=omplang['par-region'](private), body=rebuilt)
for k, v in list(mapper.items()):
if isinstance(v, Iteration):
mapper[k] = None if v.is_Remainder else par_region

processed = Transformer(mapper).visit(nodes)

return processed, {}
def key(i):
return i.is_ParallelRelaxed and not (i.is_Elementizable or i.is_Vectorizable)
return self._parallelizer(key).make_omp_parallel_iet(iet), {}

@dle_pass
def _minimize_remainders(self, nodes, state):
Expand Down Expand Up @@ -343,31 +286,31 @@ def _minimize_remainders(self, nodes, state):
return processed, {}


class DevitoRewriterSafeMath(DevitoRewriter):
class AdvancedRewriterSafeMath(AdvancedRewriter):

"""
This Rewriter is slightly less aggressive than :class:`DevitoRewriter`, as it
This Rewriter is slightly less aggressive than :class:`AdvancedRewriter`, as it
doesn't drop denormal numbers, which may sometimes harm the numerical precision.
"""

def _pipeline(self, state):
self._loop_blocking(state)
self._simdize(state)
if self.params['openmp'] is True:
self._ompize(state)
self._parallelize(state)
self._create_elemental_functions(state)
self._minimize_remainders(state)


class DevitoSpeculativeRewriter(DevitoRewriter):
class SpeculativeRewriter(AdvancedRewriter):

def _pipeline(self, state):
self._avoid_denormals(state)
self._loop_blocking(state)
self._simdize(state)
self._nontemporal_stores(state)
if self.params['openmp'] is True:
self._ompize(state)
self._parallelize(state)
self._create_elemental_functions(state)
self._minimize_remainders(state)

Expand Down Expand Up @@ -400,26 +343,26 @@ def _nontemporal_stores(self, nodes, state):
return processed, {'flags': 'ntstores'}


class DevitoCustomRewriter(DevitoSpeculativeRewriter):
class CustomRewriter(SpeculativeRewriter):

passes_mapper = {
'denormals': DevitoSpeculativeRewriter._avoid_denormals,
'blocking': DevitoSpeculativeRewriter._loop_blocking,
'openmp': DevitoSpeculativeRewriter._ompize,
'simd': DevitoSpeculativeRewriter._simdize,
'split': DevitoSpeculativeRewriter._create_elemental_functions
'denormals': SpeculativeRewriter._avoid_denormals,
'blocking': SpeculativeRewriter._loop_blocking,
'openmp': SpeculativeRewriter._parallelize,
'simd': SpeculativeRewriter._simdize,
'split': SpeculativeRewriter._create_elemental_functions
}

def __init__(self, nodes, passes, params):
try:
passes = passes.split(',')
except AttributeError:
# Already in tuple format
if not all(i in DevitoCustomRewriter.passes_mapper for i in passes):
if not all(i in CustomRewriter.passes_mapper for i in passes):
raise DLEException
self.passes = passes
super(DevitoCustomRewriter, self).__init__(nodes, params)
super(CustomRewriter, self).__init__(nodes, params)

def _pipeline(self, state):
for i in self.passes:
DevitoCustomRewriter.passes_mapper[i](self, state)
CustomRewriter.passes_mapper[i](self, state)
7 changes: 0 additions & 7 deletions devito/dle/backends/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,6 @@ class AbstractRewriter(object):

__metaclass__ = abc.ABCMeta

"""
Bag of thresholds, to be used to trigger or prevent certain transformations.
"""
thresholds = {
'collapse': 32, # Available physical cores
}

def __init__(self, nodes, params):
self.nodes = nodes
self.params = params
Expand Down
104 changes: 104 additions & 0 deletions devito/dle/backends/parallelizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from collections import OrderedDict

import cgen as c
import psutil

from devito.ir.iet import (FindSymbols, FindNodes, Transformer, Block, Expression,
List, Iteration, retrieve_iteration_tree, filter_iterations)


class Ompizer(object):

COLLAPSE = 32
"""Use a collapse clause if the number of available physical cores is
greater than this threshold."""

lang = {
'for': c.Pragma('omp for schedule(static)'),
'collapse': lambda i: c.Pragma('omp for collapse(%d) schedule(static)' % i),
'par-region': lambda i: c.Pragma('omp parallel %s' % i),
'par-for': c.Pragma('omp parallel for schedule(static)'),
'simd-for': c.Pragma('omp simd'),
'simd-for-aligned': lambda i, j: c.Pragma('omp simd aligned(%s:%d)' % (i, j)),
'atomic': c.Pragma('omp atomic update')
}
"""
Shortcuts for the OpenMP language.
"""

def __init__(self, key):
"""
:param key: A function returning True if ``v`` can be parallelized,
False otherwise.
"""
self.key = key

def _make_omp_parallel_tree(self, root, candidates):
"""
Return a mapper to parallelize the :class:`Iteration`s within /root/.
"""
mapper = OrderedDict()

# Heuristic: if at least two parallel loops are available and the
# physical core count is greater than COLLAPSE, then omp-collapse them
nparallel = len(candidates)
if psutil.cpu_count(logical=False) < Ompizer.COLLAPSE or\
nparallel < 2:
parallel = self.lang['for']
else:
parallel = self.lang['collapse'](nparallel)

# Introduce the `omp parallel` pragma
if root.is_ParallelAtomic:
# Introduce the `omp atomic` pragmas
exprs = FindNodes(Expression).visit(root)
subs = {i: List(header=self.lang['atomic'], body=i)
for i in exprs if i.is_increment}
handle = Transformer(subs).visit(root)
mapper[root] = handle._rebuild(pragmas=root.pragmas + (parallel,))
else:
mapper[root] = root._rebuild(pragmas=root.pragmas + (parallel,))

return mapper

def make_omp_parallel_iet(self, iet):
"""
Transform ``iet`` by decorating its parallel :class:`Iteration`s with
suitable ``#pragma omp ...`` triggering thread-level parallelism.
"""
# Group sequences of loops that should go within the same parallel region
was_tagged = False
groups = OrderedDict()
for tree in retrieve_iteration_tree(iet):
# Determine the number of consecutive parallelizable Iterations
candidates = filter_iterations(tree, key=self.key, stop='asap')
if not candidates:
was_tagged = False
continue
# Consecutive tagged Iteration go in the same group
is_tagged = any(i.tag is not None for i in tree)
key = len(groups) - (is_tagged & was_tagged)
handle = groups.setdefault(key, OrderedDict())
handle[candidates[0]] = candidates
was_tagged = is_tagged

mapper = OrderedDict()
for group in groups.values():
private = []
for root, candidates in group.items():
mapper.update(self._make_omp_parallel_tree(root, candidates))

# Track the thread-private and thread-shared variables
private.extend([i for i in FindSymbols('symbolics').visit(root)
if i.is_Array and i._mem_stack])

# Build the parallel region
private = sorted(set([i.name for i in private]))
private = ('private(%s)' % ','.join(private)) if private else ''
rebuilt = [v for k, v in mapper.items() if k in group]
par_region = Block(header=self.lang['par-region'](private), body=rebuilt)
for k, v in list(mapper.items()):
if isinstance(v, Iteration):
mapper[k] = None if v.is_Remainder else par_region

return Transformer(mapper).visit(iet)
13 changes: 0 additions & 13 deletions devito/dle/backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,6 @@

import cgen as c

"""
A dictionary to quickly access standard OpenMP pragmas
"""
omplang = {
'for': c.Pragma('omp for schedule(static)'),
'collapse': lambda i: c.Pragma('omp for collapse(%d) schedule(static)' % i),
'par-region': lambda i: c.Pragma('omp parallel %s' % i),
'par-for': c.Pragma('omp parallel for schedule(static)'),
'simd-for': c.Pragma('omp simd'),
'simd-for-aligned': lambda i, j: c.Pragma('omp simd aligned(%s:%d)' % (i, j)),
'atomic': c.Pragma('omp atomic update')
}

"""
Compiler-specific language
"""
Expand Down
Loading

0 comments on commit e6f1de9

Please sign in to comment.