Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new timeout for observe_value #650

Merged
merged 11 commits into from
Dec 2, 2024
2 changes: 2 additions & 0 deletions src/ophyd_async/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
get_unique,
in_micros,
wait_for_connection,
wait_for_pending_wakeups,
)

__all__ = [
Expand Down Expand Up @@ -190,4 +191,5 @@
"in_micros",
"wait_for_connection",
"completed_status",
"wait_for_pending_wakeups",
]
5 changes: 3 additions & 2 deletions src/ophyd_async/core/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
Callback,
LazyMock,
T,
wait_for_pending_wakeups,
)


Expand Down Expand Up @@ -456,10 +457,10 @@ async def observe_value(
signal.subscribe_value(q.put_nowait)
try:
while True:
item = await asyncio.wait_for(q.get(), timeout)
# yield here in case something else is filling the queue
# like in test_observe_value_times_out_with_no_external_task()
await asyncio.sleep(0)
item = await asyncio.wait_for(q.get(), timeout)
await wait_for_pending_wakeups(raise_if_exceeded=False, max_yields=5)
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
Expand Down
21 changes: 21 additions & 0 deletions src/ophyd_async/core/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,24 @@ def __call__(self) -> Mock:
if self.parent is not None:
self.parent().attach_mock(self._mock, self.name)
return self._mock


async def wait_for_pending_wakeups(max_yields=20, raise_if_exceeded=True):
"""Allow any ready asyncio tasks to be woken up.

Used in:

- Tests to allow tasks like ``set()`` to start so that signal
puts can be tested
- `observe_value` to allow it to be wrapped in `asyncio.wait_for`
with a timeout
"""
loop = asyncio.get_event_loop()
# If anything has called loop.call_soon or is scheduled a wakeup
# then let it run
for _ in range(max_yields):
await asyncio.sleep(0)
if not loop._ready: # type: ignore # noqa: SLF001
return
if raise_if_exceeded:
raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields")
coretl marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion tests/epics/adcore/test_drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def test_start_acquiring_driver_and_ensure_status_flags_immediate_failure(
):
set_mock_value(driver.detector_state, adcore.DetectorState.Error)
acquiring = await adcore.start_acquiring_driver_and_ensure_status(
driver, timeout=0.01
driver, timeout=0.05
)
with pytest.raises(ValueError):
await acquiring
Expand Down
7 changes: 2 additions & 5 deletions tests/epics/demo/test_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
get_mock,
get_mock_put,
set_mock_value,
wait_for_pending_wakeups,
)
from ophyd_async.epics import demo

# Long enough for multiple asyncio event loop cycles to run so
# all the tasks have a chance to run
A_WHILE = 0.001


@pytest.fixture
async def mock_mover() -> demo.Mover:
Expand Down Expand Up @@ -141,7 +138,7 @@ async def test_mover_moving_well(mock_mover: demo.Mover) -> None:
time_elapsed=pytest.approx(0.1, abs=0.05),
)
set_mock_value(mock_mover.readback, 0.5499999)
await asyncio.sleep(A_WHILE)
await wait_for_pending_wakeups()
assert s.done
assert s.success
done.assert_called_once_with(s)
Expand Down
6 changes: 6 additions & 0 deletions tests/epics/signal/test_records.db
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,9 @@ record(waveform, "$(P)ntndarray:data")
}
})
}

record(calc, "$(P)ticking") {
field(INPA, "$(P)ticking")
field(CALC, "A+1")
field(SCAN, ".1 second")
}
20 changes: 20 additions & 0 deletions tests/epics/signal/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
Array1D,
NotConnected,
SignalBackend,
SignalR,
SignalRW,
StrictEnum,
SubsetEnum,
T,
Table,
load_from_yaml,
observe_value,
save_to_yaml,
)
from ophyd_async.epics.core import (
Expand Down Expand Up @@ -935,3 +937,21 @@ def my_plan():
def test_signal_module_emits_deprecation_warning():
with pytest.deprecated_call():
import ophyd_async.epics.signal # noqa: F401


async def test_observe_ticking_signal_with_busy_loop(ioc: IOC):
sig = SignalR(await ioc.make_backend(int, "ticking"))
recv = []

async def watch():
async for val in observe_value(sig):
time.sleep(0.3)
recv.append(val)

start = time.time()
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.4)
assert time.time() - start == pytest.approx(0.6, abs=0.1)
assert len(recv) == 2
# Don't check values as CA and PVA have different algorithms for
# dropping updates for slow callbacks
24 changes: 7 additions & 17 deletions tests/epics/test_motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,11 @@
set_mock_put_proceeds,
set_mock_value,
soft_signal_rw,
wait_for_pending_wakeups,
)
from ophyd_async.epics import motor


async def wait_for_wakeups(max_yields=10):
loop = asyncio.get_event_loop()
# If anything has called loop.call_soon or is scheduled a wakeup
# then let it run
for _ in range(max_yields):
await asyncio.sleep(0)
if not loop._ready:
return
raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields")


@pytest.fixture
async def sim_motor():
async with DeviceCollector(mock=True):
Expand All @@ -44,7 +34,7 @@ async def sim_motor():
async def wait_for_eq(item, attribute, comparison, timeout):
timeout_time = time.monotonic() + timeout
while getattr(item, attribute) != comparison:
await wait_for_wakeups()
await wait_for_pending_wakeups()
if time.monotonic() > timeout_time:
raise TimeoutError

Expand All @@ -56,7 +46,7 @@ async def test_motor_moving_well(sim_motor: motor.Motor) -> None:
s.watch(watcher)
done = Mock()
s.add_callback(done)
await wait_for_wakeups()
await wait_for_pending_wakeups()
await wait_for_eq(watcher, "call_count", 1, 1)
assert watcher.call_args == call(
name="sim_motor",
Expand Down Expand Up @@ -86,7 +76,7 @@ async def test_motor_moving_well(sim_motor: motor.Motor) -> None:
set_mock_value(sim_motor.motor_done_move, True)
set_mock_value(sim_motor.user_readback, 0.55)
set_mock_put_proceeds(sim_motor.user_setpoint, True)
await wait_for_wakeups()
await wait_for_pending_wakeups()
await wait_for_eq(s, "done", True, 1)
done.assert_called_once_with(s)

Expand All @@ -98,7 +88,7 @@ async def test_motor_moving_well_2(sim_motor: motor.Motor) -> None:
s.watch(watcher)
done = Mock()
s.add_callback(done)
await wait_for_wakeups()
await wait_for_pending_wakeups()
assert watcher.call_count == 1
assert watcher.call_args == call(
name="sim_motor",
Expand Down Expand Up @@ -126,7 +116,7 @@ async def test_motor_moving_well_2(sim_motor: motor.Motor) -> None:
time_elapsed=pytest.approx(0.1, abs=0.2),
)
set_mock_put_proceeds(sim_motor.user_setpoint, True)
await wait_for_wakeups()
await wait_for_pending_wakeups()
assert s.done
done.assert_called_once_with(s)

Expand Down Expand Up @@ -165,7 +155,7 @@ async def test_motor_moving_stopped(sim_motor: motor.Motor):
assert not s.done
await sim_motor.stop()
set_mock_put_proceeds(sim_motor.user_setpoint, True)
await wait_for_wakeups()
await wait_for_pending_wakeups()
assert s.done
assert s.success is False

Expand Down
Loading