diff --git a/api/src/opentrons/drivers/absorbance_reader/__init__.py b/api/src/opentrons/drivers/absorbance_reader/__init__.py index 149da466ddd9..df870c69acee 100644 --- a/api/src/opentrons/drivers/absorbance_reader/__init__.py +++ b/api/src/opentrons/drivers/absorbance_reader/__init__.py @@ -2,4 +2,8 @@ from .driver import AbsorbanceReaderDriver from .simulator import SimulatingDriver -__all__ = ["AbstractAbsorbanceReaderDriver", "AbsorbanceReaderDriver", "SimulatingDriver"] +__all__ = [ + "AbstractAbsorbanceReaderDriver", + "AbsorbanceReaderDriver", + "SimulatingDriver", +] diff --git a/api/src/opentrons/drivers/absorbance_reader/abstract.py b/api/src/opentrons/drivers/absorbance_reader/abstract.py index 38cf9fd37893..618da2a43789 100644 --- a/api/src/opentrons/drivers/absorbance_reader/abstract.py +++ b/api/src/opentrons/drivers/absorbance_reader/abstract.py @@ -30,11 +30,11 @@ async def get_available_wavelengths(self) -> List[int]: @abstractmethod async def get_single_measurement(self, wavelength: int) -> List[float]: ... - + @abstractmethod async def set_sample_wavelength(self, wavelength: int) -> None: ... - + @abstractmethod async def get_status(self) -> None: ... @@ -42,4 +42,4 @@ async def get_status(self) -> None: @abstractmethod async def get_device_info(self) -> Dict[str, str]: """Get device info""" - ... \ No newline at end of file + ... diff --git a/api/src/opentrons/drivers/absorbance_reader/async_byonoy.py b/api/src/opentrons/drivers/absorbance_reader/async_byonoy.py index 50299f302c1e..736d3a35ee5d 100644 --- a/api/src/opentrons/drivers/absorbance_reader/async_byonoy.py +++ b/api/src/opentrons/drivers/absorbance_reader/async_byonoy.py @@ -2,7 +2,6 @@ import asyncio import contextlib -import hid from dataclasses import dataclass from concurrent.futures.thread import ThreadPoolExecutor from functools import partial @@ -12,22 +11,25 @@ from opentrons.drivers.rpi_drivers.types import USBPort -import pybyonoy_device_library as byonoy # type: ignore[import-not-found] +import hid # type: ignore[import-not-found] +import pybyonoy_device_library as byonoy # type: ignore[import-not-found] TimeoutProperties = Union[Literal["write_timeout"], Literal["timeout"]] + def get_byonoy_device_from_sn(sn: str) -> byonoy.ByonoyDevice: found = byonoy.byonoy_available_devices() for device in found: if device.sn == sn: return device - + def get_device_number_at_port(usb_port: USBPort) -> str: - full_path = usb_port.name + ':' + usb_port.device_path.split('/')[0] - hid_port = [i for i in hid.enumerate() if full_path in i['path'].decode()] + full_path = usb_port.name + ":" + usb_port.device_path.split("/")[0] + hid_port = [i for i in hid.enumerate() if full_path in i["path"].decode()] assert len(hid_port) == 1, f"Expected 1 device, found {len(hid_port)}" - return hid_port[0]['serial_number'] + sn = hid_port[0]["serial_number"] + return str(sn) @dataclass @@ -69,11 +71,7 @@ async def create( executor = ThreadPoolExecutor(max_workers=1) device_sn = get_device_number_at_port(usb_port) device = await loop.run_in_executor( - executor=executor, - func=partial( - get_byonoy_device_from_sn, - device_sn - ) + executor=executor, func=partial(get_byonoy_device_from_sn, device_sn) ) return cls( device=device, @@ -113,72 +111,92 @@ def _open(self) -> None: if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: raise RuntimeError(f"Error opening device: {err}") self._device_handle = device_handle - + def _free(self) -> None: if self._device_handle: byonoy.byonoy_free_device(self._device_handle) self._cleanup() + def verify_device_handle(self) -> None: + assert self._device_handle, RuntimeError("Device handle not initialized") + def _get_device_information(self) -> byonoy.ByonoyDeviceInfo: - if self._device_handle: - err, device_info = byonoy.byonoy_get_device_information(self._device_handle) - if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: - raise RuntimeError(f"Error getting device information: {err}") - return device_info - + self.verify_device_handle() + err, device_info = byonoy.byonoy_get_device_information(self._device_handle) + if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: + raise RuntimeError(f"Error getting device information: {err}") + return device_info + def _get_device_status(self) -> byonoy.ByonoyDeviceStatus: - if self._device_handle: - err, status = byonoy.byonoy_get_device_status(self._device_handle) - if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: - raise RuntimeError(f"Error getting device status: {err}") - return status - + self.verify_device_handle() + err, status = byonoy.byonoy_get_device_status(self._device_handle) + if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: + raise RuntimeError(f"Error getting device status: {err}") + return status + def _get_slot_status(self) -> byonoy.ByonoyDeviceSlotStatus: - if self._device_handle: - err, slot_status = byonoy.byonoy_get_device_slot_status(self._device_handle) - if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: - raise RuntimeError(f"Error getting slot status: {err}") - return slot_status + self.verify_device_handle() + err, slot_status = byonoy.byonoy_get_device_slot_status(self._device_handle) + if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: + raise RuntimeError(f"Error getting slot status: {err}") + return slot_status def _get_lid_status(self) -> bool: - if self._device_handle: - err, lid_on = byonoy.byonoy_get_device_parts_aligned(self._device_handle) - if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: - raise RuntimeError(f"Error getting slot status: {err}") - return lid_on - + self.verify_device_handle() + lid_on: bool + err, lid_on = byonoy.byonoy_get_device_parts_aligned(self._device_handle) + if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: + raise RuntimeError(f"Error getting slot status: {err}") + return lid_on + def _get_supported_wavelengths(self) -> List[int]: - if self._device_handle: - err, wavelengths = byonoy.byonoy_abs96_get_available_wavelengths(self._device_handle) - if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: - raise RuntimeError(f"Error getting supported wavelengths: {err}") - self._supported_wavelengths = wavelengths - return wavelengths - - def _initialize_measurement(self, conf: byonoy.ByonoyAbs96SingleMeasurementConfig) -> None: - if self._device_handle: - err = byonoy.byonoy_abs96_initialize_single_measurement(self._device_handle, conf) - if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: - raise RuntimeError(f"Error initializing measurement: {err}") - - def _single_measurement(self, conf: byonoy.ByonoyAbs96SingleMeasurementConfig) -> List[float]: - if self._device_handle: - err, measurements = byonoy.byonoy_abs96_single_measure(self._device_handle, conf) - if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: - raise RuntimeError(f"Error getting single measurement: {err}") - return measurements + self.verify_device_handle() + wavelengths: List[int] + err, wavelengths = byonoy.byonoy_abs96_get_available_wavelengths( + self._device_handle + ) + if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: + raise RuntimeError(f"Error getting supported wavelengths: {err}") + self._supported_wavelengths = wavelengths + return wavelengths - def _set_sample_wavelength(self, wavelength: int) -> byonoy.ByonoyAbs96SingleMeasurementConfig: - if self._device_handle: - if not self._supported_wavelengths: - self._get_supported_wavelengths() - assert self._supported_wavelengths - if wavelength in self._supported_wavelengths: - conf = byonoy.ByonoyAbs96SingleMeasurementConfig() - conf.sample_wavelength = wavelength - return conf - else: - raise ValueError(f"Unsupported wavelength: {wavelength}, expected: {self._supported_wavelengths}") + def _initialize_measurement( + self, conf: byonoy.ByonoyAbs96SingleMeasurementConfig + ) -> None: + self.verify_device_handle() + err = byonoy.byonoy_abs96_initialize_single_measurement( + self._device_handle, conf + ) + if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: + raise RuntimeError(f"Error initializing measurement: {err}") + + def _single_measurement( + self, conf: byonoy.ByonoyAbs96SingleMeasurementConfig + ) -> List[float]: + self.verify_device_handle() + measurements: List[float] + err, measurements = byonoy.byonoy_abs96_single_measure( + self._device_handle, conf + ) + if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR: + raise RuntimeError(f"Error getting single measurement: {err}") + return measurements + + def _set_sample_wavelength( + self, wavelength: int + ) -> byonoy.ByonoyAbs96SingleMeasurementConfig: + self.verify_device_handle() + if not self._supported_wavelengths: + self._get_supported_wavelengths() + assert self._supported_wavelengths + if wavelength in self._supported_wavelengths: + conf = byonoy.ByonoyAbs96SingleMeasurementConfig() + conf.sample_wavelength = wavelength + return conf + else: + raise ValueError( + f"Unsupported wavelength: {wavelength}, expected: {self._supported_wavelengths}" + ) def _initialize(self, wavelength: int) -> None: conf = self._set_sample_wavelength(wavelength) @@ -204,9 +222,7 @@ async def close(self) -> None: Returns: None """ - await self._loop.run_in_executor( - executor=self._executor, func=self._free - ) + await self._loop.run_in_executor(executor=self._executor, func=self._free) async def is_open(self) -> bool: """ @@ -215,7 +231,7 @@ async def is_open(self) -> bool: Returns: boolean """ return self._device_handle is not None - + async def get_device_information(self) -> DeviceInfo: device_info = await self._loop.run_in_executor( executor=self._executor, func=self._get_device_information @@ -223,15 +239,17 @@ async def get_device_information(self) -> DeviceInfo: return DeviceInfo( serial_number=device_info.sn, reference_number=device_info.ref_no, - version=device_info.version + version=device_info.version, ) - + async def get_lid_status(self) -> AbsorbanceReaderLidStatus: lid_info = await self._loop.run_in_executor( executor=self._executor, func=self._get_lid_status ) - return AbsorbanceReaderLidStatus.ON if lid_info else AbsorbanceReaderLidStatus.OFF - + return ( + AbsorbanceReaderLidStatus.ON if lid_info else AbsorbanceReaderLidStatus.OFF + ) + async def get_supported_wavelengths(self) -> list[int]: return await self._loop.run_in_executor( executor=self._executor, func=self._get_supported_wavelengths @@ -244,7 +262,6 @@ async def initialize(self, wavelength: int) -> None: async def get_single_measurement(self, wavelength: int) -> List[float]: return await self._loop.run_in_executor( - executor=self._executor, func=partial(self._get_single_measurement, wavelength) + executor=self._executor, + func=partial(self._get_single_measurement, wavelength), ) - - diff --git a/api/src/opentrons/drivers/absorbance_reader/driver.py b/api/src/opentrons/drivers/absorbance_reader/driver.py index 1f54df292b31..f4e9f841bbdd 100644 --- a/api/src/opentrons/drivers/absorbance_reader/driver.py +++ b/api/src/opentrons/drivers/absorbance_reader/driver.py @@ -19,10 +19,9 @@ async def create( loop: Optional[asyncio.AbstractEventLoop], ) -> AbsorbanceReaderDriver: """Create an absorbance reader driver.""" - connection = await AsyncByonoy.create( - port=port, usb_port=usb_port, loop=loop) + connection = await AsyncByonoy.create(port=port, usb_port=usb_port, loop=loop) return cls(connection=connection) - + def __init__(self, connection: AsyncByonoy) -> None: self._connection = connection @@ -51,9 +50,9 @@ async def get_available_wavelengths(self) -> List[int]: async def get_single_measurement(self, wavelength: int) -> List[float]: return await self._connection.get_single_measurement(wavelength) - + async def set_sample_wavelength(self, wavelength: int) -> None: await self._connection.initialize(wavelength) - + async def get_status(self) -> None: pass diff --git a/api/src/opentrons/drivers/absorbance_reader/simulator.py b/api/src/opentrons/drivers/absorbance_reader/simulator.py index 14e752fe0a87..534d85f2c4b5 100644 --- a/api/src/opentrons/drivers/absorbance_reader/simulator.py +++ b/api/src/opentrons/drivers/absorbance_reader/simulator.py @@ -8,7 +8,6 @@ class SimulatingDriver(AbstractAbsorbanceReaderDriver): - def __init__(self, serial_number: Optional[str] = None) -> None: self._serial_number = serial_number @@ -45,13 +44,13 @@ async def get_available_wavelengths(self) -> List[int]: return [450, 570, 600, 650] @ensure_yield - async def get_single_measurement(self, wavelength: int) -> None: - pass - + async def get_single_measurement(self, wavelength: int) -> List[float]: + return [0.0] + @ensure_yield async def set_sample_wavelength(self, wavelength: int) -> None: pass - + @ensure_yield async def get_status(self) -> None: pass diff --git a/api/src/opentrons/drivers/rpi_drivers/types.py b/api/src/opentrons/drivers/rpi_drivers/types.py index 3a4a1873ace2..59b1f61658a7 100644 --- a/api/src/opentrons/drivers/rpi_drivers/types.py +++ b/api/src/opentrons/drivers/rpi_drivers/types.py @@ -43,11 +43,12 @@ class PortGroup: (tty/tty(\w{4})/dev | [\w:\.]+?/hidraw/hidraw\d/dev) ) """, - re.VERBOSE + re.VERBOSE, ) HUB_PATTERN = re.compile(r"(\d-[\d.]+\d?)[\/:]") + @dataclass(frozen=True) class USBPort: name: str @@ -58,7 +59,9 @@ class USBPort: device_path: str = "" @classmethod - def build(cls, full_path: str, board_revision: BoardRevision) -> Optional["USBPort"]: + def build( + cls, full_path: str, board_revision: BoardRevision + ) -> Optional["USBPort"]: """ Build a USBPort dataclass. @@ -127,9 +130,9 @@ def find_hub( port_nodes = [node for node in port_nodes if "." in node] if len(port_nodes) > 2: port_info = port_nodes[2].split(".") - hub: Optional[int] = int(port_info[1]) + hub = int(port_info[1]) port = int(port_info[2]) - hub_port: Optional[int] = int(port_info[3]) + hub_port = int(port_info[3]) name = port_nodes[2] elif len(port_nodes) > 1: if board_revision == BoardRevision.OG: @@ -202,7 +205,6 @@ def get_unique_nodes(full_name: str) -> List[str]: match_set.append(match) return match_set - @staticmethod def map_to_revision( board_revision: BoardRevision, diff --git a/api/src/opentrons/drivers/rpi_drivers/usb.py b/api/src/opentrons/drivers/rpi_drivers/usb.py index a127c0f8bc6a..3e6069aac55b 100644 --- a/api/src/opentrons/drivers/rpi_drivers/usb.py +++ b/api/src/opentrons/drivers/rpi_drivers/usb.py @@ -99,5 +99,5 @@ def match_virtual_ports( vp.usb_port = p sorted_virtual_ports.append(vp) break - + return sorted_virtual_ports or virtual_ports diff --git a/api/src/opentrons/drivers/types.py b/api/src/opentrons/drivers/types.py index 8cf0779749f0..09691ff051ff 100644 --- a/api/src/opentrons/drivers/types.py +++ b/api/src/opentrons/drivers/types.py @@ -73,4 +73,4 @@ class AbsorbanceReaderPlatePresence(str, Enum): UNKNOWN = "unknown" PRESENT = "present" - ABSENCE = "absence" \ No newline at end of file + ABSENCE = "absence" diff --git a/api/src/opentrons/hardware_control/modules/absorbance_reader.py b/api/src/opentrons/hardware_control/modules/absorbance_reader.py index 291537aa701f..1dc5ab74a715 100644 --- a/api/src/opentrons/hardware_control/modules/absorbance_reader.py +++ b/api/src/opentrons/hardware_control/modules/absorbance_reader.py @@ -1,28 +1,40 @@ - import asyncio import logging -from typing import Optional, Mapping, List +from typing import Optional, Mapping, List, Dict, Any, Tuple from opentrons.drivers.rpi_drivers.types import USBPort -from opentrons.drivers.absorbance_reader import AbstractAbsorbanceReaderDriver, AbsorbanceReaderDriver, SimulatingDriver +from opentrons.drivers.absorbance_reader import ( + AbstractAbsorbanceReaderDriver, + AbsorbanceReaderDriver, + SimulatingDriver, +) from opentrons.hardware_control.execution_manager import ExecutionManager from opentrons.hardware_control.modules import mod_abc -from opentrons.hardware_control.poller import Reader, Poller -from opentrons.hardware_control.modules.types import AbsorbanceReaderStatus, LiveData +from opentrons.hardware_control.modules.types import ( + ModuleType, + AbsorbanceReaderStatus, + LiveData, + UploadFunction, +) + +async def upload_func_placeholder( + dfu_serial: str, firmware_file_path: str, kwargs: Dict[str, Any] +) -> Tuple[bool, str]: + return False, "Not implemented" class AbsorbanceReader(mod_abc.AbstractModule): """Hardware control interface for an attached Absorbance Reader module.""" - MODULE_TYPE = mod_abc.ModuleType.ABSORBANCE_READER + MODULE_TYPE = ModuleType.ABSORBANCE_READER @classmethod async def build( cls, port: str, usb_port: USBPort, - execution_manager: mod_abc.ExecutionManager, + execution_manager: ExecutionManager, hw_control_loop: asyncio.AbstractEventLoop, poll_interval_seconds: Optional[float] = None, simulating: bool = False, @@ -33,10 +45,10 @@ async def build( driver: AbstractAbsorbanceReaderDriver if not simulating: driver = await AbsorbanceReaderDriver.create( - port, - usb_port, - hw_control_loop + port, usb_port, hw_control_loop ) + else: + driver = SimulatingDriver(serial_number=sim_serial_number) module = cls( port=port, usb_port=usb_port, @@ -48,13 +60,13 @@ async def build( return module def __init__( - self, - port: str, - usb_port: USBPort, - driver: AbstractAbsorbanceReaderDriver, - device_info: Mapping[str, str], - execution_manager: ExecutionManager, - hw_control_loop: asyncio.AbstractEventLoop + self, + port: str, + usb_port: USBPort, + driver: AbstractAbsorbanceReaderDriver, + device_info: Mapping[str, str], + execution_manager: ExecutionManager, + hw_control_loop: asyncio.AbstractEventLoop, ) -> None: super().__init__(port, usb_port, execution_manager, hw_control_loop) self._driver = driver @@ -84,7 +96,7 @@ def live_data(self) -> LiveData: "status": "idle", "data": { "sampleWavelength": 400, - } + }, } @property @@ -101,13 +113,12 @@ def port(self) -> str: def usb_port(self) -> USBPort: """The physical port where the module is connected.""" return self._usb_port - + async def wait_for_is_running(self) -> None: if not self.is_simulated: await self._execution_manager.wait_for_is_running() - - async def prep_for_update(self) -> None: + async def prep_for_update(self) -> str: """Prepare for an update. By the time this coroutine completes, the hardware should be ready @@ -117,7 +128,7 @@ async def prep_for_update(self) -> None: :returns str: The port we're running on. """ - pass + return "" def model(self) -> str: """A name for this specific module, matching module defs""" @@ -133,6 +144,10 @@ def firmware_prefix(self) -> str: # TODO: (AA) This is a placeholder return "" + def bootloader(self) -> UploadFunction: + """Bootloader mode""" + return upload_func_placeholder + async def cleanup(self) -> None: """Clean up the module instance. diff --git a/api/src/opentrons/hardware_control/modules/types.py b/api/src/opentrons/hardware_control/modules/types.py index 5f59bba4cc6a..b1289b7a4cdd 100644 --- a/api/src/opentrons/hardware_control/modules/types.py +++ b/api/src/opentrons/hardware_control/modules/types.py @@ -51,7 +51,7 @@ class ModuleType(str, Enum): MAGNETIC: MagneticModuleType = "magneticModuleType" HEATER_SHAKER: HeaterShakerModuleType = "heaterShakerModuleType" MAGNETIC_BLOCK: MagneticBlockType = "magneticBlockType" - ABSORBANCE_READER: AbsorbanceReaderType = "absorbanceReaderType" + ABSORBANCE_READER: AbsorbanceReaderType = "absorbanceReaderType" @classmethod def from_model(cls, model: ModuleModel) -> ModuleType: diff --git a/api/src/opentrons/hardware_control/modules/update.py b/api/src/opentrons/hardware_control/modules/update.py index 51c7d1cd32a4..2fdbc2a095f3 100644 --- a/api/src/opentrons/hardware_control/modules/update.py +++ b/api/src/opentrons/hardware_control/modules/update.py @@ -6,6 +6,7 @@ from typing import Any, AsyncGenerator, Dict, Tuple, Union from .types import UpdateError from .mod_abc import AbstractModule +from .absorbance_reader import AbsorbanceReader from opentrons.hardware_control.threaded_async_lock import ThreadedAsyncLock from contextlib import asynccontextmanager diff --git a/api/src/opentrons/protocol_engine/commands/__init__.py b/api/src/opentrons/protocol_engine/commands/__init__.py index 123425e464f5..26ba0af9b0e6 100644 --- a/api/src/opentrons/protocol_engine/commands/__init__.py +++ b/api/src/opentrons/protocol_engine/commands/__init__.py @@ -18,6 +18,7 @@ from . import temperature_module from . import thermocycler from . import calibration +from . import absorbance_reader from .hash_command_params import hash_protocol_command_params from .generate_command_schema import generate_command_schema @@ -532,6 +533,7 @@ "magnetic_module", "temperature_module", "thermocycler", + "absorbance_reader", # calibration command bundle "calibration", # configure pipette volume command bundle diff --git a/api/src/opentrons/protocol_engine/commands/absorbance_reader/__init__.py b/api/src/opentrons/protocol_engine/commands/absorbance_reader/__init__.py index 9a273d2a9be1..b1bd90510d89 100644 --- a/api/src/opentrons/protocol_engine/commands/absorbance_reader/__init__.py +++ b/api/src/opentrons/protocol_engine/commands/absorbance_reader/__init__.py @@ -1 +1,17 @@ """Command models for Absorbance Reader commands.""" + +from .measure import ( + AbsorbanceMeasureCommandType, + AbsorbanceMeasureParams, + AbsorbanceMeasureResult, + AbsorbanceMeasure, + AbsorbanceMeasureCreate, +) + +__all__ = [ + "AbsorbanceMeasureCommandType", + "AbsorbanceMeasureParams", + "AbsorbanceMeasureResult", + "AbsorbanceMeasure", + "AbsorbanceMeasureCreate", +] diff --git a/api/src/opentrons/protocol_engine/commands/absorbance_reader/measure.py b/api/src/opentrons/protocol_engine/commands/absorbance_reader/measure.py new file mode 100644 index 000000000000..a8d12d163d19 --- /dev/null +++ b/api/src/opentrons/protocol_engine/commands/absorbance_reader/measure.py @@ -0,0 +1,75 @@ +"""Command models to execute a Thermocycler profile.""" +from __future__ import annotations +from typing import List, Optional, TYPE_CHECKING +from typing_extensions import Literal, Type + +from pydantic import BaseModel, Field + +from opentrons.hardware_control.modules.types import ThermocyclerStep + +from ..command import AbstractCommandImpl, BaseCommand, BaseCommandCreate + +if TYPE_CHECKING: + from opentrons.protocol_engine.state import StateView + from opentrons.protocol_engine.execution import EquipmentHandler + + +AbsorbanceMeasureCommandType = Literal["absorbanceReader/measure"] + + +class AbsorbanceMeasureParams(BaseModel): + """Input parameters for a single absorbance reading.""" + + moduleId: str = Field(..., description="Unique ID of the Thermocycler.") + sampleWavelength: float = Field(..., description="Sample wavelength in nm.") + + +class AbsorbanceMeasureResult(BaseModel): + """Result data from running an aborbance reading.""" + + data: List[float] = Field(..., description="Absorbance data points.") + + +class AbsorbanceMeasureImpl( + AbstractCommandImpl[AbsorbanceMeasureParams, AbsorbanceMeasureResult] +): + """Execution implementation of a Thermocycler's run profile command.""" + + def __init__( + self, + state_view: StateView, + equipment: EquipmentHandler, + **unused_dependencies: object, + ) -> None: + self._state_view = state_view + self._equipment = equipment + + async def execute(self, params: AbsorbanceMeasureParams) -> AbsorbanceMeasureResult: + """Initiate a single absorbance measurement.""" + abs_reader_substate = self._state_view.modules.get_absorbance_reader_substate( + module_id=params.moduleId + ) + # Allow propagation of ModuleNotAttachedError. + abs_reader = self._equipment.get_module_hardware_api( + abs_reader_substate.module_id + ) + return AbsorbanceMeasureResult(data=[]) + + +class AbsorbanceMeasure(BaseCommand[AbsorbanceMeasureParams, AbsorbanceMeasureResult]): + """A command to execute an Absorbance Reader measurement.""" + + commandType: AbsorbanceMeasureCommandType = "absorbanceReader/measure" + params: AbsorbanceMeasureParams + result: Optional[AbsorbanceMeasureResult] + + _ImplementationCls: Type[AbsorbanceMeasureImpl] = AbsorbanceMeasureImpl + + +class AbsorbanceMeasureCreate(BaseCommandCreate[AbsorbanceMeasureParams]): + """A request to execute an Absorbance Reader measurement.""" + + commandType: AbsorbanceMeasureCommandType = "absorbanceReader/measure" + params: AbsorbanceMeasureParams + + _CommandCls: Type[AbsorbanceMeasure] = AbsorbanceMeasure diff --git a/api/src/opentrons/protocol_engine/commands/absorbance_reader/run_profile.py b/api/src/opentrons/protocol_engine/commands/absorbance_reader/run_profile.py deleted file mode 100644 index 467f1d7cac04..000000000000 --- a/api/src/opentrons/protocol_engine/commands/absorbance_reader/run_profile.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Command models to execute a Thermocycler profile.""" -from __future__ import annotations -from typing import List, Optional, TYPE_CHECKING -from typing_extensions import Literal, Type - -from pydantic import BaseModel, Field - -from opentrons.hardware_control.modules.types import ThermocyclerStep - -from ..command import AbstractCommandImpl, BaseCommand, BaseCommandCreate - -if TYPE_CHECKING: - from opentrons.protocol_engine.state import StateView - from opentrons.protocol_engine.execution import EquipmentHandler - - -RunProfileCommandType = Literal["thermocycler/runProfile"] - - -class AbsorbanceReadParams(BaseModel): - """Input parameters for a single absorbance reading.""" - - moduleId: str = Field(..., description="Unique ID of the Thermocycler.") - sampleWavelength: float = Field(..., description="Sample wavelength in nm.") - - -class AbsorbanceReadResult(BaseModel): - """Result data from running an aborbance reading.""" - - data: List[float] = Field(..., description="Absorbance data points.") - - -class AbsorbanceReadImpl(AbstractCommandImpl[AbsorbanceReadParams, AbsorbanceReadResult]): - """Execution implementation of a Thermocycler's run profile command.""" - - def __init__( - self, - state_view: StateView, - equipment: EquipmentHandler, - **unused_dependencies: object, - ) -> None: - self._state_view = state_view - self._equipment = equipment - - async def execute(self, params: AbsorbanceReadParams) -> AbsorbanceReadResult: - """Initiate a single absorbance measurement.""" - # TODO: Implement this - return AbsorbanceReadResult(data=[]) - - -class AbsorbanceRead(BaseCommand[AbsorbanceReadParams, AbsorbanceReadResult]): - """A command to execute a Thermocycler profile run.""" - - # TODO: fix this - commandType: AbsorbanceReadCommandType = "absorbanceReader/measure" - params: AbsorbanceReadParams - result: Optional[AbsorbanceReadResult] - - _ImplementationCls: Type[AbsorbanceReadImpl] = AbsorbanceReadImpl - - -class RunProfileCreate(BaseCommandCreate[AbsorbanceReadParams]): - """A request to execute a Thermocycler profile run.""" - - commandType: AbsorbanceReadCommandType = "absorbanceReader/measure" - params: AbsorbanceReadParams - - _CommandCls: Type[AbsorbanceRead] = AbsorbanceRead diff --git a/api/src/opentrons/protocol_engine/commands/command_unions.py b/api/src/opentrons/protocol_engine/commands/command_unions.py index 7674508cc96f..e0675cb2c766 100644 --- a/api/src/opentrons/protocol_engine/commands/command_unions.py +++ b/api/src/opentrons/protocol_engine/commands/command_unions.py @@ -9,6 +9,7 @@ from . import magnetic_module from . import temperature_module from . import thermocycler +from . import absorbance_reader from . import calibration @@ -353,6 +354,7 @@ thermocycler.OpenLid, thermocycler.CloseLid, thermocycler.RunProfile, + absorbance_reader.AbsorbanceMeasure, calibration.CalibrateGripper, calibration.CalibratePipette, calibration.CalibrateModule, @@ -419,6 +421,7 @@ thermocycler.CloseLidParams, thermocycler.RunProfileParams, thermocycler.RunProfileStepParams, + absorbance_reader.AbsorbanceMeasureParams, calibration.CalibrateGripperParams, calibration.CalibratePipetteParams, calibration.CalibrateModuleParams, @@ -482,6 +485,7 @@ thermocycler.OpenLidCommandType, thermocycler.CloseLidCommandType, thermocycler.RunProfileCommandType, + absorbance_reader.AbsorbanceMeasureCommandType, calibration.CalibrateGripperCommandType, calibration.CalibratePipetteCommandType, calibration.CalibrateModuleCommandType, @@ -546,6 +550,7 @@ thermocycler.OpenLidCreate, thermocycler.CloseLidCreate, thermocycler.RunProfileCreate, + absorbance_reader.AbsorbanceMeasureCreate, calibration.CalibrateGripperCreate, calibration.CalibratePipetteCreate, calibration.CalibrateModuleCreate, @@ -611,6 +616,7 @@ thermocycler.OpenLidResult, thermocycler.CloseLidResult, thermocycler.RunProfileResult, + absorbance_reader.AbsorbanceMeasureResult, calibration.CalibrateGripperResult, calibration.CalibratePipetteResult, calibration.CalibrateModuleResult, diff --git a/api/src/opentrons/protocol_engine/execution/equipment.py b/api/src/opentrons/protocol_engine/execution/equipment.py index 7dc2f3bcfaaf..11e2e042d67a 100644 --- a/api/src/opentrons/protocol_engine/execution/equipment.py +++ b/api/src/opentrons/protocol_engine/execution/equipment.py @@ -21,6 +21,7 @@ HeaterShakerModuleId, TemperatureModuleId, ThermocyclerModuleId, + AbsorbanceReaderId, ) from ..errors import ( FailedToLoadPipetteError, @@ -488,6 +489,13 @@ def get_module_hardware_api( ) -> Optional[Thermocycler]: ... + @overload + def get_module_hardware_api( + self, + module_id: AbsorbanceReaderId, + ) -> Optional[Thermocycler]: + ... + def get_module_hardware_api(self, module_id: str) -> Optional[AbstractModule]: """Get the hardware API for a given module.""" use_virtual_modules = self._state_store.config.use_virtual_modules diff --git a/api/src/opentrons/protocol_engine/state/module_substates/absorbance_reader_substate.py b/api/src/opentrons/protocol_engine/state/module_substates/absorbance_reader_substate.py index 5b1cada15cc7..f694f798a71e 100644 --- a/api/src/opentrons/protocol_engine/state/module_substates/absorbance_reader_substate.py +++ b/api/src/opentrons/protocol_engine/state/module_substates/absorbance_reader_substate.py @@ -1,12 +1,7 @@ """Heater-Shaker Module sub-state.""" from dataclasses import dataclass -from typing import NewType, List +from typing import List, NewType, Optional -from opentrons.protocol_engine.types import ( - TemperatureRange, - SpeedRange, - HeaterShakerLatchStatus, -) AbsorbanceReaderId = NewType("AbsorbanceReaderId", str) @@ -16,7 +11,10 @@ class AbsorbanceReaderSubState: """Absorbance-Plate-Reader-specific state.""" module_id: AbsorbanceReaderId + initialized: bool is_lid_open: bool is_loaded: bool - sample_wavelength: int - supported_wavelengths: List[int] + is_measuring: bool + temperature: float + sample_wavelength: Optional[int] + supported_wavelengths: Optional[List[int]] diff --git a/api/src/opentrons/protocol_engine/state/modules.py b/api/src/opentrons/protocol_engine/state/modules.py index 0e79dd53cf25..6a4c93ce1e84 100644 --- a/api/src/opentrons/protocol_engine/state/modules.py +++ b/api/src/opentrons/protocol_engine/state/modules.py @@ -54,14 +54,17 @@ heater_shaker, temperature_module, thermocycler, + absorbance_reader, ) from ..actions import Action, SucceedCommandAction, AddModuleAction from .abstract_store import HasState, HandlesActions from .module_substates import ( + AbsorbanceReaderSubState, MagneticModuleSubState, HeaterShakerModuleSubState, TemperatureModuleSubState, ThermocyclerModuleSubState, + AbsorbanceReaderId, MagneticModuleId, HeaterShakerModuleId, TemperatureModuleId, @@ -321,6 +324,18 @@ def _add_module_substate( self._state.substate_by_module_id[module_id] = MagneticBlockSubState( module_id=MagneticBlockId(module_id) ) + elif ModuleModel.is_absorbance_reader_model(actual_model): + self._state.substate_by_module_id[module_id] = AbsorbanceReaderSubState( + module_id=AbsorbanceReaderId(module_id), + # TODO: fix live data + initialized=live_data is not None, + is_lid_open=live_data is not None, + is_loaded=live_data is not None, + is_measuring=live_data is not None, + temperature=live_data["temperature"] if live_data else None, # type: ignore[arg-type] + sample_wavelength=live_data["wavelength"] if live_data else None, # type: ignore[arg-type] + supported_wavelengths=live_data["supportedWavelengths"] if live_data else None, # type: ignore[arg-type] + ) def _update_additional_slots_occupied_by_thermocycler( self, @@ -420,6 +435,29 @@ def _handle_heater_shaker_commands( plate_target_temperature=prev_state.plate_target_temperature, ) + def _handle_absorbance_reader_commands( + self, command: absorbance_reader.AbsorbanceMeasure + ) -> None: + module_id = command.params.moduleId + abs_state = self._state.substate_by_module_id[module_id] + assert isinstance( + abs_state, AbsorbanceReaderSubState + ), f"{module_id} is not a absorbance reader." + + prev_state: AbsorbanceReaderSubState = abs_state + + if isinstance(command.result, absorbance_reader.AbsorbanceMeasureResult): + self._state.substate_by_module_id[module_id] = AbsorbanceReaderSubState( + module_id=AbsorbanceReaderId(module_id), + initialized=prev_state.initialized, + is_lid_open=prev_state.is_lid_open, + is_loaded=prev_state.is_loaded, + is_measuring=False, + temperature=prev_state.temperature, + sample_wavelength=None, + supported_wavelengths=[], + ) + def _handle_temperature_module_commands( self, command: Union[ @@ -644,6 +682,22 @@ def get_thermocycler_module_substate( expected_name="Thermocycler Module", ) + def get_absorbance_reader_substate( + self, module_id: str + ) -> AbsorbanceReaderSubState: + """Return a `AbsorbanceReaderSubState` for the given Absorbance Reader. + + Raises: + ModuleNotLoadedError: If module_id has not been loaded. + WrongModuleTypeError: If module_id has been loaded, + but it's not a Absorbance Reader. + """ + return self._get_module_substate( + module_id=module_id, + expected_type=AbsorbanceReaderSubState, + expected_name="Absorbance Reader", + ) + def get_location(self, module_id: str) -> DeckSlotLocation: """Get the slot location of the given module.""" location = self.get(module_id).location diff --git a/api/src/opentrons/protocol_engine/types.py b/api/src/opentrons/protocol_engine/types.py index 13e9515e4479..9bb334b2f3f9 100644 --- a/api/src/opentrons/protocol_engine/types.py +++ b/api/src/opentrons/protocol_engine/types.py @@ -306,6 +306,7 @@ class ModuleModel(str, Enum): THERMOCYCLER_MODULE_V2 = "thermocyclerModuleV2" HEATER_SHAKER_MODULE_V1 = "heaterShakerModuleV1" MAGNETIC_BLOCK_V1 = "magneticBlockV1" + ABSORBANCE_READER_V1 = "absorbanceReaderV1" def as_type(self) -> ModuleType: """Get the ModuleType of this model.""" @@ -319,6 +320,8 @@ def as_type(self) -> ModuleType: return ModuleType.HEATER_SHAKER elif ModuleModel.is_magnetic_block(self): return ModuleType.MAGNETIC_BLOCK + elif ModuleModel.is_absorbance_reader_model(self): + return ModuleType.ABSORBANCE_READER assert False, f"Invalid ModuleModel {self}" @@ -355,6 +358,13 @@ def is_magnetic_block(cls, model: ModuleModel) -> TypeGuard[MagneticBlockModel]: """Whether a given model is a Magnetic block.""" return model == cls.MAGNETIC_BLOCK_V1 + @classmethod + def is_absorbance_reader_model( + cls, model: ModuleModel + ) -> TypeGuard[AbsorbanceReaderModel]: + """Whether a given model is a Absorbance reader.""" + return model == cls.ABSORBANCE_READER_V1 + TemperatureModuleModel = Literal[ ModuleModel.TEMPERATURE_MODULE_V1, ModuleModel.TEMPERATURE_MODULE_V2 @@ -367,6 +377,7 @@ def is_magnetic_block(cls, model: ModuleModel) -> TypeGuard[MagneticBlockModel]: ] HeaterShakerModuleModel = Literal[ModuleModel.HEATER_SHAKER_MODULE_V1] MagneticBlockModel = Literal[ModuleModel.MAGNETIC_BLOCK_V1] +AbsorbanceReaderModel = Literal[ModuleModel.ABSORBANCE_READER_V1] class ModuleDimensions(BaseModel): diff --git a/shared-data/python/opentrons_shared_data/module/dev_types.py b/shared-data/python/opentrons_shared_data/module/dev_types.py index 972816b6a7b6..c6dbbc4acad0 100644 --- a/shared-data/python/opentrons_shared_data/module/dev_types.py +++ b/shared-data/python/opentrons_shared_data/module/dev_types.py @@ -18,7 +18,7 @@ ThermocyclerModuleType = Literal["thermocyclerModuleType"] HeaterShakerModuleType = Literal["heaterShakerModuleType"] MagneticBlockType = Literal["magneticBlockType"] -AbsorbanceReaderType = Literal["AbsorbanceReaderType"] +AbsorbanceReaderType = Literal["absorbanceReaderType"] ModuleType = Union[ MagneticModuleType,