Skip to content

Commit

Permalink
add unit tests + fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
vegano1 committed Aug 7, 2024
1 parent cf0a67c commit ed46e8f
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 36 deletions.
20 changes: 11 additions & 9 deletions api/src/opentrons/drivers/absorbance_reader/async_byonoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

SN_PARSER = re.compile(r'ATTRS{serial}=="(?P<serial>.+?)"')
VERSION_PARSER = re.compile(r"Absorbance (?P<version>V\d+\.\d+\.\d+)")
SERIAL_PARSER = re.compile(r"SN: (?P<serial>BYO[A-Z]{3}[0-9]{5})")


class AsyncByonoy:
Expand Down Expand Up @@ -111,7 +112,7 @@ def __init__(
self._device_handle: Optional[int] = None
self._current_config: Optional[AbsProtocol.MeasurementConfig] = None

async def open(self) -> int:
async def open(self) -> bool:
"""
Open the connection.
Expand Down Expand Up @@ -153,13 +154,14 @@ async def get_device_information(self) -> Dict[str, str]:
func=partial(self._interface.byonoy_get_device_information, handle),
)
self._raise_if_error(err.name, f"Error getting device information: {err}")

match = VERSION_PARSER.match(device_info.version)
version = match["version"].lower() if match else "v0.0.0"
serial_match = SERIAL_PARSER.match(device_info.sn)
version_match = VERSION_PARSER.match(device_info.version)
serial = serial_match["serial"] if serial_match else "BYOMAA00000"
version = version_match["version"].lower() if version_match else "v0.0.0"
info = {
"serial": self._device.sn,
"model": "ABS96",
"serial": serial,
"version": version,
"model": "ABS96",
}
return info

Expand All @@ -170,7 +172,7 @@ async def get_device_status(self) -> AbsorbanceReaderDeviceState:
executor=self._executor,
func=partial(self._interface.byonoy_get_device_status, handle),
)
self._raise_if_error(err.name, "Error getting device status: ")
self._raise_if_error(err.name, f"Error getting device status: {err}")
return self.convert_device_state(status.name)

async def update_firmware(self, firmware_file_path: str) -> Tuple[bool, str]:
Expand Down Expand Up @@ -257,14 +259,14 @@ def _get_supported_wavelengths(self) -> List[int]:
err, wavelengths = self._interface.byonoy_abs96_get_available_wavelengths(
handle
)
self._raise_if_error(err.name, "Error getting available wavelengths: ")
self._raise_if_error(err.name, f"Error getting available wavelengths: {err}")
self._supported_wavelengths = wavelengths
return wavelengths

def _initialize_measurement(self, conf: AbsProtocol.MeasurementConfig) -> None:
handle = self._verify_device_handle()
err = self._interface.byonoy_abs96_initialize_single_measurement(handle, conf)
self._raise_if_error(err.name, "Error initializing measurement: ")
self._raise_if_error(err.name, f"Error initializing measurement: {err}")
self._current_config = conf

def _set_sample_wavelength(self, wavelength: int) -> AbsProtocol.MeasurementConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,9 @@ def __init__(
self._device_info = device_info
self._reader = reader
self._poller = poller
self._updating = False
self._error: Optional[str] = None
self._reader.register_error_handler(self._enter_error_state)

@property
def updating(self) -> bool:
"""The device is updating is True."""
return self._updating

@property
def status(self) -> AbsorbanceReaderStatus:
"""Return some string describing status."""
Expand Down Expand Up @@ -284,8 +278,6 @@ def model(self) -> str:

def firmware_prefix(self) -> str:
"""The prefix used for looking up firmware"""
if isinstance(self._driver, SimulatingDriver):
return self._driver.model()
return "absorbance-96"

async def update_device(self, firmware_file_path: str) -> Tuple[bool, str]:
Expand Down
8 changes: 7 additions & 1 deletion api/src/opentrons/hardware_control/modules/mod_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def parse_fw_version(version: str) -> Version:
raise InvalidVersion()
except InvalidVersion:
device_version = parse("v0.0.0")
return cast(Version, device_version)
return cast(Version, device_version) # type: ignore [redundant-cast]


class AbstractModule(abc.ABC):
Expand Down Expand Up @@ -73,6 +73,7 @@ def __init__(
self._execution_manager = execution_manager
self._bundled_fw: Optional[BundledFirmware] = self.get_bundled_fw()
self._disconnected_callback = disconnected_callback
self._updating = False

@staticmethod
def sort_key(inst: "AbstractModule") -> int:
Expand All @@ -91,6 +92,11 @@ def sort_key(inst: "AbstractModule") -> int:
def loop(self) -> asyncio.AbstractEventLoop:
return self._loop

@property
def updating(self) -> bool:
"""The device is updating is True."""
return self._updating

def disconnected_callback(self) -> None:
"""Called from within the module object to signify the object is no longer connected"""
if self._disconnected_callback is not None:
Expand Down
21 changes: 10 additions & 11 deletions api/src/opentrons/hardware_control/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,20 @@ def __init__(self, reader: Reader, interval: float) -> None:
self._poll_forever_task: Optional["asyncio.Task[None]"] = None

async def start(self) -> None:
assert self._poll_forever_task is None, "Poller already started"
self._poll_forever_task = asyncio.create_task(self._poll_forever())
await self.wait_next_poll()
if self._poll_forever_task is None:
self._poll_forever_task = asyncio.create_task(self._poll_forever())
await self.wait_next_poll()

async def stop(self) -> None:
"""Stop polling."""
task = self._poll_forever_task

assert task is not None, "Poller never started"

async with self._use_read_lock():
task.cancel()
await asyncio.gather(task, return_exceptions=True)
for waiter in self._poll_waiters:
waiter.cancel(msg="Module was removed")
if task is not None:
async with self._use_read_lock():
task.cancel()
await asyncio.gather(task, return_exceptions=True)
for waiter in self._poll_waiters:
waiter.cancel(msg="Module was removed")
self._poll_forever_task = None

async def wait_next_poll(self) -> None:
"""Wait for the next poll to complete.
Expand Down
8 changes: 4 additions & 4 deletions api/tests/opentrons/drivers/absorbance_reader/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ async def test_driver_get_device_info(
) -> None:

DEVICE_INFO = MagicMock(AbsorbanceHidInterface.DeviceInfo)
DEVICE_INFO.ref_no = "456"
DEVICE_INFO.sn = "123"
DEVICE_INFO.version = "1.0"
DEVICE_INFO.ref_no = ""
DEVICE_INFO.sn = "SN: BYOMAA00013 REF: DE MAA 001"
DEVICE_INFO.version = "Absorbance V1.0.2 2024-04-18"

mock_interface.byonoy_get_device_information.return_value = (
MockErrorCode.BYONOY_ERROR_NO_ERROR,
Expand All @@ -88,7 +88,7 @@ async def test_driver_get_device_info(
info = await connected_driver.get_device_info()

mock_interface.byonoy_get_device_information.assert_called_once()
assert info == {"serial_number": "123", "reference_number": "456", "version": "1.0"}
assert info == {"serial": "BYOMAA00013", "model": "ABS96", "version": "v1.0.2"}


@pytest.mark.parametrize(
Expand Down
10 changes: 8 additions & 2 deletions api/tests/opentrons/hardware_control/test_module_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,23 @@ async def test_register_modules_sort(
module_4 = decoy.mock(cls=AbstractModule)
decoy.when(module_4.usb_port).then_return(USBPort(name="x", port_number=2))

module_5 = decoy.mock(cls=AbstractModule)
decoy.when(module_5.usb_port).then_return(USBPort(name="z", port_number=1))

new_mods_at_ports = [ModuleAtPort(port="/dev/foo", name="bar")]
actual_ports = [
ModuleAtPort(port="/dev/a", name="magdeck", usb_port=module_1.usb_port),
ModuleAtPort(port="/dev/b", name="tempdeck", usb_port=module_2.usb_port),
ModuleAtPort(port="/dev/c", name="thermocycler", usb_port=module_3.usb_port),
ModuleAtPort(port="/dev/d", name="heatershaker", usb_port=module_4.usb_port),
ModuleAtPort(
port="/dev/d", name="absorbancereader", usb_port=module_5.usb_port
),
]

decoy.when(usb_bus.match_virtual_ports(new_mods_at_ports)).then_return(actual_ports)

for mod in [module_1, module_2, module_3, module_4]:
for mod in [module_1, module_2, module_3, module_4, module_5]:
decoy.when(
await build_module(
usb_port=mod.usb_port,
Expand All @@ -162,4 +168,4 @@ async def test_register_modules_sort(
await subject.register_modules(new_mods_at_ports=new_mods_at_ports)
result = subject.available_modules

assert result == [module_4, module_3, module_2, module_1]
assert result == [module_5, module_4, module_3, module_2, module_1]
54 changes: 53 additions & 1 deletion api/tests/opentrons/hardware_control/test_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
TemperatureModuleModel,
HeaterShakerModuleModel,
ThermocyclerModuleModel,
AbsorbanceReaderModel,
ModuleType,
)
from opentrons.hardware_control.modules import (
TempDeck,
MagDeck,
Thermocycler,
HeaterShaker,
AbsorbanceReader,
AbstractModule,
SimulatingModule,
build as build_module,
Expand All @@ -44,6 +46,9 @@ async def test_get_modules_simulating() -> None:
"heatershaker": [
SimulatingModule(serial_number="444", model="heaterShakerModuleV1")
],
"absorbancereader": [
SimulatingModule(serial_number="555", model="absorbanceReaderV1")
],
}
api = await hardware_control.API.build_hardware_simulator(attached_modules=mods)
await asyncio.sleep(0.05)
Expand Down Expand Up @@ -104,6 +109,7 @@ async def test_module_caching() -> None:
(TemperatureModuleModel.TEMPERATURE_V1, TempDeck),
(ThermocyclerModuleModel.THERMOCYCLER_V1, Thermocycler),
(HeaterShakerModuleModel.HEATER_SHAKER_V1, HeaterShaker),
(AbsorbanceReaderModel.ABSORBANCE_READER_V1, AbsorbanceReader),
],
)
async def test_create_simulating_module(
Expand Down Expand Up @@ -232,13 +238,35 @@ async def mod_heatershaker() -> AsyncIterator[AbstractModule]:
await heatershaker.cleanup()


@pytest.fixture
async def mod_absorbancereader() -> AsyncIterator[AbstractModule]:
usb_port = USBPort(
name="",
hub=False,
port_number=0,
device_path="/dev/ot_module_sim_absorbancereader0",
)

absorbancereader = await build_module(
port="/dev/ot_module_sim_absorbancereader0",
usb_port=usb_port,
type=ModuleType.ABSORBANCE_READER,
simulating=True,
hw_control_loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
)
yield absorbancereader
await absorbancereader.cleanup()


async def test_module_update_integration(
monkeypatch: pytest.MonkeyPatch,
mod_tempdeck: AbstractModule,
mod_magdeck: AbstractModule,
mod_thermocycler: AbstractModule,
mod_heatershaker: AbstractModule,
mod_thermocycler_gen2: AbstractModule,
mod_absorbancereader: AbstractModule,
) -> None:
from opentrons.hardware_control import modules

Expand All @@ -252,6 +280,7 @@ def async_return(result: T) -> "asyncio.Future[T]":
bootloader_kwargs = {
"stdout": asyncio.subprocess.PIPE,
"stderr": asyncio.subprocess.PIPE,
"module": mod_tempdeck,
}

upload_via_avrdude_mock = mock.Mock(
Expand All @@ -267,13 +296,15 @@ async def mock_find_avrdude_bootloader_port() -> str:
)

# test temperature module update with avrdude bootloader
bootloader_kwargs["module"] = mod_tempdeck
await modules.update_firmware(mod_tempdeck, "fake_fw_file_path")
upload_via_avrdude_mock.assert_called_once_with(
"ot_module_avrdude_bootloader1", "fake_fw_file_path", bootloader_kwargs
)
upload_via_avrdude_mock.reset_mock()

# test magnetic module update with avrdude bootloader
bootloader_kwargs["module"] = mod_magdeck
await modules.update_firmware(mod_magdeck, "fake_fw_file_path")
upload_via_avrdude_mock.assert_called_once_with(
"ot_module_avrdude_bootloader1", "fake_fw_file_path", bootloader_kwargs
Expand All @@ -292,6 +323,7 @@ async def mock_find_bossa_bootloader_port() -> str:
modules.update, "find_bootloader_port", mock_find_bossa_bootloader_port
)

bootloader_kwargs["module"] = mod_thermocycler
await modules.update_firmware(mod_thermocycler, "fake_fw_file_path")
upload_via_bossa_mock.assert_called_once_with(
"ot_module_bossa_bootloader1", "fake_fw_file_path", bootloader_kwargs
Expand All @@ -310,25 +342,36 @@ async def mock_find_dfu_device_hs(pid: str, expected_device_count: int) -> str:

monkeypatch.setattr(modules.update, "find_dfu_device", mock_find_dfu_device_hs)

bootloader_kwargs["module"] = mod_heatershaker
await modules.update_firmware(mod_heatershaker, "fake_fw_file_path")
upload_via_dfu_mock.assert_called_once_with(
"df11", "fake_fw_file_path", bootloader_kwargs
)
upload_via_dfu_mock.reset_mock()

# test thermocycler-gen2 module update with dfu bootloader
async def mock_find_dfu_device_tc2(pid: str, expected_device_count: int) -> str:
if expected_device_count == 3:
return "df11"
return "none"

monkeypatch.setattr(modules.update, "find_dfu_device", mock_find_dfu_device_tc2)

bootloader_kwargs["module"] = mod_thermocycler_gen2
await modules.update_firmware(mod_thermocycler_gen2, "fake_fw_file_path")
upload_via_dfu_mock.assert_called_once_with(
"df11", "fake_fw_file_path", bootloader_kwargs
)

mod_thermocycler_gen2
# Test absorbancereader update with byonoy library
bootloader_kwargs["module"] = mod_absorbancereader
byonoy_update_firmware_mock = mock.Mock(return_value=(async_return((True, ""))))
mod_absorbancereader._driver.update_firmware = byonoy_update_firmware_mock # type: ignore

assert not mod_absorbancereader.updating
await modules.update_firmware(mod_absorbancereader, "fake_fw_file_path")
byonoy_update_firmware_mock.assert_called_once_with("fake_fw_file_path")
assert not mod_absorbancereader.updating


async def test_get_bundled_fw(monkeypatch: pytest.MonkeyPatch, tmpdir: Path) -> None:
Expand All @@ -346,6 +389,9 @@ async def test_get_bundled_fw(monkeypatch: pytest.MonkeyPatch, tmpdir: Path) ->
dummy_hs_file = Path(tmpdir) / "[email protected]"
dummy_hs_file.write_text("hello")

dummy_abs_file = Path(tmpdir) / "[email protected]"
dummy_abs_file.write_text("hello")

dummy_bogus_file = Path(tmpdir) / "[email protected]"
dummy_bogus_file.write_text("hello")

Expand All @@ -365,6 +411,9 @@ async def test_get_bundled_fw(monkeypatch: pytest.MonkeyPatch, tmpdir: Path) ->
"heatershaker": [
SimulatingModule(serial_number="444", model="heaterShakerModuleV1")
],
"absorbancereader": [
SimulatingModule(serial_number="555", model="absorbanceReaderV1")
],
}

api = await API.build_hardware_simulator(attached_modules=mods)
Expand All @@ -382,6 +431,9 @@ async def test_get_bundled_fw(monkeypatch: pytest.MonkeyPatch, tmpdir: Path) ->
assert api.attached_modules[3].bundled_fw == BundledFirmware(
version="2.10.2", path=dummy_hs_file
)
assert api.attached_modules[4].bundled_fw == BundledFirmware(
version="1.0.2", path=dummy_abs_file
)
for m in api.attached_modules:
await m.cleanup()

Expand Down

0 comments on commit ed46e8f

Please sign in to comment.