diff --git a/api/src/opentrons/hardware_control/backends/flex_protocol.py b/api/src/opentrons/hardware_control/backends/flex_protocol.py index 53efde79a23..7bd2969de6b 100644 --- a/api/src/opentrons/hardware_control/backends/flex_protocol.py +++ b/api/src/opentrons/hardware_control/backends/flex_protocol.py @@ -383,7 +383,9 @@ async def capacitive_pass( def subsystems(self) -> Dict[SubSystem, SubSystemState]: ... - async def get_tip_status(self, mount: OT3Mount) -> TipStateType: + async def get_tip_status( + self, mount: OT3Mount, ht_operation_sensor: Optional[InstrumentProbeType] = None + ) -> TipStateType: ... def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: diff --git a/api/src/opentrons/hardware_control/backends/ot3controller.py b/api/src/opentrons/hardware_control/backends/ot3controller.py index 9316fb67e90..ea0b610f8b4 100644 --- a/api/src/opentrons/hardware_control/backends/ot3controller.py +++ b/api/src/opentrons/hardware_control/backends/ot3controller.py @@ -1521,8 +1521,14 @@ async def update_tip_detector(self, mount: OT3Mount, sensor_count: int) -> None: async def teardown_tip_detector(self, mount: OT3Mount) -> None: await self._tip_presence_manager.clear_detector(mount) - async def get_tip_status(self, mount: OT3Mount) -> TipStateType: - return await self.tip_presence_manager.get_tip_status(mount) + async def get_tip_status( + self, + mount: OT3Mount, + ht_operational_sensor: Optional[InstrumentProbeType] = None, + ) -> TipStateType: + return await self.tip_presence_manager.get_tip_status( + mount, ht_operational_sensor + ) def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: return self.tip_presence_manager.current_tip_state(mount) diff --git a/api/src/opentrons/hardware_control/backends/ot3simulator.py b/api/src/opentrons/hardware_control/backends/ot3simulator.py index b96be54026e..26d6237e9a3 100644 --- a/api/src/opentrons/hardware_control/backends/ot3simulator.py +++ b/api/src/opentrons/hardware_control/backends/ot3simulator.py @@ -780,7 +780,11 @@ def subsystems(self) -> Dict[SubSystem, SubSystemState]: for axis in self._present_axes } - async def get_tip_status(self, mount: OT3Mount) -> TipStateType: + async def get_tip_status( + self, + mount: OT3Mount, + ht_operational_sensor: Optional[InstrumentProbeType] = None, + ) -> TipStateType: return TipStateType(self._sim_tip_state[mount]) def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: diff --git a/api/src/opentrons/hardware_control/backends/tip_presence_manager.py b/api/src/opentrons/hardware_control/backends/tip_presence_manager.py index 9d2be3901da..0e46d713955 100644 --- a/api/src/opentrons/hardware_control/backends/tip_presence_manager.py +++ b/api/src/opentrons/hardware_control/backends/tip_presence_manager.py @@ -3,7 +3,7 @@ from typing import cast, Callable, Optional, List, Set from typing_extensions import TypedDict, Literal -from opentrons.hardware_control.types import TipStateType, OT3Mount +from opentrons.hardware_control.types import TipStateType, OT3Mount, InstrumentProbeType from opentrons_hardware.drivers.can_bus import CanMessenger from opentrons_hardware.firmware_bindings.constants import NodeId @@ -14,8 +14,11 @@ from opentrons_shared_data.errors.exceptions import ( TipDetectorNotFound, UnmatchedTipPresenceStates, + GeneralError, ) +from .ot3utils import sensor_id_for_instrument + log = logging.getLogger(__name__) TipListener = Callable[[OT3Mount, bool], None] @@ -111,7 +114,24 @@ def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: return state @staticmethod - def _get_tip_presence(results: List[tip_types.TipNotification]) -> TipStateType: + def _get_tip_presence( + results: List[tip_types.TipNotification], + ht_operational_sensor: Optional[InstrumentProbeType] = None, + ) -> TipStateType: + """ + We can use ht_operational_sensor used to specify that we only care + about the status of one tip presence sensor on a high throughput + pipette, and the other is allowed to be different. + """ + if ht_operational_sensor: + target_sensor_id = sensor_id_for_instrument(ht_operational_sensor) + for r in results: + if r.sensor == target_sensor_id: + return TipStateType(r.presence) + # raise an error if requested sensor response isn't found + raise GeneralError( + message=f"Requested status for sensor {ht_operational_sensor} not found." + ) # more than one sensor reported, we have to check if their states match if len(set(r.presence for r in results)) > 1: raise UnmatchedTipPresenceStates( @@ -119,9 +139,15 @@ def _get_tip_presence(results: List[tip_types.TipNotification]) -> TipStateType: ) return TipStateType(results[0].presence) - async def get_tip_status(self, mount: OT3Mount) -> TipStateType: + async def get_tip_status( + self, + mount: OT3Mount, + ht_operational_sensor: Optional[InstrumentProbeType] = None, + ) -> TipStateType: detector = self.get_detector(mount) - return self._get_tip_presence(await detector.request_tip_status()) + return self._get_tip_presence( + await detector.request_tip_status(), ht_operational_sensor + ) def get_detector(self, mount: OT3Mount) -> TipDetector: detector = self._detectors[self._get_key(mount)] diff --git a/api/src/opentrons/hardware_control/ot3api.py b/api/src/opentrons/hardware_control/ot3api.py index 93763876575..2aa170df489 100644 --- a/api/src/opentrons/hardware_control/ot3api.py +++ b/api/src/opentrons/hardware_control/ot3api.py @@ -2078,6 +2078,7 @@ async def _high_throughput_check_tip(self) -> AsyncIterator[None]: async def get_tip_presence_status( self, mount: Union[top_types.Mount, OT3Mount], + ht_operational_sensor: Optional[InstrumentProbeType] = None, ) -> TipStateType: """ Check tip presence status. If a high throughput pipette is present, @@ -2091,14 +2092,19 @@ async def get_tip_presence_status( and self._gantry_load == GantryLoad.HIGH_THROUGHPUT ): await stack.enter_async_context(self._high_throughput_check_tip()) - result = await self._backend.get_tip_status(real_mount) + result = await self._backend.get_tip_status( + real_mount, ht_operational_sensor + ) return result async def verify_tip_presence( - self, mount: Union[top_types.Mount, OT3Mount], expected: TipStateType + self, + mount: Union[top_types.Mount, OT3Mount], + expected: TipStateType, + ht_operational_sensor: Optional[InstrumentProbeType] = None, ) -> None: real_mount = OT3Mount.from_mount(mount) - status = await self.get_tip_presence_status(real_mount) + status = await self.get_tip_presence_status(real_mount, ht_operational_sensor) if status != expected: raise FailedTipStateCheck(expected, status.value) diff --git a/api/tests/opentrons/hardware_control/backends/test_ot3_tip_presence_manager.py b/api/tests/opentrons/hardware_control/backends/test_ot3_tip_presence_manager.py index 543f7b3b400..6ea39738fc2 100644 --- a/api/tests/opentrons/hardware_control/backends/test_ot3_tip_presence_manager.py +++ b/api/tests/opentrons/hardware_control/backends/test_ot3_tip_presence_manager.py @@ -2,7 +2,7 @@ from typing import AsyncIterator, Dict from decoy import Decoy -from opentrons.hardware_control.types import OT3Mount, TipStateType +from opentrons.hardware_control.types import OT3Mount, TipStateType, InstrumentProbeType from opentrons.hardware_control.backends.tip_presence_manager import TipPresenceManager from opentrons_hardware.hardware_control.tip_presence import ( TipDetector, @@ -110,6 +110,51 @@ async def test_get_tip_status_for_high_throughput( result == expected_type +@pytest.mark.parametrize( + "tip_presence,expected_type,sensor_to_look_at", + [ + ( + {SensorId.S0: False, SensorId.S1: False}, + TipStateType.ABSENT, + InstrumentProbeType.PRIMARY, + ), + ( + {SensorId.S0: True, SensorId.S1: True}, + TipStateType.PRESENT, + InstrumentProbeType.SECONDARY, + ), + ( + {SensorId.S0: False, SensorId.S1: True}, + TipStateType.ABSENT, + InstrumentProbeType.PRIMARY, + ), + ( + {SensorId.S0: False, SensorId.S1: True}, + TipStateType.PRESENT, + InstrumentProbeType.SECONDARY, + ), + ], +) +async def test_allow_different_tip_states_ht( + subject: TipPresenceManager, + tip_detector_controller: TipDetectorController, + tip_presence: Dict[SensorId, bool], + expected_type: TipStateType, + sensor_to_look_at: InstrumentProbeType, +) -> None: + mount = OT3Mount.LEFT + await tip_detector_controller.retrieve_tip_status_highthroughput(tip_presence) + + result = await subject.get_tip_status(mount, sensor_to_look_at) + result == expected_type + + # if sensor_to_look_at is not used, different tip states + # should result in an UnmatchedTipStates error + if len(set(tip_presence[t] for t in tip_presence)) > 1: + with pytest.raises(UnmatchedTipPresenceStates): + result = await subject.get_tip_status(mount) + + @pytest.mark.parametrize( "tip_presence", [