From 559dcc5843da44007de6603dbfc7cfe15310de83 Mon Sep 17 00:00:00 2001 From: Tom Cobb Date: Thu, 14 Nov 2024 15:09:33 +0000 Subject: [PATCH 01/10] Add epics test for busy signals --- src/ophyd_async/core/_signal.py | 2 +- tests/epics/signal/test_records.db | 6 ++++++ tests/epics/signal/test_signals.py | 20 ++++++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index cf16ea7988..549da86787 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -456,10 +456,10 @@ async def observe_value( signal.subscribe_value(q.put_nowait) try: while True: + item = await asyncio.wait_for(q.get(), timeout) # yield here in case something else is filling the queue # like in test_observe_value_times_out_with_no_external_task() await asyncio.sleep(0) - item = await asyncio.wait_for(q.get(), timeout) if done_status and item is done_status: if exc := done_status.exception(): raise exc diff --git a/tests/epics/signal/test_records.db b/tests/epics/signal/test_records.db index 18ad9a6678..931bc99623 100644 --- a/tests/epics/signal/test_records.db +++ b/tests/epics/signal/test_records.db @@ -328,3 +328,9 @@ record(waveform, "$(P)ntndarray:data") } }) } + +record(calc, "$(P)ticking") { + field(INPA, "$(P)ticking") + field(CALC, "A+1") + field(SCAN, ".1 second") +} diff --git a/tests/epics/signal/test_signals.py b/tests/epics/signal/test_signals.py index 3f79442c9f..5c5ce9bc84 100644 --- a/tests/epics/signal/test_signals.py +++ b/tests/epics/signal/test_signals.py @@ -27,12 +27,14 @@ Array1D, NotConnected, SignalBackend, + SignalR, SignalRW, StrictEnum, SubsetEnum, T, Table, load_from_yaml, + observe_value, save_to_yaml, ) from ophyd_async.epics.core import ( @@ -935,3 +937,21 @@ def my_plan(): def test_signal_module_emits_deprecation_warning(): with pytest.deprecated_call(): import ophyd_async.epics.signal # noqa: F401 + + +async def test_observe_ticking_signal_with_busy_loop(ioc: IOC): + sig = SignalR(await ioc.make_backend(int, "ticking")) + recv = [] + + async def watch(): + async for val in observe_value(sig): + time.sleep(0.15) + recv.append(val) + + start = time.time() + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(watch(), timeout=0.2) + assert time.time() - start == pytest.approx(0.3, abs=0.05) + assert len(recv) == 2 + # Don't check values as CA and PVA have different algorithms for + # dropping updates for slow callbacks From 6f0d1e6a1da9f33333f97e8237bd7125ec343e46 Mon Sep 17 00:00:00 2001 From: Tom Cobb Date: Fri, 15 Nov 2024 16:47:21 +0000 Subject: [PATCH 02/10] Use wait_for_pending_wakeups in observe_value --- src/ophyd_async/core/__init__.py | 2 ++ src/ophyd_async/core/_signal.py | 3 ++- src/ophyd_async/core/_utils.py | 24 ++++++++++++++++++++++++ tests/epics/signal/test_signals.py | 6 +++--- tests/epics/test_motor.py | 24 +++++++----------------- 5 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/ophyd_async/core/__init__.py b/src/ophyd_async/core/__init__.py index ff4d6f8be9..6d989eb60d 100644 --- a/src/ophyd_async/core/__init__.py +++ b/src/ophyd_async/core/__init__.py @@ -95,6 +95,7 @@ get_unique, in_micros, wait_for_connection, + wait_for_pending_wakeups, ) __all__ = [ @@ -190,4 +191,5 @@ "in_micros", "wait_for_connection", "completed_status", + "wait_for_pending_wakeups", ] diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index 549da86787..280b990a4c 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -36,6 +36,7 @@ Callback, LazyMock, T, + wait_for_pending_wakeups, ) @@ -459,7 +460,7 @@ async def observe_value( item = await asyncio.wait_for(q.get(), timeout) # yield here in case something else is filling the queue # like in test_observe_value_times_out_with_no_external_task() - await asyncio.sleep(0) + await wait_for_pending_wakeups() if done_status and item is done_status: if exc := done_status.exception(): raise exc diff --git a/src/ophyd_async/core/_utils.py b/src/ophyd_async/core/_utils.py index 2aa4b1c717..466b4e2032 100644 --- a/src/ophyd_async/core/_utils.py +++ b/src/ophyd_async/core/_utils.py @@ -2,6 +2,7 @@ import asyncio import logging +import warnings from collections.abc import Awaitable, Callable, Iterable, Mapping, Sequence from dataclasses import dataclass from enum import Enum, EnumMeta @@ -295,3 +296,26 @@ def __call__(self) -> Mock: if self.parent is not None: self.parent().attach_mock(self._mock, self.name) return self._mock + + +async def wait_for_pending_wakeups(max_yields=10): + """Allow any ready asyncio tasks to be woken up. + + Used in: + - Tests to allow tasks like ``set()`` to start so that signal + puts can be tested + - `observe_value` to allow it to be wrapped in `asyncio.wait_for` + with a timeout + """ + loop = asyncio.get_event_loop() + # If anything has called loop.call_soon or is scheduled a wakeup + # then let it run + for _ in range(max_yields): + await asyncio.sleep(0) + if not loop._ready: # type: ignore # noqa: SLF001 + return + warnings.warn( + f"Tasks still scheduling wakeups after {max_yields} yields", + RuntimeWarning, + stacklevel=2, + ) diff --git a/tests/epics/signal/test_signals.py b/tests/epics/signal/test_signals.py index 5c5ce9bc84..0a3945caa1 100644 --- a/tests/epics/signal/test_signals.py +++ b/tests/epics/signal/test_signals.py @@ -945,13 +945,13 @@ async def test_observe_ticking_signal_with_busy_loop(ioc: IOC): async def watch(): async for val in observe_value(sig): - time.sleep(0.15) + time.sleep(0.3) recv.append(val) start = time.time() with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(watch(), timeout=0.2) - assert time.time() - start == pytest.approx(0.3, abs=0.05) + await asyncio.wait_for(watch(), timeout=0.4) + assert time.time() - start == pytest.approx(0.6, abs=0.1) assert len(recv) == 2 # Don't check values as CA and PVA have different algorithms for # dropping updates for slow callbacks diff --git a/tests/epics/test_motor.py b/tests/epics/test_motor.py index 1760f245ba..1842435824 100644 --- a/tests/epics/test_motor.py +++ b/tests/epics/test_motor.py @@ -15,21 +15,11 @@ set_mock_put_proceeds, set_mock_value, soft_signal_rw, + wait_for_pending_wakeups, ) from ophyd_async.epics import motor -async def wait_for_wakeups(max_yields=10): - loop = asyncio.get_event_loop() - # If anything has called loop.call_soon or is scheduled a wakeup - # then let it run - for _ in range(max_yields): - await asyncio.sleep(0) - if not loop._ready: - return - raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields") - - @pytest.fixture async def sim_motor(): async with DeviceCollector(mock=True): @@ -44,7 +34,7 @@ async def sim_motor(): async def wait_for_eq(item, attribute, comparison, timeout): timeout_time = time.monotonic() + timeout while getattr(item, attribute) != comparison: - await wait_for_wakeups() + await wait_for_pending_wakeups() if time.monotonic() > timeout_time: raise TimeoutError @@ -56,7 +46,7 @@ async def test_motor_moving_well(sim_motor: motor.Motor) -> None: s.watch(watcher) done = Mock() s.add_callback(done) - await wait_for_wakeups() + await wait_for_pending_wakeups() await wait_for_eq(watcher, "call_count", 1, 1) assert watcher.call_args == call( name="sim_motor", @@ -86,7 +76,7 @@ async def test_motor_moving_well(sim_motor: motor.Motor) -> None: set_mock_value(sim_motor.motor_done_move, True) set_mock_value(sim_motor.user_readback, 0.55) set_mock_put_proceeds(sim_motor.user_setpoint, True) - await wait_for_wakeups() + await wait_for_pending_wakeups() await wait_for_eq(s, "done", True, 1) done.assert_called_once_with(s) @@ -98,7 +88,7 @@ async def test_motor_moving_well_2(sim_motor: motor.Motor) -> None: s.watch(watcher) done = Mock() s.add_callback(done) - await wait_for_wakeups() + await wait_for_pending_wakeups() assert watcher.call_count == 1 assert watcher.call_args == call( name="sim_motor", @@ -126,7 +116,7 @@ async def test_motor_moving_well_2(sim_motor: motor.Motor) -> None: time_elapsed=pytest.approx(0.1, abs=0.2), ) set_mock_put_proceeds(sim_motor.user_setpoint, True) - await wait_for_wakeups() + await wait_for_pending_wakeups() assert s.done done.assert_called_once_with(s) @@ -165,7 +155,7 @@ async def test_motor_moving_stopped(sim_motor: motor.Motor): assert not s.done await sim_motor.stop() set_mock_put_proceeds(sim_motor.user_setpoint, True) - await wait_for_wakeups() + await wait_for_pending_wakeups() assert s.done assert s.success is False From 07a07792ee2f62aad22673d44a957f2b9d27c551 Mon Sep 17 00:00:00 2001 From: Tom Cobb Date: Mon, 18 Nov 2024 10:36:08 +0000 Subject: [PATCH 03/10] Tweak max_yields observe_value runs in a lot of tests, so the testing checks for max_yields needs to be much more than the one for observe_value otherwise they fight each other --- src/ophyd_async/core/_signal.py | 2 +- src/ophyd_async/core/_utils.py | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index 280b990a4c..78413758e0 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -460,7 +460,7 @@ async def observe_value( item = await asyncio.wait_for(q.get(), timeout) # yield here in case something else is filling the queue # like in test_observe_value_times_out_with_no_external_task() - await wait_for_pending_wakeups() + await wait_for_pending_wakeups(raise_if_exceeded=False, max_yields=5) if done_status and item is done_status: if exc := done_status.exception(): raise exc diff --git a/src/ophyd_async/core/_utils.py b/src/ophyd_async/core/_utils.py index 466b4e2032..a66c209649 100644 --- a/src/ophyd_async/core/_utils.py +++ b/src/ophyd_async/core/_utils.py @@ -2,7 +2,6 @@ import asyncio import logging -import warnings from collections.abc import Awaitable, Callable, Iterable, Mapping, Sequence from dataclasses import dataclass from enum import Enum, EnumMeta @@ -298,10 +297,11 @@ def __call__(self) -> Mock: return self._mock -async def wait_for_pending_wakeups(max_yields=10): +async def wait_for_pending_wakeups(max_yields=20, raise_if_exceeded=True): """Allow any ready asyncio tasks to be woken up. Used in: + - Tests to allow tasks like ``set()`` to start so that signal puts can be tested - `observe_value` to allow it to be wrapped in `asyncio.wait_for` @@ -314,8 +314,5 @@ async def wait_for_pending_wakeups(max_yields=10): await asyncio.sleep(0) if not loop._ready: # type: ignore # noqa: SLF001 return - warnings.warn( - f"Tasks still scheduling wakeups after {max_yields} yields", - RuntimeWarning, - stacklevel=2, - ) + if raise_if_exceeded: + raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields") From bd5b00a12e92211eaef3095f4b28df702ef95d86 Mon Sep 17 00:00:00 2001 From: Tom Cobb Date: Mon, 18 Nov 2024 11:01:00 +0000 Subject: [PATCH 04/10] Slacken timing for windows --- tests/epics/adcore/test_drivers.py | 2 +- tests/epics/demo/test_demo.py | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/epics/adcore/test_drivers.py b/tests/epics/adcore/test_drivers.py index 6d6cddd199..2fe67c3df5 100644 --- a/tests/epics/adcore/test_drivers.py +++ b/tests/epics/adcore/test_drivers.py @@ -70,7 +70,7 @@ async def test_start_acquiring_driver_and_ensure_status_flags_immediate_failure( ): set_mock_value(driver.detector_state, adcore.DetectorState.Error) acquiring = await adcore.start_acquiring_driver_and_ensure_status( - driver, timeout=0.01 + driver, timeout=0.05 ) with pytest.raises(ValueError): await acquiring diff --git a/tests/epics/demo/test_demo.py b/tests/epics/demo/test_demo.py index e29e13bf29..7ab6c27d9b 100644 --- a/tests/epics/demo/test_demo.py +++ b/tests/epics/demo/test_demo.py @@ -19,13 +19,10 @@ get_mock, get_mock_put, set_mock_value, + wait_for_pending_wakeups, ) from ophyd_async.epics import demo -# Long enough for multiple asyncio event loop cycles to run so -# all the tasks have a chance to run -A_WHILE = 0.001 - @pytest.fixture async def mock_mover() -> demo.Mover: @@ -141,7 +138,7 @@ async def test_mover_moving_well(mock_mover: demo.Mover) -> None: time_elapsed=pytest.approx(0.1, abs=0.05), ) set_mock_value(mock_mover.readback, 0.5499999) - await asyncio.sleep(A_WHILE) + await wait_for_pending_wakeups() assert s.done assert s.success done.assert_called_once_with(s) From 2d96cca113e5b34c18631161210f6a0f0ab80c78 Mon Sep 17 00:00:00 2001 From: Oliver Silvester Date: Thu, 28 Nov 2024 15:18:30 +0000 Subject: [PATCH 05/10] More reliable timeout for observe_value --- src/ophyd_async/core/_signal.py | 32 +++++++++++++++---- src/ophyd_async/epics/testing/test_records.db | 6 ++++ tests/epics/signal/test_signals.py | 22 ++++++++----- 3 files changed, 46 insertions(+), 14 deletions(-) diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index e9c69d80e7..3801245f5c 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -2,6 +2,7 @@ import asyncio import functools +import time from collections.abc import AsyncGenerator, Awaitable, Callable, Mapping from typing import Any, Generic, cast @@ -36,7 +37,6 @@ Callback, LazyMock, T, - wait_for_pending_wakeups, ) @@ -426,6 +426,7 @@ async def observe_value( signal: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, + except_after_time: float | None = None, ) -> AsyncGenerator[SignalDatatypeT, None]: """Subscribe to the value of a signal so it can be iterated from. @@ -440,9 +441,17 @@ async def observe_value( done_status: If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator. + except_after_time: + If given, the maximum time to watch a value in seconds. If the loop is still + being watched after this length, raise asyncio.TimeoutError. This should be used + instead of on an 'asyncio.wait_for' timeout Notes ----- + Due to a rare condition with busy signals, it is not recommended to use this + function with asyncio.timeout, including in an 'asyncio.wait_for' loop. Instead, + this timeout should be given to the except_after_time parameter. + Example usage:: async for value in observe_value(sig): @@ -450,7 +459,10 @@ async def observe_value( """ async for _, value in observe_signals_value( - signal, timeout=timeout, done_status=done_status + signal, + timeout=timeout, + done_status=done_status, + except_after_time=except_after_time, ): yield value @@ -459,6 +471,7 @@ async def observe_signals_value( *signals: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, + except_after_time: float | None = None, ) -> AsyncGenerator[tuple[SignalR[SignalDatatypeT], SignalDatatypeT], None]: """Subscribe to the value of a signal so it can be iterated from. @@ -473,6 +486,10 @@ async def observe_signals_value( done_status: If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator. + except_after_time: + If given, the maximum time to watch a value in seconds. If the loop is still + being watched after this length, raise asyncio.TimeoutError. This should be used + instead of on an 'asyncio.wait_for' timeout Notes ----- @@ -505,13 +522,16 @@ def queue_value(value: SignalDatatypeT, signal=signal): if done_status is not None: done_status.add_callback(q.put_nowait) - + start_time = time.time() try: while True: + if except_after_time and time.time() - start_time > except_after_time: + raise asyncio.TimeoutError( + f"observe_value was still observing signals " + f"{[signal.source for signal in signals]} after " + f"timeout {except_after_time}s" + ) item = await asyncio.wait_for(q.get(), timeout) - # yield here in case something else is filling the queue - # like in test_observe_value_times_out_with_no_external_task() - await wait_for_pending_wakeups(raise_if_exceeded=False, max_yields=5) item = await get_value() if done_status and item is done_status: if exc := done_status.exception(): diff --git a/src/ophyd_async/epics/testing/test_records.db b/src/ophyd_async/epics/testing/test_records.db index fbd81b02c2..ff45a6350c 100644 --- a/src/ophyd_async/epics/testing/test_records.db +++ b/src/ophyd_async/epics/testing/test_records.db @@ -150,3 +150,9 @@ record(lsi, "$(device)longstr2") { field(INP, {const:"a string that is just longer than forty characters"}) field(PINI, "YES") } + +record(calc, "$(device)ticking") { + field(INPA, "$(device)ticking") + field(CALC, "A+1") + field(SCAN, ".1 second") +} diff --git a/tests/epics/signal/test_signals.py b/tests/epics/signal/test_signals.py index 5148e9233e..df65384772 100644 --- a/tests/epics/signal/test_signals.py +++ b/tests/epics/signal/test_signals.py @@ -21,7 +21,6 @@ Array1D, NotConnected, SignalBackend, - SignalR, StrictEnum, SubsetEnum, T, @@ -38,7 +37,9 @@ epics_signal_w, epics_signal_x, ) -from ophyd_async.epics.core._signal import _epics_signal_backend # noqa: PLC2701 +from ophyd_async.epics.core._signal import ( + _epics_signal_backend, # noqa: PLC2701 +) from ophyd_async.epics.testing import ( ExampleCaDevice, ExampleEnum, @@ -936,19 +937,24 @@ def test_signal_module_emits_deprecation_warning(): import ophyd_async.epics.signal # noqa: F401 -async def test_observe_ticking_signal_with_busy_loop(ioc: TestingIOC): - sig = SignalR(await ioc.make_backend(int, "ticking")) +@PARAMETERISE_PROTOCOLS +async def test_observe_ticking_signal_with_busy_loop(ioc, protocol): + sig = epics_signal_rw(int, f"{protocol}://{get_prefix(ioc, protocol)}ticking") + sig.set_name("hello") + await sig.connect() + recv = [] async def watch(): - async for val in observe_value(sig): - time.sleep(0.3) + async for val in observe_value(sig, except_after_time=0.35): + time.sleep(0.15) recv.append(val) start = time.time() + with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(watch(), timeout=0.4) - assert time.time() - start == pytest.approx(0.6, abs=0.1) + await watch() + assert time.time() - start == pytest.approx(0.35, abs=0.15) assert len(recv) == 2 # Don't check values as CA and PVA have different algorithms for # dropping updates for slow callbacks From 76773b464b275cd473f7280f29ff2fde2e0b151b Mon Sep 17 00:00:00 2001 From: Oliver Silvester Date: Fri, 29 Nov 2024 10:21:51 +0000 Subject: [PATCH 06/10] Use overall timeout for get value too --- src/ophyd_async/core/_signal.py | 27 ++++++++++++++++++--------- tests/core/test_observe.py | 6 +++--- tests/epics/signal/test_signals.py | 7 +++---- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index 3801245f5c..8cf282f463 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -467,6 +467,14 @@ async def observe_value( yield value +def _get_iteration_timeout( + timeout: float | None, overall_deadline: float | None +) -> float | None: + # Test this works if overall timeout <=0 + overall_deadline = overall_deadline - time.monotonic() if overall_deadline else None + return min([x for x in [overall_deadline, timeout] if x is not None], default=None) + + async def observe_signals_value( *signals: SignalR[SignalDatatypeT], timeout: float | None = None, @@ -504,12 +512,11 @@ async def observe_signals_value( q: asyncio.Queue[tuple[SignalR[SignalDatatypeT], SignalDatatypeT] | Status] = ( asyncio.Queue() ) - if timeout is None: - get_value = q.get - else: - async def get_value(): - return await asyncio.wait_for(q.get(), timeout) + async def get_value(timeout: float | None = None): + if timeout is None: + return await q.get() + return await asyncio.wait_for(q.get(), timeout) cbs: dict[SignalR, Callback] = {} for signal in signals: @@ -522,17 +529,19 @@ def queue_value(value: SignalDatatypeT, signal=signal): if done_status is not None: done_status.add_callback(q.put_nowait) - start_time = time.time() + overall_deadline = ( + time.monotonic() + except_after_time if except_after_time else None + ) try: while True: - if except_after_time and time.time() - start_time > except_after_time: + if overall_deadline and time.monotonic() >= overall_deadline: raise asyncio.TimeoutError( f"observe_value was still observing signals " f"{[signal.source for signal in signals]} after " f"timeout {except_after_time}s" ) - item = await asyncio.wait_for(q.get(), timeout) - item = await get_value() + + item = await get_value(_get_iteration_timeout(timeout, overall_deadline)) if done_status and item is done_status: if exc := done_status.exception(): raise exc diff --git a/tests/core/test_observe.py b/tests/core/test_observe.py index 14b9443ac2..8cd576034c 100644 --- a/tests/core/test_observe.py +++ b/tests/core/test_observe.py @@ -105,13 +105,13 @@ async def test_observe_value_times_out_with_no_external_task(): recv = [] - async def watch(): - async for val in observe_value(sig): + async def watch(except_after_time): + async for val in observe_value(sig, except_after_time=except_after_time): recv.append(val) setter(val + 1) start = time.time() with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(watch(), timeout=0.1) + await watch(except_after_time=0.1) assert recv assert time.time() - start == pytest.approx(0.1, abs=0.05) diff --git a/tests/epics/signal/test_signals.py b/tests/epics/signal/test_signals.py index df65384772..ac3ac4721e 100644 --- a/tests/epics/signal/test_signals.py +++ b/tests/epics/signal/test_signals.py @@ -940,21 +940,20 @@ def test_signal_module_emits_deprecation_warning(): @PARAMETERISE_PROTOCOLS async def test_observe_ticking_signal_with_busy_loop(ioc, protocol): sig = epics_signal_rw(int, f"{protocol}://{get_prefix(ioc, protocol)}ticking") - sig.set_name("hello") await sig.connect() recv = [] async def watch(): - async for val in observe_value(sig, except_after_time=0.35): - time.sleep(0.15) + async for val in observe_value(sig, except_after_time=0.4): + time.sleep(0.3) recv.append(val) start = time.time() with pytest.raises(asyncio.TimeoutError): await watch() - assert time.time() - start == pytest.approx(0.35, abs=0.15) + assert time.time() - start == pytest.approx(0.6, abs=0.1) assert len(recv) == 2 # Don't check values as CA and PVA have different algorithms for # dropping updates for slow callbacks From dd5664010fd66dfc275ebf646dec26eec1b06480 Mon Sep 17 00:00:00 2001 From: Oliver Silvester Date: Fri, 29 Nov 2024 10:38:24 +0000 Subject: [PATCH 07/10] don't use wait for in observe tests --- src/ophyd_async/core/_signal.py | 5 ++--- tests/core/test_observe.py | 21 +++++++++++++++++---- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index 8cf282f463..ea42e98f1b 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -442,7 +442,7 @@ async def observe_value( If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator. except_after_time: - If given, the maximum time to watch a value in seconds. If the loop is still + If given, the maximum time to watch a signal, in seconds. If the loop is still being watched after this length, raise asyncio.TimeoutError. This should be used instead of on an 'asyncio.wait_for' timeout @@ -470,7 +470,6 @@ async def observe_value( def _get_iteration_timeout( timeout: float | None, overall_deadline: float | None ) -> float | None: - # Test this works if overall timeout <=0 overall_deadline = overall_deadline - time.monotonic() if overall_deadline else None return min([x for x in [overall_deadline, timeout] if x is not None], default=None) @@ -495,7 +494,7 @@ async def observe_signals_value( If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator. except_after_time: - If given, the maximum time to watch a value in seconds. If the loop is still + If given, the maximum time to watch a signal, in seconds. If the loop is still being watched after this length, raise asyncio.TimeoutError. This should be used instead of on an 'asyncio.wait_for' timeout diff --git a/tests/core/test_observe.py b/tests/core/test_observe.py index 8cd576034c..c38c85cfdf 100644 --- a/tests/core/test_observe.py +++ b/tests/core/test_observe.py @@ -60,14 +60,14 @@ async def tick(): recv = [] async def watch(): - async for val in observe_value(sig): + async for val in observe_value(sig, except_after_time=0.2): recv.append(val) t = asyncio.create_task(tick()) start = time.time() try: with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(watch(), timeout=0.2) + await watch() assert recv == [0, 1] assert time.time() - start == pytest.approx(0.2, abs=0.05) finally: @@ -85,7 +85,7 @@ async def tick(): recv = [] async def watch(): - async for val in observe_value(sig): + async for val in observe_value(sig, except_after_time=0.2): time.sleep(0.15) recv.append(val) @@ -93,7 +93,7 @@ async def watch(): start = time.time() try: with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(watch(), timeout=0.2) + await watch() assert recv == [0, 1] assert time.time() - start == pytest.approx(0.3, abs=0.05) finally: @@ -115,3 +115,16 @@ async def watch(except_after_time): await watch(except_after_time=0.1) assert recv assert time.time() - start == pytest.approx(0.1, abs=0.05) + + +async def test_observe_value_uses_correct_timeout(): + sig, _ = soft_signal_r_and_setter(float) + + async def watch(timeout, except_after_time): + async for _ in observe_value(sig, timeout, except_after_time=except_after_time): + ... + + start = time.time() + with pytest.raises(asyncio.TimeoutError): + await watch(timeout=0.3, except_after_time=0.15) + assert time.time() - start == pytest.approx(0.15, abs=0.05) From 1f5356dd0caa35399132232608759ab084dce20f Mon Sep 17 00:00:00 2001 From: Oliver Silvester Date: Fri, 29 Nov 2024 12:07:19 +0000 Subject: [PATCH 08/10] Review suggestions --- src/ophyd_async/core/_signal.py | 32 +++++++++++++++--------------- tests/core/test_observe.py | 16 +++++++-------- tests/epics/signal/test_signals.py | 2 +- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index ea42e98f1b..092a9c59fc 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -426,7 +426,7 @@ async def observe_value( signal: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, - except_after_time: float | None = None, + done_timeout: float | None = None, ) -> AsyncGenerator[SignalDatatypeT, None]: """Subscribe to the value of a signal so it can be iterated from. @@ -441,7 +441,7 @@ async def observe_value( done_status: If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator. - except_after_time: + done_timeout: If given, the maximum time to watch a signal, in seconds. If the loop is still being watched after this length, raise asyncio.TimeoutError. This should be used instead of on an 'asyncio.wait_for' timeout @@ -450,7 +450,7 @@ async def observe_value( ----- Due to a rare condition with busy signals, it is not recommended to use this function with asyncio.timeout, including in an 'asyncio.wait_for' loop. Instead, - this timeout should be given to the except_after_time parameter. + this timeout should be given to the done_timeout parameter. Example usage:: @@ -462,7 +462,7 @@ async def observe_value( signal, timeout=timeout, done_status=done_status, - except_after_time=except_after_time, + done_timeout=done_timeout, ): yield value @@ -478,7 +478,7 @@ async def observe_signals_value( *signals: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, - except_after_time: float | None = None, + done_timeout: float | None = None, ) -> AsyncGenerator[tuple[SignalR[SignalDatatypeT], SignalDatatypeT], None]: """Subscribe to the value of a signal so it can be iterated from. @@ -493,7 +493,7 @@ async def observe_signals_value( done_status: If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator. - except_after_time: + done_timeout: If given, the maximum time to watch a signal, in seconds. If the loop is still being watched after this length, raise asyncio.TimeoutError. This should be used instead of on an 'asyncio.wait_for' timeout @@ -512,10 +512,12 @@ async def observe_signals_value( asyncio.Queue() ) - async def get_value(timeout: float | None = None): - if timeout is None: - return await q.get() - return await asyncio.wait_for(q.get(), timeout) + if timeout is None: + get_value = q.get + else: + + async def get_value(): + return await asyncio.wait_for(q.get(), timeout) cbs: dict[SignalR, Callback] = {} for signal in signals: @@ -528,19 +530,17 @@ def queue_value(value: SignalDatatypeT, signal=signal): if done_status is not None: done_status.add_callback(q.put_nowait) - overall_deadline = ( - time.monotonic() + except_after_time if except_after_time else None - ) + overall_deadline = time.monotonic() + done_timeout if done_timeout else None try: while True: if overall_deadline and time.monotonic() >= overall_deadline: raise asyncio.TimeoutError( f"observe_value was still observing signals " f"{[signal.source for signal in signals]} after " - f"timeout {except_after_time}s" + f"timeout {done_timeout}s" ) - - item = await get_value(_get_iteration_timeout(timeout, overall_deadline)) + iteration_timeout = _get_iteration_timeout(timeout, overall_deadline) + item = await asyncio.wait_for(q.get(), iteration_timeout) if done_status and item is done_status: if exc := done_status.exception(): raise exc diff --git a/tests/core/test_observe.py b/tests/core/test_observe.py index c38c85cfdf..50e0a167b5 100644 --- a/tests/core/test_observe.py +++ b/tests/core/test_observe.py @@ -60,7 +60,7 @@ async def tick(): recv = [] async def watch(): - async for val in observe_value(sig, except_after_time=0.2): + async for val in observe_value(sig, done_timeout=0.2): recv.append(val) t = asyncio.create_task(tick()) @@ -85,7 +85,7 @@ async def tick(): recv = [] async def watch(): - async for val in observe_value(sig, except_after_time=0.2): + async for val in observe_value(sig, done_timeout=0.2): time.sleep(0.15) recv.append(val) @@ -105,14 +105,14 @@ async def test_observe_value_times_out_with_no_external_task(): recv = [] - async def watch(except_after_time): - async for val in observe_value(sig, except_after_time=except_after_time): + async def watch(done_timeout): + async for val in observe_value(sig, done_timeout=done_timeout): recv.append(val) setter(val + 1) start = time.time() with pytest.raises(asyncio.TimeoutError): - await watch(except_after_time=0.1) + await watch(done_timeout=0.1) assert recv assert time.time() - start == pytest.approx(0.1, abs=0.05) @@ -120,11 +120,11 @@ async def watch(except_after_time): async def test_observe_value_uses_correct_timeout(): sig, _ = soft_signal_r_and_setter(float) - async def watch(timeout, except_after_time): - async for _ in observe_value(sig, timeout, except_after_time=except_after_time): + async def watch(timeout, done_timeout): + async for _ in observe_value(sig, timeout, done_timeout=done_timeout): ... start = time.time() with pytest.raises(asyncio.TimeoutError): - await watch(timeout=0.3, except_after_time=0.15) + await watch(timeout=0.3, done_timeout=0.15) assert time.time() - start == pytest.approx(0.15, abs=0.05) diff --git a/tests/epics/signal/test_signals.py b/tests/epics/signal/test_signals.py index ac3ac4721e..12295aac99 100644 --- a/tests/epics/signal/test_signals.py +++ b/tests/epics/signal/test_signals.py @@ -945,7 +945,7 @@ async def test_observe_ticking_signal_with_busy_loop(ioc, protocol): recv = [] async def watch(): - async for val in observe_value(sig, except_after_time=0.4): + async for val in observe_value(sig, done_timeout=0.4): time.sleep(0.3) recv.append(val) From aabfdcb07427a043db9725a3826c43a29d19d842 Mon Sep 17 00:00:00 2001 From: Oliver Silvester Date: Fri, 29 Nov 2024 13:28:20 +0000 Subject: [PATCH 09/10] Remove redundant code --- src/ophyd_async/core/_signal.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index 092a9c59fc..180cee5604 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -512,13 +512,6 @@ async def observe_signals_value( asyncio.Queue() ) - if timeout is None: - get_value = q.get - else: - - async def get_value(): - return await asyncio.wait_for(q.get(), timeout) - cbs: dict[SignalR, Callback] = {} for signal in signals: From fb10a80acbbb99f500e579c25dcb2cc706328ee2 Mon Sep 17 00:00:00 2001 From: Oliver Silvester Date: Fri, 29 Nov 2024 15:38:41 +0000 Subject: [PATCH 10/10] Move testing function to new testing directory --- src/ophyd_async/core/__init__.py | 2 -- src/ophyd_async/core/_utils.py | 21 --------------------- src/ophyd_async/testing/__init__.py | 22 ++++++++++++++++++++++ tests/epics/demo/test_demo.py | 2 +- tests/epics/test_motor.py | 2 +- 5 files changed, 24 insertions(+), 25 deletions(-) create mode 100644 src/ophyd_async/testing/__init__.py diff --git a/src/ophyd_async/core/__init__.py b/src/ophyd_async/core/__init__.py index ee0ef2af2a..9563537339 100644 --- a/src/ophyd_async/core/__init__.py +++ b/src/ophyd_async/core/__init__.py @@ -96,7 +96,6 @@ get_unique, in_micros, wait_for_connection, - wait_for_pending_wakeups, ) __all__ = [ @@ -193,5 +192,4 @@ "in_micros", "wait_for_connection", "completed_status", - "wait_for_pending_wakeups", ] diff --git a/src/ophyd_async/core/_utils.py b/src/ophyd_async/core/_utils.py index 8d3ffe8c35..43ce473b8b 100644 --- a/src/ophyd_async/core/_utils.py +++ b/src/ophyd_async/core/_utils.py @@ -311,24 +311,3 @@ def __call__(self) -> Mock: if self.parent is not None: self.parent().attach_mock(self._mock, self.name) return self._mock - - -async def wait_for_pending_wakeups(max_yields=20, raise_if_exceeded=True): - """Allow any ready asyncio tasks to be woken up. - - Used in: - - - Tests to allow tasks like ``set()`` to start so that signal - puts can be tested - - `observe_value` to allow it to be wrapped in `asyncio.wait_for` - with a timeout - """ - loop = asyncio.get_event_loop() - # If anything has called loop.call_soon or is scheduled a wakeup - # then let it run - for _ in range(max_yields): - await asyncio.sleep(0) - if not loop._ready: # type: ignore # noqa: SLF001 - return - if raise_if_exceeded: - raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields") diff --git a/src/ophyd_async/testing/__init__.py b/src/ophyd_async/testing/__init__.py new file mode 100644 index 0000000000..d3efd849bd --- /dev/null +++ b/src/ophyd_async/testing/__init__.py @@ -0,0 +1,22 @@ +import asyncio + + +async def wait_for_pending_wakeups(max_yields=20, raise_if_exceeded=True): + """Allow any ready asyncio tasks to be woken up. + + Used in: + + - Tests to allow tasks like ``set()`` to start so that signal + puts can be tested + - `observe_value` to allow it to be wrapped in `asyncio.wait_for` + with a timeout + """ + loop = asyncio.get_event_loop() + # If anything has called loop.call_soon or is scheduled a wakeup + # then let it run + for _ in range(max_yields): + await asyncio.sleep(0) + if not loop._ready: # type: ignore # noqa: SLF001 + return + if raise_if_exceeded: + raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields") diff --git a/tests/epics/demo/test_demo.py b/tests/epics/demo/test_demo.py index 640aea8742..fba02106c5 100644 --- a/tests/epics/demo/test_demo.py +++ b/tests/epics/demo/test_demo.py @@ -19,9 +19,9 @@ get_mock, get_mock_put, set_mock_value, - wait_for_pending_wakeups, ) from ophyd_async.epics import demo +from ophyd_async.testing import wait_for_pending_wakeups @pytest.fixture diff --git a/tests/epics/test_motor.py b/tests/epics/test_motor.py index 9960a7a2fa..01f34fd785 100644 --- a/tests/epics/test_motor.py +++ b/tests/epics/test_motor.py @@ -15,9 +15,9 @@ set_mock_put_proceeds, set_mock_value, soft_signal_rw, - wait_for_pending_wakeups, ) from ophyd_async.epics import motor +from ophyd_async.testing import wait_for_pending_wakeups @pytest.fixture