Merge pull request #1872 from IntelPython/add-empty-task-submission
Add alternative implementation of device timer to SyclTimer class
oleksandr-pavlyk authored Nov 15, 2024
2 parents 2a4714f + 7752078 commit 34ae129
Showing 4 changed files with 236 additions and 19 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -8,13 +8,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Added

-### Change
+### Changed

 * Improved performance of copy-and-cast operations from `numpy.ndarray` to `tensor.usm_ndarray` for contiguous inputs [gh-1829](https://github.com/IntelPython/dpctl/pull/1829)
 * Improved performance of copying operation to C-/F-contig array, with optimization for batch of square matrices [gh-1850](https://github.com/IntelPython/dpctl/pull/1850)
 * Improved performance of `tensor.argsort` function for all types [gh-1859](https://github.com/IntelPython/dpctl/pull/1859)
 * Improved performance of `tensor.sort` and `tensor.argsort` for short arrays in the range [16, 64] elements [gh-1866](https://github.com/IntelPython/dpctl/pull/1866)
 * Implement radix sort algorithm to be used in `dpt.sort` and `dpt.argsort` [gh-1867](https://github.com/IntelPython/dpctl/pull/1867)
+* Extended `dpctl.SyclTimer` with `device_timer` keyword, implementing different methods of collecting device times [gh-1872](https://github.com/IntelPython/dpctl/pull/1872)

 ### Fixed
 * Fix for `tensor.result_type` when all inputs are Python built-in scalars [gh-1877](https://github.com/IntelPython/dpctl/pull/1877)
131 changes: 113 additions & 18 deletions dpctl/_sycl_timer.py
@@ -44,10 +44,47 @@ def device_dt(self):
         return self._device_dt


+class BaseDeviceTimer:
+    __slots__ = ["queue"]
+
+    def __init__(self, sycl_queue):
+        if not isinstance(sycl_queue, SyclQueue):
+            raise TypeError(f"Expected type SyclQueue, got {type(sycl_queue)}")
+        self.queue = sycl_queue
+
+
+class QueueBarrierDeviceTimer(BaseDeviceTimer):
+    __slots__ = []
+
+    def __init__(self, sycl_queue):
+        super(QueueBarrierDeviceTimer, self).__init__(sycl_queue)
+
+    def get_event(self):
+        return self.queue.submit_barrier()
+
+
+class OrderManagerDeviceTimer(BaseDeviceTimer):
+    __slots__ = ["_order_manager", "_submit_empty_task_fn"]
+
+    def __init__(self, sycl_queue):
+        import dpctl.utils._seq_order_keeper as s_ok
+        from dpctl.utils import SequentialOrderManager as seq_om
+
+        super(OrderManagerDeviceTimer, self).__init__(sycl_queue)
+        self._order_manager = seq_om[self.queue]
+        self._submit_empty_task_fn = s_ok._submit_empty_task
+
+    def get_event(self):
+        ev = self._submit_empty_task_fn(
+            sycl_queue=self.queue, depends=self._order_manager.submitted_events
+        )
+        self._order_manager.add_event_pair(ev, ev)
+        return ev
+
+
 class SyclTimer:
     """
-    Context to measure device time and host wall-time of execution
-    of commands submitted to :class:`dpctl.SyclQueue`.
+    Context to time execution of tasks submitted to :class:`dpctl.SyclQueue`.
     :Example:
         .. code-block:: python
@@ -58,40 +95,81 @@ class SyclTimer:
             q = dpctl.SyclQueue(property="enable_profiling")
             # create the timer
-            milliseconds_sc = 1e-3
+            milliseconds_sc = 1e3
             timer = dpctl.SyclTimer(time_scale = milliseconds_sc)
+            untimed_code_block_1
             # use the timer
             with timer(queue=q):
-                code_block1
+                timed_code_block1
+            untimed_code_block_2
             # use the timer
             with timer(queue=q):
-                code_block2
+                timed_code_block2
+            untimed_code_block_3
             # retrieve elapsed times in milliseconds
             wall_dt, device_dt = timer.dt
     .. note::
-        The timer submits barriers to the queue at the entrance and the
+        The timer submits tasks to the queue at the entrance and the
         exit of the context and uses profiling information from events
         associated with these submissions to perform the timing. Thus
         :class:`dpctl.SyclTimer` requires the queue with ``"enable_profiling"``
         property. In order to be able to collect the profiling information,
-        the ``dt`` property ensures that both submitted barriers complete their
-        execution and thus effectively synchronizes the queue.
+        the ``dt`` property ensures that both tasks submitted by the timer
+        complete their execution and thus effectively synchronizes the queue.
+        Execution of the above example results in the following task graph,
+        where each group of tasks is ordered after the one preceding it,
+        ``[tasks_of_untimed_block1]``, ``[timer_fence_start_task]``,
+        ``[tasks_of_timed_block1]``, ``[timer_fence_finish_task]``,
+        ``[tasks_of_untimed_block2]``, ``[timer_fence_start_task]``,
+        ``[tasks_of_timed_block2]``, ``[timer_fence_finish_task]``,
+        ``[tasks_of_untimed_block3]``.
+        ``device_timer`` keyword argument controls the type of tasks submitted.
+        With ``"queue_barrier"`` value, queue barrier tasks are used. With
+        ``"order_manager"`` value, a single empty body task is inserted
+        and order manager (used by all `dpctl.tensor` operations) is used to
+        order these tasks so that they fence operations performed within
+        timer's context.
+        Timing offloading operations that do not use the order manager with
+        the timer that uses ``"order_manager"`` as ``device_timer`` value
+        will be misleading because the tasks submitted by the timer will not
+        be ordered with respect to tasks we intend to time.
+        Note, that host timer effectively measures the time of task
+        submissions. To measure host timer wall-time that includes execution
+        of submitted tasks, make sure to include synchronization point in
+        the timed block.
+        :Example:
+            .. code-block:: python
+                with timer(q):
+                    timed_block
+                    q.wait()
     Args:
         host_timer (callable, optional):
             A callable such that host_timer() returns current
             host time in seconds.
             Default: :py:func:`timeit.default_timer`.
+        device_timer (Literal["queue_barrier", "order_manager"], optional):
+            Device timing method. Default: "queue_barrier".
         time_scale (Union[int, float], optional):
-            Ratio of the unit of time of interest and one second.
+            Ratio of one second and the unit of time-scale of interest.
             Default: ``1``.
     """

-    def __init__(self, host_timer=timeit.default_timer, time_scale=1):
+    def __init__(
+        self, host_timer=timeit.default_timer, device_timer=None, time_scale=1
+    ):
         """
         Create new instance of :class:`.SyclTimer`.
@@ -100,6 +178,8 @@ def __init__(self, host_timer=timeit.default_timer, time_scale=1):
                 A function that takes no arguments and returns a value
                 measuring time.
                 Default: :meth:`timeit.default_timer`.
+            device_timer (Literal["queue_barrier", "order_manager"], optional):
+                Device timing method. Default: "queue_barrier"
             time_scale (Union[int, float], optional):
                 Scaling factor applied to durations measured by
                 the host_timer. Default: ``1``.
@@ -109,11 +189,26 @@ def __init__(self, host_timer=timeit.default_timer, time_scale=1):
         self.queue = None
         self.host_times = []
         self.bracketing_events = []
+        self._context_data = list()
+        if device_timer is None:
+            device_timer = "queue_barrier"
+        if device_timer == "queue_barrier":
+            self._device_timer_class = QueueBarrierDeviceTimer
+        elif device_timer == "order_manager":
+            self._device_timer_class = OrderManagerDeviceTimer
+        else:
+            raise ValueError(
+                "Supported values for device_timer keyword are "
+                "'queue_barrier', 'order_manager', got "
+                f"'{device_timer}'"
+            )
+        self._device_timer = None

     def __call__(self, queue=None):
         if isinstance(queue, SyclQueue):
             if queue.has_enable_profiling:
                 self.queue = queue
+                self._device_timer = self._device_timer_class(queue)
             else:
                 raise ValueError(
                     "The given queue was not created with the "
@@ -127,17 +222,17 @@ def __call__(self, queue=None):
         return self

     def __enter__(self):
-        self._event_start = self.queue.submit_barrier()
-        self._host_start = self.timer()
+        _event_start = self._device_timer.get_event()
+        _host_start = self.timer()
+        self._context_data.append((_event_start, _host_start))
         return self

     def __exit__(self, *args):
-        self.host_times.append((self._host_start, self.timer()))
-        self.bracketing_events.append(
-            (self._event_start, self.queue.submit_barrier())
-        )
-        del self._event_start
-        del self._host_start
+        _event_end = self._device_timer.get_event()
+        _host_end = self.timer()
+        _event_start, _host_start = self._context_data.pop()
+        self.host_times.append((_host_start, _host_end))
+        self.bracketing_events.append((_event_start, _event_end))

     @property
     def dt(self):
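The `__enter__`/`__exit__` hunk in `dpctl/_sycl_timer.py` replaces the single `_event_start`/`_host_start` attributes with a stack (`_context_data`), so each exit is paired with the most recent unmatched enter. The following is a minimal host-only sketch of that bookkeeping, not dpctl code: `StackTimer` and the fake clock are hypothetical, device events are left out, and the sketch assumes that elapsed seconds are multiplied by `time_scale`, as the updated `time_scale` description suggests.

```python
import timeit


class StackTimer:
    """Hypothetical host-only analogue of the push/pop bookkeeping."""

    def __init__(self, host_timer=timeit.default_timer, time_scale=1):
        self.timer = host_timer
        self.time_scale = time_scale
        self.host_times = []
        self._context_data = []  # start times of contexts that are still open

    def __enter__(self):
        self._context_data.append(self.timer())
        return self

    def __exit__(self, *args):
        host_end = self.timer()
        # pop() pairs this exit with the most recent unmatched enter
        host_start = self._context_data.pop()
        self.host_times.append((host_start, host_end))

    @property
    def dt(self):
        # assumption: elapsed seconds are multiplied by time_scale
        elapsed = sum(end - start for start, end in self.host_times)
        return elapsed * self.time_scale


# deterministic fake clock so the pairing is easy to follow
ticks = iter([0.0, 1.0, 3.0, 6.0])
timer = StackTimer(host_timer=lambda: next(ticks), time_scale=1e3)
with timer:      # outer context starts at 0.0
    with timer:  # inner context starts at 1.0 and ends at 3.0
        pass
# the outer context ends at 6.0
intervals = list(timer.host_times)
total_ms = timer.dt
```

With attributes instead of a stack, the inner `__enter__` would overwrite the outer start time. Note that in this sketch nested intervals overlap, so the accumulated total counts the inner interval twice.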
108 changes: 108 additions & 0 deletions dpctl/tests/test_sycl_timer.py
@@ -0,0 +1,108 @@
+#                      Data Parallel Control (dpctl)
+#
+# Copyright 2020-2024 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+import pytest
+
+import dpctl
+import dpctl.tensor as dpt
+
+
+@pytest.fixture
+def profiling_queue():
+    try:
+        q = dpctl.SyclQueue(property="enable_profiling")
+    except dpctl.SyclQueueCreationError:
+        pytest.skip(
+            "Could not create profiling queue " "for default-selected device"
+        )
+    return q
+
+
+@pytest.mark.parametrize(
+    "device_timer", [None, "queue_barrier", "order_manager"]
+)
+def test_sycl_timer_queue_barrier(profiling_queue, device_timer):
+    dev = dpt.Device.create_device(profiling_queue)
+
+    timer = dpctl.SyclTimer(
+        host_timer=time.perf_counter, device_timer=device_timer, time_scale=1e3
+    )
+
+    with timer(dev.sycl_queue):
+        x = dpt.linspace(0, 1, num=10**6, device=dev)
+        y = 3.0 - dpt.square(x - 0.5)
+        z = dpt.sort(y)
+        res1 = z[-1]
+        res2 = dpt.max(y)
+
+    host_dt, device_dt = timer.dt
+
+    assert dpt.all(res1 == res2)
+    assert host_dt > 0
+    assert device_dt > 0
+
+
+def test_sycl_timer_accumulation(profiling_queue):
+    q = profiling_queue
+
+    timer = dpctl.SyclTimer(
+        host_timer=time.perf_counter,
+        device_timer="order_manager",
+        time_scale=1e3,
+    )
+
+    # initial condition
+    x = dpt.linspace(0, 1, num=10**6, sycl_queue=q)
+
+    aitkens_data = [
+        x,
+    ]
+
+    # 16 iterations of Aitken's accelerated Newton's method
+    # x <- x - f(x)/f'(x) for f(x) = x - cos(x)
+    for _ in range(16):
+        # only time Newton step
+        with timer(q):
+            s = dpt.sin(x)
+            x = (dpt.cos(x) + x * s) / (1 + s)
+        aitkens_data.append(x)
+        aitkens_data = aitkens_data[-3:]
+        if len(aitkens_data) == 3:
+            # apply Aitkens acceleration
+            d1 = aitkens_data[-1] - aitkens_data[-2]
+            d2 = aitkens_data[-2] - aitkens_data[-3]
+            if not dpt.any(d1 == d2):
+                x = aitkens_data[-1] - dpt.square(d1) / (d1 - d2)
+
+    # Total time for 16 iterations
+    dev_dt = timer.dt.device_dt
+    assert dev_dt > 0
+
+    # check convergence
+    assert dpt.max(x) - dpt.min(x) < 1e-5
+
+
+def test_sycl_timer_validation():
+    with pytest.raises(ValueError):
+        dpctl.SyclTimer(device_timer="invalid")
+
+    timer = dpctl.SyclTimer()
+    mock_queue = Ellipsis
+
+    with pytest.raises(TypeError):
+        timer(mock_queue)
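For readers who want to try the new keyword outside the test suite, here is a hedged usage sketch that times the same `dpctl.tensor` workload with both `device_timer` values. It uses only calls that appear in this commit's tests and docstring; `compare_device_timers` is a hypothetical helper name, and the sketch assumes a dpctl build that already contains this change (an older build would reject the `device_timer` keyword). It returns `None` when dpctl is not installed or a profiling queue cannot be created.

```python
import time


def compare_device_timers(n=10**5):
    """Return {device_timer: (host_ms, device_ms)}, or None if timing cannot run here."""
    try:
        import dpctl
        import dpctl.tensor as dpt
    except ImportError:
        return None
    try:
        q = dpctl.SyclQueue(property="enable_profiling")
    except dpctl.SyclQueueCreationError:
        return None

    results = {}
    for device_timer in ("queue_barrier", "order_manager"):
        timer = dpctl.SyclTimer(
            host_timer=time.perf_counter,
            device_timer=device_timer,
            time_scale=1e3,  # report milliseconds
        )
        with timer(q):
            x = dpt.linspace(0, 1, num=n, sycl_queue=q)
            dpt.sort(3.0 - dpt.square(x - 0.5))
            q.wait()  # synchronize so host wall-time includes execution
        host_ms, device_ms = timer.dt
        results[device_timer] = (host_ms, device_ms)
    return results


timings = compare_device_timers()
```

Both modes are appropriate here because the workload consists of `dpctl.tensor` operations, which the docstring says go through the order manager; for work submitted by other means, only `"queue_barrier"` fences the timed tasks.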
13 changes: 13 additions & 0 deletions dpctl/utils/src/order_keeper.cpp
@@ -26,4 +26,17 @@ PYBIND11_MODULE(_seq_order_keeper, m)
              &SequentialOrder::add_to_submitted_events)
         .def("wait", &SequentialOrder::wait,
              py::call_guard<py::gil_scoped_release>());
+
+    auto submit_empty_task_fn =
+        [](sycl::queue &exec_q,
+           const std::vector<sycl::event> &depends) -> sycl::event {
+        return exec_q.submit([&](sycl::handler &cgh) {
+            cgh.depends_on(depends);
+            cgh.single_task([]() {
+                // empty body
+            });
+        });
+    };
+    m.def("_submit_empty_task", submit_empty_task_fn, py::arg("sycl_queue"),
+          py::arg("depends") = py::list());
 }
