diff --git a/pyopencl/algorithm.py b/pyopencl/algorithm.py index 2448f149..2f10233d 100644 --- a/pyopencl/algorithm.py +++ b/pyopencl/algorithm.py @@ -58,7 +58,7 @@ def _extract_extra_args_types_values(extra_args): if isinstance(val, cl.array.Array): extra_args_types.append(VectorArg(val.dtype, name, with_offset=False)) extra_args_values.append(val) - extra_wait_for.extend(val.events) + extra_wait_for.extend(val.write_events) elif isinstance(val, np.generic): extra_args_types.append(ScalarArg(val.dtype, name)) extra_args_values.append(val) @@ -1163,7 +1163,7 @@ def __call__(self, queue, n_objects, *args, **kwargs): data_args.append(arg_val.base_data) if arg_descr.with_offset: data_args.append(arg_val.offset) - wait_for.extend(arg_val.events) + wait_for.extend(arg_val.write_events) else: data_args.append(arg_val) @@ -1182,7 +1182,7 @@ def __call__(self, queue, n_objects, *args, **kwargs): counts = cl.array.empty(queue, (n_objects + 1), index_dtype, allocator=allocator) counts[-1] = 0 - wait_for = wait_for + counts.events + wait_for = wait_for + counts.write_events # The scan will turn the "counts" array into the "starts" array # in-place. @@ -1236,7 +1236,7 @@ def __call__(self, queue, n_objects, *args, **kwargs): info_record.nonempty_indices, info_record.compressed_indices, info_record.num_nonempty_lists, - wait_for=[count_event, *info_record.compressed_indices.events]) + wait_for=[count_event, *info_record.compressed_indices.write_events]) info_record.starts = compressed_counts @@ -1265,13 +1265,13 @@ def __call__(self, queue, n_objects, *args, **kwargs): evt = scan_kernel( starts_ary, size=info_record.num_nonempty_lists, - wait_for=starts_ary.events) + wait_for=starts_ary.write_events) else: evt = scan_kernel(starts_ary, wait_for=[count_event], size=n_objects) starts_ary.setitem(0, 0, queue=queue, wait_for=[evt]) - scan_events.extend(starts_ary.events) + scan_events.extend(starts_ary.write_events) # retrieve count info_record.count = int(starts_ary[-1].get()) @@ -1433,7 +1433,7 @@ def __call__(self, queue, keys, values, nkeys, starts = (cl.array.empty(queue, (nkeys+1), starts_dtype, allocator=allocator) .fill(len(values_sorted_by_key), wait_for=[evt])) - evt, = starts.events + evt, = starts.write_events evt = knl_info.start_finder(starts, keys_sorted_by_key, range=slice(len(keys_sorted_by_key)), diff --git a/pyopencl/array.py b/pyopencl/array.py index ce7b5f72..d608a38f 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -32,7 +32,7 @@ from dataclasses import dataclass from functools import reduce from numbers import Number -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from warnings import warn import numpy as np @@ -57,6 +57,8 @@ else: _SVMPointer_or_nothing = () +_MAX_EVENT_CLEAR_COUNT = 4 + # {{{ _get_common_dtype @@ -191,6 +193,38 @@ def _splay(device, n, kernel_specific_max_wg_size=None): return (group_count*work_items_per_group,), (work_items_per_group,) +def get_wait_for_events( + *, + outputs: Iterable[Any], inputs: Iterable[Any]) -> List["cl.Event"]: + # NOTE: + # * outputs need to wait for all reads and writes to finish + # * inputs only need to wait on writes, but not reads + + wait_for = [] + for ary in outputs: + if isinstance(ary, Array): + wait_for.extend(ary.write_events) + wait_for.extend(ary.read_events) + + for ary in inputs: + if isinstance(ary, Array): + wait_for.extend(ary.write_events) + + return wait_for + + +def add_read_write_event( + evt: "cl.Event", *, + outputs: 
Iterable[Any], inputs: Iterable[Any]) -> None: + for ary in outputs: + if isinstance(ary, Array): + ary.add_write_event(evt) + + for ary in inputs: + if isinstance(ary, Array): + ary.add_read_event(evt) + + # deliberately undocumented for now ARRAY_KERNEL_EXEC_HOOK = None @@ -204,28 +238,35 @@ def elwise_kernel_runner(kernel_getter): from functools import wraps @wraps(kernel_getter) - def kernel_runner(out, *args, **kwargs): - assert isinstance(out, Array) + def kernel_runner(*args, **kwargs): + assert isinstance(args[0], Array) - wait_for = kwargs.pop("wait_for", None) queue = kwargs.pop("queue", None) - if queue is None: - queue = out.queue + wait_for = kwargs.pop("wait_for", None) or [] + noutputs = kwargs.pop("noutputs", 1) + if queue is None: + queue = args[0].queue assert queue is not None - knl = kernel_getter(out, *args, **kwargs) + outputs = args[:noutputs] + inputs = args[noutputs:] + wait_for.extend(get_wait_for_events(outputs=outputs, inputs=inputs)) + + knl = kernel_getter(*args, **kwargs) work_group_info = knl.get_work_group_info( cl.kernel_work_group_info.WORK_GROUP_SIZE, queue.device) - gs, ls = out._get_sizes(queue, work_group_info) + gs, ls = outputs[0]._get_sizes(queue, work_group_info) - args = (out, *args, out.size) if ARRAY_KERNEL_EXEC_HOOK is not None: - return ARRAY_KERNEL_EXEC_HOOK( # pylint: disable=not-callable - knl, queue, gs, ls, *args, wait_for=wait_for) + evt = ARRAY_KERNEL_EXEC_HOOK( # pylint: disable=not-callable + knl, queue, gs, ls, *args, outputs[0].size, wait_for=wait_for) else: - return knl(queue, gs, ls, *args, wait_for=wait_for) + evt = knl(queue, gs, ls, *args, outputs[0].size, wait_for=wait_for) + add_read_write_event(evt, outputs=outputs, inputs=inputs) + + return evt return kernel_runner @@ -443,13 +484,22 @@ class Array: .. versionadded:: 2014.1.1 - .. attribute:: events + .. attribute:: write_events + + A list of :class:`pyopencl.Event` instances that the current array + depends on for writes. User code should not modify this list directly, + but should use :meth:`add_write_event` to append and :meth:`finish` to + wait on the events. - A list of :class:`pyopencl.Event` instances that the current content of - this array depends on. User code may read, but should never modify this - list directly. To update this list, instead use the following methods. + .. attribute:: read_events - .. automethod:: add_event + A list of :class:`pyopencl.Event` instances that the current array + depends on for reads. User code should not modify this list directly, but + should use :meth:`add_read_event` to append and :meth:`finish` to wait + on the events. + + .. automethod:: add_write_event + .. automethod:: add_read_event .. automethod:: finish """ @@ -465,6 +515,10 @@ def __init__( data: Any = None, offset: int = 0, strides: Optional[Tuple[int, ...]] = None, + write_events: Optional[List[cl.Event]] = None, + read_events: Optional[List[cl.Event]] = None, + + # NOTE: deprecated events: Optional[List[cl.Event]] = None, # NOTE: following args are used for the fast constructor @@ -591,11 +645,20 @@ def __init__( if alloc_nbytes < 0: raise ValueError("cannot allocate CL buffer with negative size") + if events is not None: + warn("Passing 'events' is deprecated and will be removed in 2024. 
" + "Pass either 'write_events' or 'read_events' explicitly.", + DeprecationWarning, stacklevel=2) + + if write_events is None: + write_events = events + self.queue = queue self.shape = shape self.dtype = dtype self.strides = strides - self.events = [] if events is None else events + self.write_events = write_events or [] + self.read_events = read_events or [] self.nbytes = alloc_nbytes self.size = size self.allocator = allocator @@ -630,6 +693,14 @@ def __init__( "than expected, potentially leading to crashes.", InconsistentOpenCLQueueWarning, stacklevel=2) + @property + def events(self): + warn("Using 'events' is deprecated and will be removed in 2024. Prefer " + "either 'write_events' or 'read_events' depending on the situation.", + DeprecationWarning, stacklevel=2) + + return self.write_events + @property def ndim(self): return len(self.shape) @@ -675,15 +746,22 @@ def _new_with_changes(self, data, offset, shape=None, dtype=None, # share the same events list. if data is None: - events = None + write_events = read_events = None else: - events = self.events + write_events = self.write_events + read_events = self.read_events - return self.__class__(None, shape, dtype, allocator=allocator, - strides=strides, data=data, offset=offset, - events=events, + result = self.__class__(None, shape, dtype, allocator=allocator, + strides=strides, data=data, offset=offset, events=None, _fast=fast, _context=self.context, _queue=queue, _size=size) + # NOTE: these are set after the fact for backwards compatibility + # since subclasses may have overwritten __init__ and are missing them + result.write_events = write_events or [] + result.read_events = read_events or [] + + return result + def with_queue(self, queue): """Return a copy of *self* with the default queue set to *queue*. 
@@ -695,8 +773,7 @@ def with_queue(self, queue): if queue is not None: assert queue.context == self.context - return self._new_with_changes(self.base_data, self.offset, - queue=queue) + return self._new_with_changes(self.base_data, self.offset, queue=queue) def _get_sizes(self, queue, kernel_specific_max_wg_size=None): if not self.flags.forc: @@ -759,11 +836,11 @@ def set(self, ary, queue=None, async_=None, **kwargs): stacklevel=2) if self.size: - event1 = cl.enqueue_copy(queue or self.queue, self.base_data, ary, + evt = cl.enqueue_copy(queue or self.queue, self.base_data, ary, dst_offset=self.offset, is_blocking=not async_) - self.add_event(event1) + self.add_write_event(evt) def _get(self, queue=None, ary=None, async_=None, **kwargs): # {{{ handle 'async' deprecation @@ -796,7 +873,7 @@ def _get(self, queue=None, ary=None, async_=None, **kwargs): if self.shape != ary.shape: warn("get() between arrays of different shape is deprecated " - "and will be removed in PyCUDA 2017.x", + "and will be removed in PyOpenCL 2017.x", DeprecationWarning, stacklevel=2) assert self.flags.forc, "Array in get() must be contiguous" @@ -809,15 +886,15 @@ def _get(self, queue=None, ary=None, async_=None, **kwargs): "to associate one.") if self.size: - event1 = cl.enqueue_copy(queue, ary, self.base_data, + evt = cl.enqueue_copy(queue, ary, self.base_data, src_offset=self.offset, - wait_for=self.events, is_blocking=not async_) + wait_for=self.write_events, is_blocking=not async_) - self.add_event(event1) + self.add_read_event(evt) else: - event1 = None + evt = None - return ary, event1 + return ary, evt def get(self, queue=None, ary=None, async_=None, **kwargs): """Transfer the contents of *self* into *ary* or a newly allocated @@ -827,10 +904,9 @@ def get(self, queue=None, ary=None, async_=None, **kwargs): .. versionchanged:: 2019.1.2 Calling with ``async_=True`` was deprecated and replaced by - :meth:`get_async`. - The event returned by :meth:`pyopencl.enqueue_copy` is now stored into - :attr:`events` to ensure data is not modified before the copy is - complete. + :meth:`get_async`. The event returned by :meth:`pyopencl.enqueue_copy` + is now stored into :attr:`read_events` to ensure data is not modified + before the copy is complete. .. versionchanged:: 2015.2 @@ -857,9 +933,8 @@ def get(self, queue=None, ary=None, async_=None, **kwargs): def get_async(self, queue=None, ary=None, **kwargs): """ Asynchronous version of :meth:`get` which returns a tuple ``(ary, event)`` - containing the host array ``ary`` - and the :class:`pyopencl.NannyEvent` ``event`` returned by - :meth:`pyopencl.enqueue_copy`. + containing the host array ``ary`` and the :class:`pyopencl.NannyEvent` + event returned by :meth:`pyopencl.enqueue_copy`. .. 
versionadded:: 2019.1.2 """ @@ -891,11 +966,13 @@ def copy(self, queue=_copy_queue): raise RuntimeError("cannot copy non-contiguous array") if self.nbytes: - event1 = cl.enqueue_copy(queue or self.queue, + evt = cl.enqueue_copy(queue or self.queue, result.base_data, self.base_data, src_offset=self.offset, byte_count=self.nbytes, - wait_for=self.events) - result.add_event(event1) + wait_for=self.write_events) + + self.add_read_event(evt) + result.add_write_event(evt) return result @@ -913,7 +990,7 @@ def __repr__(self): result = repr(self.get()) if result[:5] == "array": - result = f"cl.{type(self).__name__}" + result[5:] + result = f"cl.{type(self).__name__}{result[5:]}" else: warn( f"{type(result).__name__}.__repr__ was expected to return a " @@ -1008,7 +1085,7 @@ def _abs(result, arg): elif arg.dtype.kind in ["u", "i"]: fname = "abs" else: - raise TypeError("unsupported dtype in _abs()") + raise TypeError(f"unsupported dtype in 'abs': {arg.dtype!r}") return elementwise.get_unary_func_kernel( arg.context, fname, arg.dtype, out_dtype=result.dtype) @@ -1130,35 +1207,29 @@ def mul_add(self, selffac, other, otherfac, queue=None): """Return ``selffac * self + otherfac * other``. """ queue = queue or self.queue + assert np.isscalar(selffac) and np.isscalar(otherfac) if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, queue) - result.add_event( - self._axpbyz( - result, selffac, self, otherfac, other, - queue=queue)) + self._axpbyz(result, selffac, self, otherfac, other, queue=queue) return result elif np.isscalar(other): common_dtype = _get_common_dtype(self, other, queue) result = self._new_like_me(common_dtype, queue=queue) - result.add_event( - self._axpbz(result, selffac, - self, common_dtype.type(otherfac * other), - queue=queue)) + self._axpbz( + result, selffac, self, common_dtype.type(otherfac * other), + queue=queue) return result else: - raise NotImplementedError + raise NotImplementedError(f"'mul_add' with '{type(other).__name__}'") def __add__(self, other): """Add an array with an array or an array with a scalar.""" if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event( - self._axpbyz(result, - self.dtype.type(1), self, - other.dtype.type(1), other)) - + self._axpbyz( + result, self.dtype.type(1), self, other.dtype.type(1), other) return result elif np.isscalar(other): if other == 0: @@ -1166,9 +1237,8 @@ def __add__(self, other): else: common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - self._axpbz(result, self.dtype.type(1), - self, common_dtype.type(other))) + self._axpbz( + result, self.dtype.type(1), self, common_dtype.type(other)) return result else: return NotImplemented @@ -1180,20 +1250,16 @@ def __sub__(self, other): if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event( - self._axpbyz(result, - self.dtype.type(1), self, - result.dtype.type(-1), other)) - + self._axpbyz( + result, self.dtype.type(1), self, result.dtype.type(-1), other) return result elif np.isscalar(other): if other == 0: return self.copy() else: result = self._new_like_me( - _get_common_dtype(self, other, self.queue)) - result.add_event( - self._axpbz(result, self.dtype.type(1), self, -other)) + _get_common_dtype(self, other, self.queue)) + self._axpbz(result, self.dtype.type(1), self, -other) return result else: return NotImplemented @@ -1206,10 +1272,8 @@ def __rsub__(self, other): 
if np.isscalar(other): common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - self._axpbz(result, result.dtype.type(-1), self, - common_dtype.type(other))) - + self._axpbz( + result, result.dtype.type(-1), self, common_dtype.type(other)) return result else: return NotImplemented @@ -1219,15 +1283,11 @@ def __iadd__(self, other): if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event( - self._axpbyz(self, - self.dtype.type(1), self, - other.dtype.type(1), other)) - + self._axpbyz( + self, self.dtype.type(1), self, other.dtype.type(1), other) return self elif np.isscalar(other): - self.add_event( - self._axpbz(self, self.dtype.type(1), self, other)) + self._axpbz(self, self.dtype.type(1), self, other) return self else: return NotImplemented @@ -1237,9 +1297,8 @@ def __isub__(self, other): if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event( - self._axpbyz(self, self.dtype.type(1), self, - other.dtype.type(-1), other)) + self._axpbyz( + self, self.dtype.type(1), self, other.dtype.type(-1), other) return self elif np.isscalar(other): self._axpbz(self, self.dtype.type(1), self, -other) @@ -1252,21 +1311,18 @@ def __pos__(self): def __neg__(self): result = self._new_like_me() - result.add_event(self._axpbz(result, -1, self, 0)) + self._axpbz(result, -1, self, 0) return result def __mul__(self, other): if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event( - self._elwise_multiply(result, self, other)) + self._elwise_multiply(result, self, other) return result elif np.isscalar(other): common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - self._axpbz(result, - common_dtype.type(other), self, self.dtype.type(0))) + self._axpbz(result, common_dtype.type(other), self, self.dtype.type(0)) return result else: return NotImplemented @@ -1275,9 +1331,7 @@ def __rmul__(self, other): if np.isscalar(other): common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - self._axpbz(result, - common_dtype.type(other), self, self.dtype.type(0))) + self._axpbz(result, common_dtype.type(other), self, self.dtype.type(0)) return result else: return NotImplemented @@ -1287,25 +1341,20 @@ def __imul__(self, other): if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event( - self._elwise_multiply(self, self, other)) + self._elwise_multiply(self, self, other) return self elif np.isscalar(other): - self.add_event( - self._axpbz(self, other, self, self.dtype.type(0))) + self._axpbz(self, other, self, self.dtype.type(0)) return self else: return NotImplemented def __div__(self, other): - """Divides an array by an array or a scalar, i.e. ``self / other``. - """ + """Divides an array by an array or a scalar, i.e. 
``self / other``.""" if isinstance(other, Array): result = _get_broadcasted_binary_op_result( - self, other, self.queue, - dtype_getter=_get_truedivide_dtype) - result.add_event(self._div(result, self, other)) - + self, other, self.queue, dtype_getter=_get_truedivide_dtype) + self._div(result, self, other) return result elif np.isscalar(other): if other == 1: @@ -1313,10 +1362,9 @@ def __div__(self, other): else: common_dtype = _get_truedivide_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - self._axpbz(result, - np.true_divide(common_dtype.type(1), other), - self, self.dtype.type(0))) + self._axpbz( + result, np.true_divide(common_dtype.type(1), other), + self, self.dtype.type(0)) return result else: return NotImplemented @@ -1324,18 +1372,16 @@ def __div__(self, other): __truediv__ = __div__ def __rdiv__(self, other): - """Divides an array by a scalar or an array, i.e. ``other / self``. - """ + """Divides an array by a scalar or an array, i.e. ``other / self``.""" common_dtype = _get_truedivide_dtype(self, other, self.queue) if isinstance(other, Array): result = self._new_like_me(common_dtype) - result.add_event(other._div(result, self)) + other._div(result, self) return result elif np.isscalar(other): result = self._new_like_me(common_dtype) - result.add_event( - self._rdiv_scalar(result, self, common_dtype.type(other))) + self._rdiv_scalar(result, self, common_dtype.type(other)) return result else: return NotImplemented @@ -1353,16 +1399,15 @@ def __itruediv__(self, other): if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event( - self._div(self, self, other)) + self._div(self, self, other) return self elif np.isscalar(other): if other == 1: return self else: - self.add_event( - self._axpbz(self, common_dtype.type(np.true_divide(1, other)), - self, self.dtype.type(0))) + self._axpbz( + self, common_dtype.type(np.true_divide(1, other)), + self, self.dtype.type(0)) return self else: return NotImplemented @@ -1375,12 +1420,11 @@ def __and__(self, other): if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event(self._array_binop(result, self, other, op="&")) + self._array_binop(result, self, other, op="&") return result elif np.isscalar(other): result = self._new_like_me(common_dtype) - result.add_event( - self._scalar_binop(result, self, other, op="&")) + self._scalar_binop(result, self, other, op="&") return result else: return NotImplemented @@ -1394,14 +1438,12 @@ def __or__(self, other): raise TypeError("Integral types only") if isinstance(other, Array): - result = _get_broadcasted_binary_op_result(self, other, - self.queue) - result.add_event(self._array_binop(result, self, other, op="|")) + result = _get_broadcasted_binary_op_result(self, other, self.queue) + self._array_binop(result, self, other, op="|") return result elif np.isscalar(other): result = self._new_like_me(common_dtype) - result.add_event( - self._scalar_binop(result, self, other, op="|")) + self._scalar_binop(result, self, other, op="|") return result else: return NotImplemented @@ -1416,12 +1458,11 @@ def __xor__(self, other): if isinstance(other, Array): result = _get_broadcasted_binary_op_result(self, other, self.queue) - result.add_event(self._array_binop(result, self, other, op="^")) + self._array_binop(result, self, other, op="^") return result elif np.isscalar(other): result = 
self._new_like_me(common_dtype) - result.add_event( - self._scalar_binop(result, self, other, op="^")) + self._scalar_binop(result, self, other, op="^") return result else: return NotImplemented @@ -1438,11 +1479,10 @@ def __iand__(self, other): if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event(self._array_binop(self, self, other, op="&")) + self._array_binop(self, self, other, op="&") return self elif np.isscalar(other): - self.add_event( - self._scalar_binop(self, self, other, op="&")) + self._scalar_binop(self, self, other, op="&") return self else: return NotImplemented @@ -1457,11 +1497,10 @@ def __ior__(self, other): if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event(self._array_binop(self, self, other, op="|")) + self._array_binop(self, self, other, op="|") return self elif np.isscalar(other): - self.add_event( - self._scalar_binop(self, self, other, op="|")) + self._scalar_binop(self, self, other, op="|") return self else: return NotImplemented @@ -1476,17 +1515,17 @@ def __ixor__(self, other): if other.shape != self.shape and other.shape != (): raise NotImplementedError("Broadcasting binary op with shapes:" f" {self.shape}, {other.shape}.") - self.add_event(self._array_binop(self, self, other, op="^")) + self._array_binop(self, self, other, op="^") return self elif np.isscalar(other): - self.add_event( - self._scalar_binop(self, self, other, op="^")) + self._scalar_binop(self, self, other, op="^") return self else: return NotImplemented def _zero_fill(self, queue=None, wait_for=None): queue = queue or self.queue + wait_for = wait_for or [] if not self.size: return @@ -1500,9 +1539,11 @@ def _zero_fill(self, queue=None, wait_for=None): # circumvent bug with large buffers on NVIDIA # https://github.com/inducer/pyopencl/issues/395 if cl_version_gtr_1_2 and not (on_nvidia and self.nbytes >= 2**31): - self.add_event( - cl.enqueue_fill(queue, self.base_data, np.int8(0), - self.nbytes, offset=self.offset, wait_for=wait_for)) + evt = cl.enqueue_fill( + queue, self.base_data, np.int8(0), + self.nbytes, offset=self.offset, + wait_for=wait_for + self.write_events + self.read_events) + self.add_write_event(evt) else: zero = np.zeros((), self.dtype) self.fill(zero, queue=queue) @@ -1513,9 +1554,7 @@ def fill(self, value, queue=None, wait_for=None): :returns: *self*. 
""" - self.add_event( - self._fill(self, value, queue=queue, wait_for=wait_for)) - + self._fill(self, value, queue=queue, wait_for=wait_for) return self def __len__(self): @@ -1531,7 +1570,7 @@ def __abs__(self): """ result = self._new_like_me(self.dtype.type(0).real.dtype) - result.add_event(self._abs(result, self)) + self._abs(result, self) return result def __pow__(self, other): @@ -1542,15 +1581,12 @@ def __pow__(self, other): if isinstance(other, Array): assert self.shape == other.shape - result = self._new_like_me( - _get_common_dtype(self, other, self.queue)) - result.add_event( - self._pow_array(result, self, other)) + result = self._new_like_me(_get_common_dtype(self, other, self.queue)) + self._pow_array(result, self, other) return result elif np.isscalar(other): - result = self._new_like_me( - _get_common_dtype(self, other, self.queue)) - result.add_event(self._pow_scalar(result, self, other)) + result = self._new_like_me(_get_common_dtype(self, other, self.queue)) + self._pow_scalar(result, self, other) return result else: return NotImplemented @@ -1559,8 +1595,7 @@ def __rpow__(self, other): if np.isscalar(other): common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - result.add_event( - self._rpow_scalar(result, common_dtype.type(other), self)) + self._rpow_scalar(result, common_dtype.type(other), self) return result else: return NotImplemented @@ -1570,8 +1605,7 @@ def __invert__(self): raise TypeError(f"Integral types only: {self.dtype}") result = self._new_like_me() - result.add_event(self._unop(result, self, op="~")) - + self._unop(result, self, op="~") return result # }}} @@ -1582,7 +1616,7 @@ def reverse(self, queue=None): """ result = self._new_like_me() - result.add_event(self._reverse(result, self)) + self._reverse(result, self) return result def astype(self, dtype, queue=None): @@ -1591,7 +1625,7 @@ def astype(self, dtype, queue=None): return self.copy() result = self._new_like_me(dtype=dtype) - result.add_event(self._copy(result, self, queue=queue)) + self._copy(result, self, queue=queue) return result # {{{ rich comparisons, any, all @@ -1606,21 +1640,27 @@ def __bool__(self): def any(self, queue=None, wait_for=None): from pyopencl.reduction import get_any_kernel krnl = get_any_kernel(self.context, self.dtype) - if wait_for is None: - wait_for = [] - result, event1 = krnl(self, queue=queue, - wait_for=wait_for + self.events, return_event=True) - result.add_event(event1) + + queue = queue or self.queue + wait_for = wait_for or [] + result, evt = krnl( + self, queue=queue, wait_for=wait_for + self.write_events, + return_event=True) + self.add_read_event(evt) + return result def all(self, queue=None, wait_for=None): from pyopencl.reduction import get_all_kernel krnl = get_all_kernel(self.context, self.dtype) - if wait_for is None: - wait_for = [] - result, event1 = krnl(self, queue=queue, - wait_for=wait_for + self.events, return_event=True) - result.add_event(event1) + + queue = queue or self.queue + wait_for = wait_for or [] + result, evt = krnl( + self, queue=queue, wait_for=wait_for + self.write_events, + return_event=True) + self.add_read_event(evt) + return result @staticmethod @@ -1640,13 +1680,11 @@ def _array_comparison(out, a, b, queue=None, op=None): def __eq__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op="==")) + self._array_comparison(result, self, other, op="==") return result elif np.isscalar(other): 
result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op="==")) + self._scalar_comparison(result, self, other, op="==") return result else: return NotImplemented @@ -1654,13 +1692,11 @@ def __eq__(self, other): def __ne__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op="!=")) + self._array_comparison(result, self, other, op="!=") return result elif np.isscalar(other): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op="!=")) + self._scalar_comparison(result, self, other, op="!=") return result else: return NotImplemented @@ -1668,8 +1704,7 @@ def __ne__(self, other): def __le__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op="<=")) + self._array_comparison(result, self, other, op="<=") return result elif np.isscalar(other): result = self._new_like_me(_BOOL_DTYPE) @@ -1681,13 +1716,11 @@ def __le__(self, other): def __ge__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op=">=")) + self._array_comparison(result, self, other, op=">=") return result elif np.isscalar(other): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op=">=")) + self._scalar_comparison(result, self, other, op=">=") return result else: return NotImplemented @@ -1695,13 +1728,11 @@ def __ge__(self, other): def __lt__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op="<")) + self._array_comparison(result, self, other, op="<") return result elif np.isscalar(other): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op="<")) + self._scalar_comparison(result, self, other, op="<") return result else: return NotImplemented @@ -1709,13 +1740,11 @@ def __lt__(self, other): def __gt__(self, other): if isinstance(other, Array): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._array_comparison(result, self, other, op=">")) + self._array_comparison(result, self, other, op=">") return result elif np.isscalar(other): result = self._new_like_me(_BOOL_DTYPE) - result.add_event( - self._scalar_comparison(result, self, other, op=">")) + self._scalar_comparison(result, self, other, op=">") return result else: return NotImplemented @@ -1731,8 +1760,7 @@ def real(self): """ if self.dtype.kind == "c": result = self._new_like_me(self.dtype.type(0).real.dtype) - result.add_event( - self._real(result, self)) + self._real(result, self) return result else: return self @@ -1744,8 +1772,7 @@ def imag(self): """ if self.dtype.kind == "c": result = self._new_like_me(self.dtype.type(0).real.dtype) - result.add_event( - self._imag(result, self)) + self._imag(result, self) return result else: return zeros_like(self) @@ -1756,7 +1783,7 @@ def conj(self): """ if self.dtype.kind == "c": result = self._new_like_me() - result.add_event(self._conj(result, self)) + self._conj(result, self) return result else: return self @@ -1768,25 +1795,46 @@ def conj(self): # {{{ event management def add_event(self, evt): - """Add *evt* to :attr:`events`. 
If :attr:`events` is too long, this method - may implicitly wait for a subset of :attr:`events` and clear them from the - list. + return self.add_write_event(evt) + + def add_write_event(self, evt: "cl.Event") -> None: + """Add *evt* to :attr:`write_events`. If :attr:`write_events` is too + long, this method may implicitly wait for a subset of :attr:`write_events` + and clear them from the list. """ - n_wait = 4 + n = _MAX_EVENT_CLEAR_COUNT - self.events.append(evt) + self.write_events.append(evt) + if len(self.write_events) > 3*n: + wait_events = self.write_events[:n] + cl.wait_for_events(wait_events) + del self.write_events[:n] + + def add_read_event(self, evt: "cl.Event") -> None: + """Add *evt* to :attr:`read_events`. If :attr:`read_events` is too + long, this method may implicitly wait for a subset of :attr:`read_events` + and clear them from the list. + """ + n = _MAX_EVENT_CLEAR_COUNT - if len(self.events) > 3*n_wait: - wait_events = self.events[:n_wait] + self.read_events.append(evt) + if len(self.read_events) > 3*n: + wait_events = self.read_events[:n] cl.wait_for_events(wait_events) - del self.events[:n_wait] + del self.read_events[:n] def finish(self): - """Wait for the entire contents of :attr:`events`, clear it.""" + """Wait for the entire contents of :attr:`write_events` and + :attr:`read_events` and clear the lists. + """ + + if self.write_events: + cl.wait_for_events(self.write_events) + del self.write_events[:] - if self.events: - cl.wait_for_events(self.events) - del self.events[:] + if self.read_events: + cl.wait_for_events(self.read_events) + del self.read_events[:] # }}} @@ -2043,7 +2091,8 @@ def map_to_host(self, queue=None, flags=None, is_blocking=True, wait_for=None): ary, evt = cl.enqueue_map_buffer( queue or self.queue, self.base_data, flags, self.offset, self.shape, self.dtype, strides=self.strides, - wait_for=wait_for + self.events, is_blocking=is_blocking) + wait_for=wait_for + self.write_events, is_blocking=is_blocking) + self.add_read_event(evt) if is_blocking: return ary @@ -2161,10 +2210,8 @@ def setitem(self, subscript, value, queue=None, wait_for=None): Added *wait_for*. 
""" - queue = queue or self.queue or value.queue - if wait_for is None: - wait_for = [] - wait_for = wait_for + self.events + queue = queue or self.queue + wait_for = wait_for or [] if isinstance(subscript, Array): if subscript.dtype.kind not in ("i", "u"): @@ -2181,6 +2228,7 @@ def setitem(self, subscript, value, queue=None, wait_for=None): wait_for=wait_for) return + # NOTE: subarray shares the event lists with self subarray = self[subscript] if not subarray.size: @@ -2190,9 +2238,10 @@ def setitem(self, subscript, value, queue=None, wait_for=None): if isinstance(value, np.ndarray): if subarray.shape == value.shape and subarray.strides == value.strides: - self.add_event( - cl.enqueue_copy(queue, subarray.base_data, - value, dst_offset=subarray.offset, wait_for=wait_for)) + evt = cl.enqueue_copy( + queue, subarray.base_data, value, dst_offset=subarray.offset, + wait_for=wait_for + subarray.write_events + subarray.read_events) + subarray.add_write_event(evt) return else: value = to_device(queue, value, self.allocator) @@ -2208,11 +2257,9 @@ def setitem(self, subscript, value, queue=None, wait_for=None): raise NotImplementedError("cannot assign between arrays of " "differing strides") - self.add_event( - self._copy(subarray, value, queue=queue, wait_for=wait_for)) - + self._copy(subarray, value, queue=queue, wait_for=wait_for) else: - # Let's assume it's a scalar + assert np.isscalar(value) subarray.fill(value, queue=queue, wait_for=wait_for) def __setitem__(self, subscript, value): @@ -2308,8 +2355,9 @@ def to_device(queue, ary, allocator=None, async_=None, first_arg = queue.context result = Array(first_arg, ary.shape, ary.dtype, - allocator=allocator, strides=ary.strides) + allocator=allocator, strides=ary.strides) result.set(ary, async_=async_, queue=queue) + return result @@ -2328,6 +2376,7 @@ def zeros(queue, shape, dtype, order="C", allocator=None): order=order, allocator=allocator, _context=queue.context, _queue=queue) result._zero_fill() + return result @@ -2336,8 +2385,8 @@ def empty_like(ary, queue=_copy_queue, allocator=None): as *other_ary*. 
""" - return ary._new_with_changes(data=None, offset=0, queue=queue, - allocator=allocator) + return ary._new_with_changes( + data=None, offset=0, queue=queue, allocator=allocator) def zeros_like(ary): @@ -2361,8 +2410,7 @@ class _ArangeInfo: @elwise_kernel_runner def _arange_knl(result, start, step): - return elementwise.get_arange_kernel( - result.context, result.dtype) + return elementwise.get_arange_kernel(result.context, result.dtype) def arange(queue, *args, **kwargs): @@ -2446,7 +2494,7 @@ def arange(queue, *args, **kwargs): size = ceil((stop-start)/step) result = Array(queue, (size,), dtype, allocator=inf.allocator) - result.add_event(_arange_knl(result, start, step, queue=queue)) + _arange_knl(result, start, step, queue=queue) # }}} @@ -2459,8 +2507,7 @@ def arange(queue, *args, **kwargs): @elwise_kernel_runner def _take(result, ary, indices): - return elementwise.get_take_kernel( - result.context, result.dtype, indices.dtype) + return elementwise.get_take_kernel(result.context, result.dtype, indices.dtype) def take(a, indices, out=None, queue=None, wait_for=None): @@ -2473,8 +2520,7 @@ def take(a, indices, out=None, queue=None, wait_for=None): out = type(a)(queue, indices.shape, a.dtype, allocator=a.allocator) assert len(indices.shape) == 1 - out.add_event( - _take(out, a, indices, queue=queue, wait_for=wait_for)) + _take(out, a, indices, queue=queue, wait_for=wait_for) return out @@ -2525,17 +2571,20 @@ def make_func_for_chunk_size(chunk_size): queue.device)) wait_for_this = ( - *indices.events, - *[evt for i in arrays[chunk_slice] for evt in i.events], - *[evt for o in out[chunk_slice] for evt in o.events]) + *indices.write_events, + *[evt for i in arrays[chunk_slice] for evt in i.write_events], + *[evt for o in out[chunk_slice] for evt in o.write_events]) evt = knl(queue, gs, ls, indices.data, *[o.data for o in out[chunk_slice]], *[i.data for i in arrays[chunk_slice]], *[indices.size], wait_for=wait_for_this) - for o in out[chunk_slice]: - o.add_event(evt) + + add_read_write_event( + evt, + outputs=out[chunk_slice], + inputs=arrays[chunk_size] + [indices]) return out @@ -2605,10 +2654,10 @@ def make_func_for_chunk_size(chunk_size): queue.device)) wait_for_this = ( - *dest_indices.events, - *src_indices.events, - *[evt for i in arrays[chunk_slice] for evt in i.events], - *[evt for o in out[chunk_slice] for evt in o.events]) + *dest_indices.write_events, + *src_indices.write_events, + *[evt for i in arrays[chunk_slice] for evt in i.write_events], + *[evt for o in out[chunk_slice] for evt in o.write_events]) evt = knl(queue, gs, ls, *out[chunk_slice], dest_indices, @@ -2617,8 +2666,11 @@ def make_func_for_chunk_size(chunk_size): *src_offsets_list[chunk_slice], src_indices.size, wait_for=wait_for_this) - for o in out[chunk_slice]: - o.add_event(evt) + + add_read_write_event( + evt, + outputs=out[chunk_slice], + inputs=arrays[chunk_slice] + [src_indices, dest_indices]) return out @@ -2633,9 +2685,7 @@ def multi_put(arrays, dest_indices, dest_shape=None, out=None, queue=None, a_allocator = arrays[0].allocator context = dest_indices.context queue = queue or dest_indices.queue - if wait_for is None: - wait_for = [] - wait_for = wait_for + dest_indices.events + wait_for = wait_for or [] vec_count = len(arrays) @@ -2687,8 +2737,8 @@ def make_func_for_chunk_size(chunk_size): wait_for_this = ( *wait_for, - *[evt for i in arrays[chunk_slice] for evt in i.events], - *[evt for o in out[chunk_slice] for evt in o.events]) + *[evt for i in arrays[chunk_slice] for evt in i.write_events], + *[evt 
for o in out[chunk_slice] for evt in o.write_events]) evt = knl(queue, gs, ls, *out[chunk_slice], dest_indices, @@ -2696,8 +2746,10 @@ def make_func_for_chunk_size(chunk_size): use_fill_cla, array_lengths_cla, dest_indices.size, wait_for=wait_for_this) - for o in out[chunk_slice]: - o.add_event(evt) + add_read_write_event( + evt, + outputs=out[chunk_slice], + inputs=arrays[chunk_slice] + [dest_indices]) return out @@ -2771,7 +2823,7 @@ def concatenate(arrays, axis=0, queue=None, allocator=None): @elwise_kernel_runner def _diff(result, array): - return elementwise.get_diff_kernel(array.context, array.dtype) + return elementwise.get_diff_kernel(result.context, result.dtype) def diff(array, queue=None, allocator=None): @@ -2788,8 +2840,7 @@ def diff(array, queue=None, allocator=None): allocator = allocator or array.allocator result = array.__class__(queue, (n-1,), array.dtype, allocator=allocator) - event1 = _diff(result, array, queue=queue) - result.add_event(event1) + _diff(result, array, queue=queue) return result @@ -2994,9 +3045,7 @@ def if_positive(criterion, then_, else_, out=None, queue=None): allocator=criterion.allocator, strides=out_strides) - event1 = _if_positive(out, criterion, then_, else_, queue=queue) - out.add_event(event1) - + _if_positive(out, criterion, then_, else_, queue=queue) return out # }}} @@ -3037,8 +3086,7 @@ def maximum(a, b, out=None, queue=None): elif not b_is_scalar: out = b._new_like_me(out_dtype, queue) - out.add_event(_minimum_maximum_backend(out, a, b, queue=queue, minmax="max")) - + _minimum_maximum_backend(out, a, b, queue=queue, minmax="max") return out @@ -3063,8 +3111,7 @@ def minimum(a, b, out=None, queue=None): elif not b_is_scalar: out = b._new_like_me(out_dtype, queue) - out.add_event(_minimum_maximum_backend(out, a, b, queue=queue, minmax="min")) - + _minimum_maximum_backend(out, a, b, queue=queue, minmax="min") return out # }}} @@ -3180,9 +3227,10 @@ def sum(a, dtype=None, queue=None, slice=None, initial=np._NoValue): from pyopencl.reduction import get_sum_kernel krnl = get_sum_kernel(a.context, dtype, a.dtype) - result, event1 = krnl(a, queue=queue, slice=slice, wait_for=a.events, - return_event=True) - result.add_event(event1) + result, evt = krnl( + a, queue=queue, slice=slice, wait_for=a.write_events, + return_event=True) + a.add_read_event(evt) # NOTE: neutral element in `get_sum_kernel` is 0 by default if initial is not np._NoValue: @@ -3215,9 +3263,13 @@ def dot(a, b, dtype=None, queue=None, slice=None): from pyopencl.reduction import get_dot_kernel krnl = get_dot_kernel(a.context, dtype, a.dtype, b.dtype) - result, event1 = krnl(a, b, queue=queue, slice=slice, - wait_for=a.events + b.events, return_event=True) - result.add_event(event1) + result, evt = krnl( + a, b, queue=queue, slice=slice, + wait_for=a.write_events + b.write_events, return_event=True) + + a.add_read_event(evt) + if b is not a: + b.add_read_event(evt) return result @@ -3234,9 +3286,13 @@ def vdot(a, b, dtype=None, queue=None, slice=None): krnl = get_dot_kernel(a.context, dtype, a.dtype, b.dtype, conjugate_first=True) - result, event1 = krnl(a, b, queue=queue, slice=slice, - wait_for=a.events + b.events, return_event=True) - result.add_event(event1) + result, evt = krnl( + a, b, queue=queue, slice=slice, + wait_for=a.write_events + b.write_events, return_event=True) + + a.add_read_event(evt) + if b is not a: + b.add_read_event(evt) return result @@ -3252,9 +3308,15 @@ def subset_dot(subset, a, b, dtype=None, queue=None, slice=None): krnl = get_subset_dot_kernel( 
a.context, dtype, subset.dtype, a.dtype, b.dtype) - result, event1 = krnl(subset, a, b, queue=queue, slice=slice, - wait_for=subset.events + a.events + b.events, return_event=True) - result.add_event(event1) + result, evt = krnl( + subset, a, b, queue=queue, slice=slice, + wait_for=subset.write_events + a.write_events + b.write_events, + return_event=True) + + subset.add_read_event(evt) + a.add_read_event(evt) + if b is not a: + b.add_read_event(evt) return result @@ -3277,9 +3339,10 @@ def f(a, queue=None, initial=np._NoValue): from pyopencl.reduction import get_minmax_kernel krnl = get_minmax_kernel(a.context, what, a.dtype) - result, event1 = krnl(a, queue=queue, wait_for=a.events, - return_event=True) - result.add_event(event1) + result, evt = krnl( + a, queue=queue, wait_for=a.write_events, + return_event=True) + a.add_read_event(evt) if initial is not np._NoValue: initial = a.dtype.type(initial) @@ -3312,10 +3375,16 @@ def _make_subset_minmax_kernel(what): def f(subset, a, queue=None, slice=None): from pyopencl.reduction import get_subset_minmax_kernel krnl = get_subset_minmax_kernel(a.context, what, a.dtype, subset.dtype) - result, event1 = krnl(subset, a, queue=queue, slice=slice, - wait_for=a.events + subset.events, return_event=True) - result.add_event(event1) + result, evt = krnl( + subset, a, queue=queue, slice=slice, + wait_for=a.write_events + subset.write_events, + return_event=True) + + a.add_read_event(evt) + subset.add_read_event(evt) + return result + return f @@ -3349,8 +3418,10 @@ def cumsum(a, output_dtype=None, queue=None, from pyopencl.scan import get_cumsum_kernel krnl = get_cumsum_kernel(a.context, a.dtype, output_dtype) - evt = krnl(a, result, queue=queue, wait_for=wait_for + a.events) - result.add_event(evt) + evt = krnl(a, result, queue=queue, wait_for=wait_for + a.write_events) + + a.add_read_event(evt) + result.add_write_event(evt) if return_event: return evt, result diff --git a/pyopencl/bitonic_sort.py b/pyopencl/bitonic_sort.py index f322e9f7..4bb5465e 100644 --- a/pyopencl/bitonic_sort.py +++ b/pyopencl/bitonic_sort.py @@ -95,7 +95,7 @@ def __call__(self, arr, idx=None, queue=None, wait_for=None, axis=0): if wait_for is None: wait_for = [] - wait_for = wait_for + arr.events + wait_for = wait_for + arr.write_events last_evt = cl.enqueue_marker(queue, wait_for=wait_for) diff --git a/pyopencl/clmath.py b/pyopencl/clmath.py index 445693b7..89ea9ed4 100644 --- a/pyopencl/clmath.py +++ b/pyopencl/clmath.py @@ -24,13 +24,12 @@ import numpy as np -import pyopencl.array as cl_array import pyopencl.elementwise as elementwise -from pyopencl.array import _get_common_dtype +from pyopencl.array import _get_common_dtype, elwise_kernel_runner def _make_unary_array_func(name): - @cl_array.elwise_kernel_runner + @elwise_kernel_runner def knl_runner(result, arg): if arg.dtype.kind == "c": from pyopencl.elementwise import complex_dtype_to_name @@ -43,8 +42,7 @@ def knl_runner(result, arg): def f(array, queue=None): result = array._new_like_me(queue=queue) - event1 = knl_runner(result, array, queue=queue) - result.add_event(event1) + knl_runner(result, array, queue=queue) return result return f @@ -60,13 +58,13 @@ def f(array, queue=None): asinpi = _make_unary_array_func("asinpi") -@cl_array.elwise_kernel_runner +@elwise_kernel_runner def _atan2(result, arg1, arg2): return elementwise.get_float_binary_func_kernel( result.context, "atan2", arg1.dtype, arg2.dtype, result.dtype) -@cl_array.elwise_kernel_runner +@elwise_kernel_runner def _atan2pi(result, arg1, arg2): return 
elementwise.get_float_binary_func_kernel( result.context, "atan2pi", arg1.dtype, arg2.dtype, result.dtype) @@ -81,7 +79,7 @@ def atan2(y, x, queue=None): """ queue = queue or y.queue result = y._new_like_me(_get_common_dtype(y, x, queue)) - result.add_event(_atan2(result, y, x, queue=queue)) + _atan2(result, y, x, queue=queue) return result @@ -95,7 +93,7 @@ def atan2pi(y, x, queue=None): """ queue = queue or y.queue result = y._new_like_me(_get_common_dtype(y, x, queue)) - result.add_event(_atan2pi(result, y, x, queue=queue)) + _atan2pi(result, y, x, queue=queue) return result @@ -122,7 +120,7 @@ def atan2pi(y, x, queue=None): # TODO: fmin -@cl_array.elwise_kernel_runner +@elwise_kernel_runner def _fmod(result, arg, mod): return elementwise.get_fmod_kernel(result.context, result.dtype, arg.dtype, mod.dtype) @@ -133,13 +131,13 @@ def fmod(arg, mod, queue=None): for each element in ``arg`` and ``mod``.""" queue = (queue or arg.queue) or mod.queue result = arg._new_like_me(_get_common_dtype(arg, mod, queue)) - result.add_event(_fmod(result, arg, mod, queue=queue)) + _fmod(result, arg, mod, queue=queue) return result # TODO: fract -@cl_array.elwise_kernel_runner +@elwise_kernel_runner def _frexp(sig, expt, arg): return elementwise.get_frexp_kernel(sig.context, sig.dtype, expt.dtype, arg.dtype) @@ -151,9 +149,7 @@ def frexp(arg, queue=None): """ sig = arg._new_like_me(queue=queue) expt = arg._new_like_me(queue=queue, dtype=np.int32) - event1 = _frexp(sig, expt, arg, queue=queue) - sig.add_event(event1) - expt.add_event(event1) + _frexp(sig, expt, arg, queue=queue, noutputs=2) return sig, expt # TODO: hypot @@ -162,7 +158,7 @@ def frexp(arg, queue=None): ilogb = _make_unary_array_func("ilogb") -@cl_array.elwise_kernel_runner +@elwise_kernel_runner def _ldexp(result, sig, exp): return elementwise.get_ldexp_kernel(result.context, result.dtype, sig.dtype, exp.dtype) @@ -174,7 +170,7 @@ def ldexp(significand, exponent, queue=None): ``result = significand * 2**exponent``. 
""" result = significand._new_like_me(queue=queue) - result.add_event(_ldexp(result, significand, exponent)) + _ldexp(result, significand, exponent) return result @@ -192,7 +188,7 @@ def ldexp(significand, exponent, queue=None): # TODO: minmag -@cl_array.elwise_kernel_runner +@elwise_kernel_runner def _modf(intpart, fracpart, arg): return elementwise.get_modf_kernel(intpart.context, intpart.dtype, fracpart.dtype, arg.dtype) @@ -204,9 +200,7 @@ def modf(arg, queue=None): """ intpart = arg._new_like_me(queue=queue) fracpart = arg._new_like_me(queue=queue) - event1 = _modf(intpart, fracpart, arg, queue=queue) - fracpart.add_event(event1) - intpart.add_event(event1) + _modf(intpart, fracpart, arg, queue=queue, noutputs=2) return fracpart, intpart @@ -239,19 +233,19 @@ def modf(arg, queue=None): # TODO: table 6.10, integer functions # TODO: table 6.12, clamp et al -@cl_array.elwise_kernel_runner +@elwise_kernel_runner def _bessel_jn(result, n, x): return elementwise.get_bessel_kernel(result.context, "j", result.dtype, np.dtype(type(n)), x.dtype) -@cl_array.elwise_kernel_runner +@elwise_kernel_runner def _bessel_yn(result, n, x): return elementwise.get_bessel_kernel(result.context, "y", result.dtype, np.dtype(type(n)), x.dtype) -@cl_array.elwise_kernel_runner +@elwise_kernel_runner def _hankel_01(h0, h1, x): if h0.dtype != h1.dtype: raise TypeError("types of h0 and h1 must match") @@ -261,20 +255,18 @@ def _hankel_01(h0, h1, x): def bessel_jn(n, x, queue=None): result = x._new_like_me(queue=queue) - result.add_event(_bessel_jn(result, n, x, queue=queue)) + _bessel_jn(result, n, x, queue=queue) return result def bessel_yn(n, x, queue=None): result = x._new_like_me(queue=queue) - result.add_event(_bessel_yn(result, n, x, queue=queue)) + _bessel_yn(result, n, x, queue=queue) return result def hankel_01(x, queue=None): h0 = x._new_like_me(queue=queue) h1 = x._new_like_me(queue=queue) - event1 = _hankel_01(h0, h1, x, queue=queue) - h0.add_event(event1) - h1.add_event(event1) + _hankel_01(h0, h1, x, queue=queue, noutputs=2) return h0, h1 diff --git a/pyopencl/clrandom.py b/pyopencl/clrandom.py index 297717a3..81cb8182 100644 --- a/pyopencl/clrandom.py +++ b/pyopencl/clrandom.py @@ -309,7 +309,7 @@ def _fill(self, distribution, ary, scale, shift, queue=None): gsize, lsize = _splay(queue.device, ary.size) evt = knl(queue, gsize, lsize, *args) - ary.add_event(evt) + ary.add_write_event(evt) self.counter[0] += n * counter_multiplier c1_incr, self.counter[0] = divmod(self.counter[0], self.counter_max) diff --git a/pyopencl/invoker.py b/pyopencl/invoker.py index a90a121e..f079cb77 100644 --- a/pyopencl/invoker.py +++ b/pyopencl/invoker.py @@ -141,7 +141,7 @@ def add_buf_arg(arg_idx, typechar, expr_str): cl_arg_idx += 1 if in_enqueue: - wait_for_parts .append(f"{arg_var}.events") + wait_for_parts.append(f"{arg_var}.write_events") continue @@ -381,7 +381,7 @@ def _check_arg_size(function_name, num_cl_args, arg_types, devs): from pytools.py_codegen import PicklableModule invoker_cache: WriteOncePersistentDict[Any, Tuple[PicklableModule, str]] \ = WriteOncePersistentDict( - "pyopencl-invoker-cache-v42-nano", + "pyopencl-invoker-cache-v43-nano", key_builder=_NumpyTypesKeyBuilder(), in_mem_cache_size=0, safe_sync=False) diff --git a/pyopencl/reduction.py b/pyopencl/reduction.py index 683d7cf2..a6998da2 100644 --- a/pyopencl/reduction.py +++ b/pyopencl/reduction.py @@ -430,7 +430,7 @@ def __call__(self, *args: Any, **kwargs: Any) -> cl.Event: invocation_args.append(arg.base_data) if arg_tp.with_offset: 
invocation_args.append(arg.offset) - wait_for.extend(arg.events) + wait_for.extend(arg.write_events) else: invocation_args.append(arg) @@ -523,7 +523,7 @@ def __call__(self, *args: Any, **kwargs: Any) -> cl.Event: wait_for=wait_for) wait_for = [last_evt] - result.add_event(last_evt) + result.add_write_event(last_evt) if group_count == 1: if return_event: diff --git a/pyopencl/scan.py b/pyopencl/scan.py index 850c4e70..d38e4ae0 100644 --- a/pyopencl/scan.py +++ b/pyopencl/scan.py @@ -1533,7 +1533,7 @@ def __call__(self, *args: Any, **kwargs: Any) -> cl.Event: data_args.append(arg_val.base_data) if arg_descr.with_offset: data_args.append(arg_val.offset) - wait_for.extend(arg_val.events) + wait_for.extend(arg_val.write_events) else: data_args.append(arg_val) @@ -1766,7 +1766,7 @@ def __call__(self, *args: Any, **kwargs: Any) -> cl.Event: data_args.append(arg_val.base_data) if arg_descr.with_offset: data_args.append(arg_val.offset) - wait_for.extend(arg_val.events) + wait_for.extend(arg_val.write_events) else: data_args.append(arg_val) diff --git a/test/test_array.py b/test/test_array.py index 3dd68e32..0d35794e 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -1371,36 +1371,55 @@ def test_event_management(ctx_factory): from pyopencl.clrandom import rand as clrand x = clrand(queue, (5, 10), dtype=np.float32) - assert len(x.events) == 1, len(x.events) + assert len(x.write_events) == 1, x.write_events + assert len(x.read_events) == 0, x.read_events x.finish() - assert len(x.events) == 0 - - y = x+x - assert len(y.events) == 1 - y = x*x - assert len(y.events) == 1 - y = 2*x - assert len(y.events) == 1 - y = 2/x - assert len(y.events) == 1 - y = x/2 - assert len(y.events) == 1 - y = x**2 - assert len(y.events) == 1 - y = 2**x - assert len(y.events) == 1 + assert len(x.write_events) == 0 + assert len(x.read_events) == 0 + + y = x + x + assert len(y.write_events) == 1 and len(y.read_events) == 0 + assert len(x.write_events) == 0 and len(x.read_events) == 2 + + y = x * x + assert len(y.write_events) == 1 and len(y.read_events) == 0 + assert len(x.write_events) == 0 and len(x.read_events) == 4 + + y = 2 * x + assert len(y.write_events) == 1 and len(y.read_events) == 0 + assert len(x.write_events) == 0 and len(x.read_events) == 5 + + y = 2 / x + assert len(y.write_events) == 1 and len(y.read_events) == 0 + assert len(x.write_events) == 0 and len(x.read_events) == 6 + + y = x / 2 + assert len(y.write_events) == 1 and len(y.read_events) == 0 + assert len(x.write_events) == 0 and len(x.read_events) == 7 + + y = x ** 2 + assert len(y.write_events) == 1 and len(y.read_events) == 0 + assert len(x.write_events) == 0 and len(x.read_events) == 8 + + y = 2 ** x + assert len(y.write_events) == 1 and len(y.read_events) == 0 + assert len(x.write_events) == 0 and len(x.read_events) == 9 + + x.finish() for _i in range(10): x.fill(0) - assert len(x.events) == 10 + assert len(x.write_events) == 10 + assert len(x.read_events) == 0 for _i in range(1000): x.fill(0) - assert len(x.events) < 100 + assert len(x.write_events) < 100 + assert len(x.read_events) == 0 # }}} @@ -1646,7 +1665,7 @@ def test_get_async(ctx_factory): assert np.abs(b1 - b).mean() < 1e-5 wait_event = cl.UserEvent(context) - b_gpu.add_event(wait_event) + b_gpu.add_write_event(wait_event) b, evt = b_gpu.get_async() # testing that this doesn't hang wait_event.set_status(cl.command_execution_status.COMPLETE) evt.wait() @@ -2284,6 +2303,8 @@ def alloc2(size): # }}} +# {{{ test_logical_and_or + def test_logical_and_or(ctx_factory): # NOTE: Copied 
over from pycuda/test/test_gpuarray.py rng = np.random.default_rng(seed=0) @@ -2337,6 +2358,31 @@ def test_logical_not(ctx_factory): cl_array.logical_not(cl_array.zeros(cq, 10, np.float64) + 1).get(), np.logical_not(np.ones(10))) +# }}} + + +# {{{ test multiple queues + +def test_multiple_queues(ctx_factory): + ctx = ctx_factory() + + a = [None] * 3 + for i in range(len(a)): + queue = cl.CommandQueue(ctx) + a[i] = cl_array.arange(queue, 1000, dtype=np.float32) + + b = a[i] + a[i] + b = a[i] ** 2 + b = a[i] + 4000 + assert len(b.write_events) == 1 + assert len(a[i].read_events) == 4 + + a[i] = a[i].with_queue(None) + + b = a[0].with_queue(queue) + a[1].with_queue(queue) + +# }}} + # {{{ test XDG_CACHE_HOME handling
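
For reference, a minimal sketch of how the write_events/read_events split introduced by this patch is intended to be used (mirrors test_event_management above; assumes any available OpenCL device via create_some_context):

    import numpy as np
    import pyopencl as cl
    import pyopencl.array as cl_array

    ctx = cl.create_some_context()
    queue = cl.CommandQueue(ctx)

    x = cl_array.to_device(queue, np.arange(1000, dtype=np.float32))
    y = 2 * x  # enqueues a kernel that reads x and writes y

    assert len(y.write_events) == 1  # pending write: wait on it before reading y
    assert len(x.read_events) == 1   # pending read: wait on it before overwriting x

    x.finish()  # waits for and clears both of x's event lists
    assert not x.write_events and not x.read_events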