From 0a5add5c8573792e00adbd4e11b410a93ec8dd3c Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Thu, 14 Mar 2024 09:44:39 +0000 Subject: [PATCH 01/16] compiler: Restructure MemoryAllocator hierarchy --- devito/data/allocators.py | 97 ++++++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 43 deletions(-) diff --git a/devito/data/allocators.py b/devito/data/allocators.py index 55977796ed..72289c57bf 100644 --- a/devito/data/allocators.py +++ b/devito/data/allocators.py @@ -18,36 +18,18 @@ 'default_allocator'] -class MemoryAllocator: +class AbstractMemoryAllocator: - """Abstract class defining the interface to memory allocators.""" + """ + The MemoryAllocator interface. + """ __metaclass__ = abc.ABCMeta - _attempted_init = False - lib = None - guaranteed_alignment = 64 """Guaranteed data alignment.""" - @classmethod - def available(cls): - if cls._attempted_init is False: - cls.initialize() - cls._attempted_init = True - return cls.lib is not None - - @classmethod - def initialize(cls): - """ - Initialize the MemoryAllocator. - - Notes - ----- - This method must be implemented by all subclasses of MemoryAllocator. - """ - return - + @abc.abstractmethod def alloc(self, shape, dtype, padding=0): """ Allocate memory. @@ -64,11 +46,52 @@ def alloc(self, shape, dtype, padding=0): Returns ------- - pointer, memfree_args - The first element of the tuple is the reference that can be used to - access the data as a ctypes object. The second element is an opaque + ndarray, memfree_args + The first element of the tuple is a numpy array that uses the + allocated memory underneath. The second element is an opaque object that is needed only for the "memfree" call. """ + return + + @abc.abstractmethod + def free(self, *args): + """ + Free memory previously allocated with `self.alloc`. + + Arguments are provided exactly as returned in the second element of the + tuple returned by `alloc`. + """ + return + + +class MemoryAllocator(AbstractMemoryAllocator): + + """ + A memory allocator implementing the alloc method by resorting to a C-level + memory allocation function, to be specified by subclasses. + + This is still an abstract class, and subclasses are expected to implement the + `_alloc_C_libcall` and `free` methods. + """ + + _attempted_init = False + lib = None + + @classmethod + def available(cls): + if cls._attempted_init is False: + cls.initialize() + cls._attempted_init = True + return cls.lib is not None + + @classmethod + def initialize(cls): + """ + Initialize the MemoryAllocator. + """ + return + + def alloc(self, shape, dtype, padding=0): datasize = int(reduce(mul, shape)) ctype = dtype_to_ctype(dtype) @@ -117,20 +140,6 @@ def _alloc_C_libcall(self, size, ctype): """ return - @abc.abstractmethod - def free(self, *args): - """ - Free memory previously allocated with ``self.alloc``. - - Arguments are provided exactly as returned in the second element of the - tuple returned by _alloc_C_libcall - - Notes - ----- - This method must be implemented by all subclasses of MemoryAllocator. - """ - return - class PosixAllocator(MemoryAllocator): @@ -385,7 +394,9 @@ def alloc(self, shape, dtype, padding=0): ALLOC_NUMA_ANY = NumaAllocator('any') ALLOC_NUMA_LOCAL = NumaAllocator('local') -custom_allocators = {} +custom_allocators = { + 'fallback': ALLOC_ALIGNED, +} """User-defined allocators.""" @@ -397,7 +408,7 @@ def register_allocator(name, allocator): raise TypeError("name must be a str, not `%s`" % type(name)) if name in custom_allocators: raise ValueError("A MemoryAllocator for `%s` already exists" % name) - if not isinstance(allocator, MemoryAllocator): + if not isinstance(allocator, AbstractMemoryAllocator): raise TypeError("Expected a MemoryAllocator, not `%s`" % type(allocator)) custom_allocators[name] = allocator @@ -440,4 +451,4 @@ def default_allocator(name=None): infer_knl_mode() == 'flat'): return ALLOC_KNL_MCDRAM else: - return custom_allocators.get('default', ALLOC_ALIGNED) + return custom_allocators.get('default', custom_allocators['fallback']) From 4bb8bea9be271440ea3f6ca8615a180879b1351e Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Thu, 14 Mar 2024 08:49:51 +0000 Subject: [PATCH 02/16] compiler: Fix Data.__reduce__ --- devito/data/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/devito/data/data.py b/devito/data/data.py index 31db578cd1..ca18006331 100644 --- a/devito/data/data.py +++ b/devito/data/data.py @@ -91,7 +91,7 @@ def __del__(self): def __reduce__(self): warning("Pickling of `Data` objects is not supported. Casting to `numpy.ndarray`") - return np.array(self).__reduce__() + return self.view(np.ndarray).__reduce__() def __array_finalize__(self, obj): # `self` is the newly created object From 1bb73072b41ed7600f1ab24ee4091a67ce061418 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Fri, 15 Mar 2024 14:40:50 +0000 Subject: [PATCH 03/16] compiler: Tidy up --- devito/mpi/distributed.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/devito/mpi/distributed.py b/devito/mpi/distributed.py index 9d4e1d31bf..6a278df143 100644 --- a/devito/mpi/distributed.py +++ b/devito/mpi/distributed.py @@ -18,8 +18,6 @@ from devito.types.utils import DimensionTuple -__all__ = ['CustomTopology'] - # Do not prematurely initialize MPI # This allows launching a Devito program from within another Python program # that has *already* initialized MPI @@ -208,13 +206,15 @@ def __init__(self, shape, dimensions, input_comm=None, topology=None): self._input_comm = (input_comm or MPI.COMM_WORLD).Clone() if topology is None: - # `MPI.Compute_dims` sets the dimension sizes to be as close to each other - # as possible, using an appropriate divisibility algorithm. Thus, in 3D: + # `MPI.Compute_dims` sets the dimension sizes to be as close to + # each other as possible, using an appropriate divisibility + # algorithm. Thus, in 3D: # * topology[0] >= topology[1] >= topology[2] # * topology[0] * topology[1] * topology[2] == self._input_comm.size - # However, `MPI.Compute_dims` is distro-dependent, so we have to enforce - # some properties through our own wrapper (e.g., OpenMPI v3 does not - # guarantee that 9 ranks are arranged into a 3x3 grid when shape=(9, 9)) + # However, `MPI.Compute_dims` is distro-dependent, so we have + # to enforce some properties through our own wrapper (e.g., + # OpenMPI v3 does not guarantee that 9 ranks are arranged into + # a 3x3 grid when shape=(9, 9)) self._topology = compute_dims(self._input_comm.size, len(shape)) else: # A custom topology may contain integers or the wildcard '*' From b56481f7251cc8f6b17a93fc7e587d6cda9e2fb6 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Fri, 22 Mar 2024 10:22:32 +0000 Subject: [PATCH 04/16] compiler: Polishing --- devito/types/sparse.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/devito/types/sparse.py b/devito/types/sparse.py index d6845a4dc5..c92550ed9d 100644 --- a/devito/types/sparse.py +++ b/devito/types/sparse.py @@ -327,16 +327,6 @@ def _position_map(self): self.grid.dimensions, self.grid.origin_symbols)]) - @cached_property - def _dist_reorder_mask(self): - """ - An ordering mask that puts ``self._sparse_position`` at the front. - """ - ret = (self._sparse_position,) - ret += tuple(i for i, d in enumerate(self.dimensions) - if d is not self._sparse_dim) - return ret - @cached_property def dist_origin(self): return self._dist_origin @@ -394,6 +384,16 @@ def guard(self, expr=None): return out, temps + @cached_property + def _dist_reorder_mask(self): + """ + An ordering mask that puts ``self._sparse_position`` at the front. + """ + ret = (self._sparse_position,) + ret += tuple(i for i, d in enumerate(self.dimensions) + if d is not self._sparse_dim) + return ret + def _dist_scatter_mask(self, dmap=None): """ A mask to index into ``self.data``, which creates a new data array that From 34e7435221277bbf1ad5b0728d9c60e370fa9af6 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Fri, 22 Mar 2024 16:00:16 +0000 Subject: [PATCH 05/16] compiler: Relax checks for unpickling --- devito/mpi/routines.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py index 46fc2a8e3a..83dff98874 100644 --- a/devito/mpi/routines.py +++ b/devito/mpi/routines.py @@ -1200,7 +1200,7 @@ def _arg_defaults(self, allocator, alias, args=None): try: shape.append(getattr(f._size_owned[dim], side.name)) except AttributeError: - assert side is CENTER + assert side == CENTER shape.append(self._as_number(f._size_domain[dim], args)) entry.sizes = (c_int*len(shape))(*shape) @@ -1264,7 +1264,7 @@ def _arg_defaults(self, allocator, alias=None, args=None): v = getattr(f._offset_owned[dim], side.name) ofsg.append(self._as_number(v, args)) except AttributeError: - assert side is CENTER + assert side == CENTER ofsg.append(f._offset_owned[dim].left) entry.ofsg = (c_int*len(ofsg))(*ofsg) @@ -1276,7 +1276,7 @@ def _arg_defaults(self, allocator, alias=None, args=None): v = getattr(f._offset_halo[dim], side.flip().name) ofss.append(self._as_number(v, args)) except AttributeError: - assert side is CENTER + assert side == CENTER # Note `_offset_owned`, and not `_offset_halo`, is *not* a bug # here. If it's the CENTER we need, we can't just use # `_offset_halo[d].left` as otherwise we would pick the corner From 2fb80abafc95f4fd507552ed45291ba9f238c7f2 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Mon, 25 Mar 2024 15:02:23 +0000 Subject: [PATCH 06/16] compiler: Add AbstractFunction.is_transient --- devito/types/basic.py | 13 ++++++++++++- devito/types/dense.py | 4 ++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/devito/types/basic.py b/devito/types/basic.py index c077c78faa..2b01bd71c2 100644 --- a/devito/types/basic.py +++ b/devito/types/basic.py @@ -842,7 +842,7 @@ class AbstractFunction(sympy.Function, Basic, Pickable, Evaluable): """ __rkwargs__ = ('name', 'dtype', 'grid', 'halo', 'padding', 'ghost', - 'alias', 'space', 'function') + 'alias', 'space', 'function', 'is_transient') def __new__(cls, *args, **kwargs): # Preprocess arguments @@ -957,6 +957,13 @@ def __init_finalize__(self, *args, **kwargs): self._space = kwargs.get('space', 'mapped') assert self._space in ['local', 'mapped', 'host'] + # If True, the AbstractFunction is treated by the compiler as a "transient + # field", meaning that its content is only useful within an Operator + # execution, but the final data is not expected to be read back in + # Python-land by the user. This allows the compiler/run-time to apply + # certain optimizations, such as avoiding memory copies + self._is_transient = kwargs.get('is_transient', False) + @classmethod def __args_setup__(cls, *args, **kwargs): """ @@ -1208,6 +1215,10 @@ def ghost(self): def is_const(self): return False + @property + def is_transient(self): + return self._is_transient + @property def alias(self): return self._alias diff --git a/devito/types/dense.py b/devito/types/dense.py index 0a5385b7a5..299ab68dc0 100644 --- a/devito/types/dense.py +++ b/devito/types/dense.py @@ -18,8 +18,8 @@ from devito.symbolics import FieldFromPointer, normalize_args from devito.finite_differences import Differentiable, generate_fd_shortcuts from devito.finite_differences.tools import fd_weights_registry -from devito.tools import (ReducerMap, as_tuple, c_restrict_void_p, flatten, is_integer, - memoized_meth, dtype_to_ctype, humanbytes) +from devito.tools import (ReducerMap, as_tuple, c_restrict_void_p, flatten, + is_integer, memoized_meth, dtype_to_ctype, humanbytes) from devito.types.dimension import Dimension from devito.types.args import ArgProvider from devito.types.caching import CacheManager From a45476c0f33f0bfc0719ca50557bbfd752d80bce Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Mon, 25 Mar 2024 16:08:06 +0000 Subject: [PATCH 07/16] arch: Add Cpu64.numa_domains --- devito/arch/archinfo.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/devito/arch/archinfo.py b/devito/arch/archinfo.py index 75cf5c6901..364e402a11 100644 --- a/devito/arch/archinfo.py +++ b/devito/arch/archinfo.py @@ -634,6 +634,13 @@ def __repr__(self): def _detect_isa(self): return 'unknown' + @property + def numa_domains(self): + """ + Number of NUMA domains, or None if unknown. + """ + return 1 + @property def threads_per_core(self): return self.cores_logical // self.cores_physical @@ -706,6 +713,14 @@ def simd_items_per_reg(self, dtype): assert self.simd_reg_nbytes % np.dtype(dtype).itemsize == 0 return int(self.simd_reg_nbytes / np.dtype(dtype).itemsize) + @cached_property + def numa_domains(self): + try: + return int(lscpu()['NUMA node(s)']) + except KeyError: + warning("NUMA domain count autodetection failed") + return 1 + @cached_property def memtotal(self): return psutil.virtual_memory().total @@ -785,6 +800,10 @@ def _mro(cls): def march(self): return None + @property + def numa_domains(self): + raise NotImplementedError + @cached_property def memtotal(self): info = get_gpu_info() From a4b6dc714256744dea002cc0df321ed66412eb01 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Tue, 26 Mar 2024 09:14:50 +0000 Subject: [PATCH 08/16] compiler: Tweak Operator pickling --- devito/operator/operator.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/devito/operator/operator.py b/devito/operator/operator.py index 5c781512b9..f2fad8244c 100644 --- a/devito/operator/operator.py +++ b/devito/operator/operator.py @@ -1005,14 +1005,16 @@ def lower_perfentry(v): # Pickling support def __getstate__(self): + state = dict(self.__dict__) + if self._lib: - state = dict(self.__dict__) # The compiled shared-object will be pickled; upon unpickling, it # will be restored into a potentially different temporary directory, # so the entire process during which the shared-object is loaded and # given to ctypes must be performed again state['_lib'] = None state['_cfunction'] = None + # Do not pickle the `args` used to construct the Operator. Not only # would this be completely useless, but it might also lead to # allocating additional memory upon unpickling, as the user-provided @@ -1020,12 +1022,16 @@ def __getstate__(self): # (e.g., f(t, x-1), f(t, x), f(t, x+1)), which are different objects # with distinct `.data` fields state['_args'] = None + with open(self._lib._name, 'rb') as f: state['binary'] = f.read() state['soname'] = self._soname - return state - else: - return self.__dict__ + + # The allocator depends on the environment at the unpickling site, so + # we don't pickle it + state['_allocator'] = None + + return state def __getnewargs_ex__(self): return (None,), {} @@ -1033,13 +1039,19 @@ def __getnewargs_ex__(self): def __setstate__(self, state): soname = state.pop('soname', None) binary = state.pop('binary', None) + for k, v in state.items(): setattr(self, k, v) + if soname is not None: self._compiler.save(soname, binary) self._lib = self._compiler.load(soname) self._lib.name = soname + self._allocator = default_allocator( + '%s.%s.%s' % (self._compiler.name, self._language, self._platform) + ) + # Default action (perform or bypass) for selected compilation passes upon # recursive compilation From 8c9015afd9594191b274b7aca9a50b56fbb54b33 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Tue, 26 Mar 2024 14:18:11 +0000 Subject: [PATCH 09/16] mpi: Initialize MPI with threading support --- devito/mpi/distributed.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/devito/mpi/distributed.py b/devito/mpi/distributed.py index 6a278df143..34a352d0ae 100644 --- a/devito/mpi/distributed.py +++ b/devito/mpi/distributed.py @@ -197,7 +197,13 @@ def __init__(self, shape, dimensions, input_comm=None, topology=None): if configuration['mpi']: # First time we enter here, we make sure MPI is initialized if not MPI.Is_initialized(): - MPI.Init() + try: + thread_level = mpi4py_thread_levels[mpi4py.rc.thread_level] + except KeyError: + assert False + + MPI.Init_thread(thread_level) + global init_by_devito init_by_devito = True @@ -706,3 +712,12 @@ def compute_dims(nprocs, ndim): else: v = int(v) return tuple(v for _ in range(ndim)) + + +# Yes, AFAICT, nothing like this is available in mpi4py +mpi4py_thread_levels = { + 'single': MPI.THREAD_SINGLE, + 'funneled': MPI.THREAD_FUNNELED, + 'serialized': MPI.THREAD_SERIALIZED, + 'multiple': MPI.THREAD_MULTIPLE +} From fd2a614a8df5b8caf23088beced55e19a793e15c Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Fri, 5 Apr 2024 10:50:09 +0000 Subject: [PATCH 10/16] compiler: Refactor builtins --- devito/builtins/arithmetic.py | 31 ++++++------------------------- devito/builtins/utils.py | 35 ++++++++++++++++++++++++++++++----- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/devito/builtins/arithmetic.py b/devito/builtins/arithmetic.py index f24a0e56ea..0e1e7fbcdc 100644 --- a/devito/builtins/arithmetic.py +++ b/devito/builtins/arithmetic.py @@ -3,24 +3,8 @@ import devito as dv from devito.builtins.utils import make_retval - __all__ = ['norm', 'sumall', 'sum', 'inner', 'mmin', 'mmax'] -accumulator_mapper = { - # Integer accumulates on Float64 - np.int8: np.float64, np.uint8: np.float64, - np.int16: np.float64, np.uint16: np.float64, - np.int32: np.float64, np.uint32: np.float64, - np.int64: np.float64, np.uint64: np.float64, - # FloatX accumulates on Float2X - np.float16: np.float32, - np.float32: np.float64, - # NOTE: np.float128 isn't really a thing, see for example - # https://github.com/numpy/numpy/issues/10288 - # https://docs.oracle.com/cd/E19957-01/806-3568/ncg_goldberg.html#1070 - np.float64: np.float64 -} - @dv.switchconfig(log_level='ERROR') def norm(f, order=2): @@ -43,9 +27,8 @@ def norm(f, order=2): # otherwise we would eventually be summing more than expected p, eqns = f.guard() if f.is_SparseFunction else (f, []) - dtype = accumulator_mapper[f.dtype] - n = make_retval(f.grid, dtype) - s = dv.types.Symbol(name='sum', dtype=dtype) + n = make_retval(f) + s = dv.types.Symbol(name='sum', dtype=n.dtype) op = dv.Operator([dv.Eq(s, 0.0)] + eqns + [dv.Inc(s, dv.Abs(Pow(p, order))), dv.Eq(n[0], s)], @@ -128,9 +111,8 @@ def sumall(f): # otherwise we would eventually be summing more than expected p, eqns = f.guard() if f.is_SparseFunction else (f, []) - dtype = accumulator_mapper[f.dtype] - n = make_retval(f.grid, dtype) - s = dv.types.Symbol(name='sum', dtype=dtype) + n = make_retval(f) + s = dv.types.Symbol(name='sum', dtype=n.dtype) op = dv.Operator([dv.Eq(s, 0.0)] + eqns + [dv.Inc(s, p), dv.Eq(n[0], s)], @@ -183,9 +165,8 @@ def inner(f, g): # otherwise we would eventually be summing more than expected rhs, eqns = f.guard(f*g) if f.is_SparseFunction else (f*g, []) - dtype = accumulator_mapper[f.dtype] - n = make_retval(f.grid or g.grid, dtype) - s = dv.types.Symbol(name='sum', dtype=dtype) + n = make_retval(f) + s = dv.types.Symbol(name='sum', dtype=n.dtype) op = dv.Operator([dv.Eq(s, 0.0)] + eqns + [dv.Inc(s, rhs), dv.Eq(n[0], s)], diff --git a/devito/builtins/utils.py b/devito/builtins/utils.py index 786dbbce48..32f59c731f 100644 --- a/devito/builtins/utils.py +++ b/devito/builtins/utils.py @@ -1,5 +1,7 @@ from functools import wraps +import numpy as np + import devito as dv from devito.symbolics import uxreplace from devito.tools import as_tuple @@ -7,20 +9,43 @@ __all__ = ['make_retval', 'nbl_to_padsize', 'pad_outhalo', 'abstract_args'] -def make_retval(grid, dtype): +accumulator_mapper = { + # Integer accumulates on Float64 + np.int8: np.float64, np.uint8: np.float64, + np.int16: np.float64, np.uint16: np.float64, + np.int32: np.float64, np.uint32: np.float64, + np.int64: np.float64, np.uint64: np.float64, + # FloatX accumulates on Float2X + np.float16: np.float32, + np.float32: np.float64, + # NOTE: np.float128 isn't really a thing, see for example + # https://github.com/numpy/numpy/issues/10288 + # https://docs.oracle.com/cd/E19957-01/806-3568/ncg_goldberg.html#1070 + np.float64: np.float64 +} + + +def make_retval(f): """ Devito does not support passing values by reference. This function creates a dummy Function of size 1 to store the return value of a builtin applied to `f`. """ - if grid is None: - raise ValueError("Expected Grid, got None") + if f.grid is None: + raise ValueError("No Grid available") + + cls = make_retval.cls or dv.Function + + dtype = accumulator_mapper[f.dtype] i = dv.Dimension(name='mri',) - n = dv.Function(name='n', shape=(1,), dimensions=(i,), grid=grid, - dtype=dtype, space='host') + n = cls(name='n', shape=(1,), dimensions=(i,), grid=f.grid, + dtype=dtype, space='host') + n.data[:] = 0 + return n +make_retval.cls = None # noqa def nbl_to_padsize(nbl, ndim): From 17795db45bddad7a9a121fe62236e90f3bf0c3aa Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Fri, 5 Apr 2024 13:17:39 +0000 Subject: [PATCH 11/16] compiler: Call clear_cache at program termination --- devito/__init__.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/devito/__init__.py b/devito/__init__.py index f75e94ce79..214a0ee8f2 100644 --- a/devito/__init__.py +++ b/devito/__init__.py @@ -1,3 +1,4 @@ +import atexit from itertools import product from . import _version @@ -159,4 +160,14 @@ def mode_performance(): configuration['opt-options']['blockinner'] = True +# Ensure the SymPy caches are purged at exit +# For whatever reason, if we don't do this the garbage collector won't its +# job properly and thus we may end up missing some custom __del__'s +atexit.register(clear_cache) + + __version__ = _version.get_versions()['version'] + + +# Clean up namespace +del atexit, product From 7bf3037c1668feb47ea170e6169650baac35d2fe Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Mon, 8 Apr 2024 15:02:17 +0000 Subject: [PATCH 12/16] api: Expose devito_mpi_init --- devito/mpi/distributed.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/devito/mpi/distributed.py b/devito/mpi/distributed.py index 34a352d0ae..e677f91ca0 100644 --- a/devito/mpi/distributed.py +++ b/devito/mpi/distributed.py @@ -65,7 +65,24 @@ def __getattr__(self, name): return None -__all__ = ['Distributor', 'SparseDistributor', 'MPI', 'CustomTopology'] +__all__ = ['Distributor', 'SparseDistributor', 'MPI', 'CustomTopology', + 'devito_mpi_init'] + + +def devito_mpi_init(): + """ + Initialize MPI, if not already initialized. + """ + if not MPI.Is_initialized(): + try: + thread_level = mpi4py_thread_levels[mpi4py.rc.thread_level] + except KeyError: + assert False + + MPI.Init_thread(thread_level) + + global init_by_devito + init_by_devito = True class AbstractDistributor(ABC): @@ -196,16 +213,7 @@ def __init__(self, shape, dimensions, input_comm=None, topology=None): if configuration['mpi']: # First time we enter here, we make sure MPI is initialized - if not MPI.Is_initialized(): - try: - thread_level = mpi4py_thread_levels[mpi4py.rc.thread_level] - except KeyError: - assert False - - MPI.Init_thread(thread_level) - - global init_by_devito - init_by_devito = True + devito_mpi_init() # Note: the cloned communicator doesn't need to be explicitly freed; # mpi4py takes care of that when the object gets out of scope From f0ec96718a9f266575bd7b636046fae96c0a76ca Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Mon, 8 Apr 2024 15:40:14 +0000 Subject: [PATCH 13/16] api: Refine optional MPI init / finalize --- devito/mpi/distributed.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/devito/mpi/distributed.py b/devito/mpi/distributed.py index e677f91ca0..ebd5c9b002 100644 --- a/devito/mpi/distributed.py +++ b/devito/mpi/distributed.py @@ -37,9 +37,7 @@ # will be called only at the very end and only if necessary, after all cloned # communicators will have been freed def cleanup(): - global init_by_devito - if init_by_devito and MPI.Is_initialized() and not MPI.Is_finalized(): - MPI.Finalize() + devito_mpi_finalize() atexit.register(cleanup) except ImportError as e: # Dummy fallback in case mpi4py/MPI aren't available @@ -66,7 +64,7 @@ def __getattr__(self, name): __all__ = ['Distributor', 'SparseDistributor', 'MPI', 'CustomTopology', - 'devito_mpi_init'] + 'devito_mpi_init', 'devito_mpi_finalize'] def devito_mpi_init(): @@ -84,6 +82,18 @@ def devito_mpi_init(): global init_by_devito init_by_devito = True + return True + return False + + +def devito_mpi_finalize(): + """ + Finalize MPI, if initialized by Devito. + """ + global init_by_devito + if init_by_devito and MPI.Is_initialized() and not MPI.Is_finalized(): + MPI.Finalize() + class AbstractDistributor(ABC): From fc91dea016e3fdac73c0166f9f0b1ed4ec7eca85 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Tue, 9 Apr 2024 12:36:34 +0000 Subject: [PATCH 14/16] compiler: Stash Grid.topology --- devito/types/grid.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/devito/types/grid.py b/devito/types/grid.py index 8dbc4767ef..2d5d46b20e 100644 --- a/devito/types/grid.py +++ b/devito/types/grid.py @@ -163,6 +163,7 @@ def __init__(self, shape, extent=None, origin=None, dimensions=None, # Create a Distributor, used internally to implement domain decomposition # by all Functions defined on this Grid + self._topology = topology self._distributor = Distributor(shape, dimensions, comm, topology) # The physical extent @@ -298,9 +299,14 @@ def dimension_map(self): return {d: GlobalLocal(g, l) for d, g, l in zip(self.dimensions, self.shape, self.shape_local)} + @property + def topology(self): + """The topology used for decomposing the CartesianDiscretization.""" + return self._topology + @property def distributor(self): - """The Distributor used for MPI-decomposing the CartesianDiscretization.""" + """The Distributor used for decomposing the CartesianDiscretization.""" return self._distributor @property From e39e014916d80ffcd9e82f2e88202a19887e11d3 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Tue, 9 Apr 2024 14:03:54 +0000 Subject: [PATCH 15/16] arch: Add Cpu64.cores_physical_per_numa_domain --- devito/arch/archinfo.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/devito/arch/archinfo.py b/devito/arch/archinfo.py index 364e402a11..ce17ecfc13 100644 --- a/devito/arch/archinfo.py +++ b/devito/arch/archinfo.py @@ -721,6 +721,10 @@ def numa_domains(self): warning("NUMA domain count autodetection failed") return 1 + @property + def cores_physical_per_numa_domain(self): + return self.cores_physical // self.numa_domains + @cached_property def memtotal(self): return psutil.virtual_memory().total From b6f1a087cb0b2b71b0f0f4f2f8a3080519132220 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Mon, 3 Jun 2024 08:43:00 +0000 Subject: [PATCH 16/16] tests: Add decoupler marker --- conftest.py | 93 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 71 insertions(+), 22 deletions(-) diff --git a/conftest.py b/conftest.py index c38829f6e8..c88248e2b4 100644 --- a/conftest.py +++ b/conftest.py @@ -122,6 +122,29 @@ def EVAL(exprs, *args): return processed[0] if isinstance(exprs, str) else processed +def get_testname(item): + if item.cls is not None: + return "%s::%s::%s" % (item.fspath, item.cls.__name__, item.name) + else: + return "%s::%s" % (item.fspath, item.name) + + +def set_run_reset(env_vars, call): + old_env_vars = {k: os.environ.get(k, None) for k in env_vars} + + os.environ.update(env_vars) + os.environ['DEVITO_PYTEST_FLAG'] = '1' + + try: + check_call(call) + return True + except: + return False + finally: + os.environ['DEVITO_PYTEST_FLAG'] = '0' + os.environ.update({k: v for k, v in old_env_vars.items() if v is not None}) + + def parallel(item, m): """ Run a test in parallel. Readapted from: @@ -141,14 +164,12 @@ def parallel(item, m): else: raise ValueError("Can't run test: unexpected mode `%s`" % m) + env_vars = {'DEVITO_MPI': scheme} + pyversion = sys.executable + testname = get_testname(item) # Only spew tracebacks on rank 0. # Run xfailing tests to ensure that errors are reported to calling process - if item.cls is not None: - testname = "%s::%s::%s" % (item.fspath, item.cls.__name__, item.name) - else: - testname = "%s::%s" % (item.fspath, item.name) - args = ["-n", "1", pyversion, "-m", "pytest", "--no-summary", "-s", "--runxfail", "-qq", testname] if nprocs > 1: @@ -161,16 +182,24 @@ def parallel(item, m): else: call = [mpi_exec] + args - # Tell the MPI ranks that they are running a parallel test - os.environ['DEVITO_MPI'] = scheme - try: - check_call(call) - res = True - except: - res = False - finally: - os.environ['DEVITO_MPI'] = '0' - return res + return set_run_reset(env_vars, call) + + +def decoupler(item, m): + """ + Run a test in decoupled mode. + """ + mpi_exec = 'mpiexec' + assert sniff_mpi_distro(mpi_exec) != 'unknown', "Decoupled tests require MPI" + + env_vars = {'DEVITO_DECOUPLER': '1'} + if isinstance(m, int): + env_vars['DEVITO_DECOUPLER_WORKERS'] = str(m) + + testname = get_testname(item) + call = ["pytest", "--no-summary", "-s", "--runxfail", testname] + + return set_run_reset(env_vars, call) def pytest_configure(config): @@ -179,6 +208,10 @@ def pytest_configure(config): "markers", "parallel(mode): mark test to run in parallel" ) + config.addinivalue_line( + "markers", + "decoupler(mode): mark test to run in decoupled mode", + ) def pytest_generate_tests(metafunc): @@ -187,26 +220,37 @@ def pytest_generate_tests(metafunc): if 'mode' in metafunc.fixturenames: markers = metafunc.definition.iter_markers() for marker in markers: - if marker.name == 'parallel': + if marker.name in ('parallel', 'decoupler'): mode = list(as_tuple(marker.kwargs.get('mode', 2))) metafunc.parametrize("mode", mode) @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_runtest_call(item): - partest = os.environ.get('DEVITO_MPI', 0) + inside_pytest_marker = os.environ.get('DEVITO_PYTEST_FLAG', 0) try: - partest = int(partest) + inside_pytest_marker = int(inside_pytest_marker) except ValueError: pass - if item.get_closest_marker("parallel") and not partest: + if inside_pytest_marker: + outcome = yield + + elif item.get_closest_marker("parallel"): # Spawn parallel processes to run test outcome = parallel(item, item.funcargs['mode']) if outcome: pytest.skip(f"{item} success in parallel") else: pytest.fail(f"{item} failed in parallel") + + elif item.get_closest_marker("decoupler"): + outcome = decoupler(item, item.funcargs.get('mode')) + if outcome: + pytest.skip(f"{item} success in decoupled mode") + else: + pytest.fail(f"{item} failed in decoupled mode") + else: outcome = yield @@ -215,12 +259,17 @@ def pytest_runtest_call(item): def pytest_runtest_makereport(item, call): outcome = yield result = outcome.get_result() - partest = os.environ.get('DEVITO_MPI', 0) + + inside_pytest_marker = os.environ.get('DEVITO_PYTEST_FLAG', 0) try: - partest = int(partest) + inside_pytest_marker = int(inside_pytest_marker) except ValueError: pass - if item.get_closest_marker("parallel") and not partest: + if inside_pytest_marker: + return + + if item.get_closest_marker("parallel") or \ + item.get_closest_marker("decoupler"): if call.when == 'call' and result.outcome == 'skipped': result.outcome = 'passed'