Skip to content

Commit

Permalink
feat(api): Validate plate reader status using live data hookups for e…
Browse files Browse the repository at this point in the history
…ngine and introduce lid status to the PAPI (#15872)

Closes: PLAT-208 PLAT-386
Ensure lid status utilizes live data, can no op open() and close() commands, and expose `is_lid_on()` to PAPI for use in protocols
  • Loading branch information
CaseyBatten authored Aug 14, 2024
1 parent 28fde62 commit b9daa39
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 82 deletions.
2 changes: 1 addition & 1 deletion api/docs/v2/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@
("py:class", r".*protocol_api\.config.*"),
("py:class", r".*opentrons_shared_data.*"),
("py:class", r".*protocol_api._parameters.Parameters.*"),
("py:class", r".*AbsorbanceReaderContext"), # shh it's a secret (for now)
("py:class", r".*AbsorbanceReaderContext"),
("py:class", r".*RobotContext"), # shh it's a secret (for now)
("py:class", r'.*AbstractLabware|APIVersion|LabwareLike|LoadedCoreMap|ModuleTypes|NoneType|OffDeckType|ProtocolCore|WellCore'), # laundry list of not fully qualified things
]
1 change: 1 addition & 0 deletions api/src/opentrons/drivers/absorbance_reader/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async def get_available_wavelengths(self) -> List[int]:
return await self._connection.get_supported_wavelengths()

async def get_single_measurement(self, wavelength: int) -> List[float]:
# TODO (cb, 08-02-2024): The list of measurements for 96 wells is rotated 180 degrees (well A1 is where well H12 should be) this must be corrected
return await self._connection.get_single_measurement(wavelength)

async def initialize_measurement(self, wavelength: int) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ def status(self) -> AbsorbanceReaderStatus:

@property
def lid_status(self) -> AbsorbanceReaderLidStatus:
return AbsorbanceReaderLidStatus.UNKNOWN
return self._reader.lid_status

@property
def plate_presence(self) -> AbsorbanceReaderPlatePresence:
return AbsorbanceReaderPlatePresence.UNKNOWN
return self._reader.plate_presence

@property
def device_info(self) -> Mapping[str, str]:
Expand Down Expand Up @@ -321,6 +321,11 @@ async def get_current_wavelength(self) -> None:
"""Get the Absorbance Reader's current active wavelength."""
pass # TODO: implement

async def get_current_lid_status(self) -> AbsorbanceReaderLidStatus:
"""Get the Absorbance Reader's current lid status."""
await self._reader.get_lid_status()
return self._reader.lid_status

def _enter_error_state(self, error: Exception) -> None:
self._error = str(error)
if isinstance(error, AbsorbanceReaderDisconnectedError):
Expand Down
7 changes: 7 additions & 0 deletions api/src/opentrons/protocol_api/core/engine/module_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,10 @@ def open_lid(self) -> None:
moduleId=self.module_id,
)
)

def is_lid_on(self) -> bool:
"""Returns True if the Absorbance Reader's lid is currently on the Reader slot."""
abs_state = self._engine_client.state.modules.get_absorbance_reader_substate(
self.module_id
)
return abs_state.is_lid_on
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_api/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,7 @@ def close_lid(self) -> None:
@abstractmethod
def open_lid(self) -> None:
"""Open the Absorbance Reader's lid."""

@abstractmethod
def is_lid_on(self) -> bool:
"""Return True if the Absorbance Reader's lid is currently closed."""
7 changes: 6 additions & 1 deletion api/src/opentrons/protocol_api/module_contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,9 +994,14 @@ def close_lid(self) -> None:

@requires_version(2, 21)
def open_lid(self) -> None:
"""Close the lid of the Absorbance Reader."""
"""Open the lid of the Absorbance Reader."""
self._core.open_lid()

@requires_version(2, 21)
def is_lid_on(self) -> bool:
"""Return ``True`` if the Absorbance Reader's lid is currently closed."""
return self._core.is_lid_on()

@requires_version(2, 21)
def initialize(self, wavelength: int) -> None:
"""Initialize the Absorbance Reader by taking zero reading."""
Expand Down
8 changes: 8 additions & 0 deletions api/src/opentrons/protocol_api/protocol_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
RobotTypeError,
UnsupportedAPIError,
)
from opentrons_shared_data.errors.exceptions import CommandPreconditionViolated

from ._types import OffDeckType
from .core.common import ModuleCore, LabwareCore, ProtocolCore
Expand Down Expand Up @@ -707,6 +708,13 @@ def move_labware(
f"Expected labware of type 'Labware' but got {type(labware)}."
)

# Ensure that when moving to an absorbance reader than the lid is open
if isinstance(new_location, AbsorbanceReaderContext):
if new_location.is_lid_on():
raise CommandPreconditionViolated(
f"Cannot move {labware.name} onto the Absorbance Reader Module when its lid is closed."
)

location: Union[
ModuleCore,
LabwareCore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@

from ..command import AbstractCommandImpl, BaseCommand, BaseCommandCreate, SuccessData
from ...errors.error_occurrence import ErrorOccurrence
from ...errors import CannotPerformModuleAction
from opentrons.protocol_engine.types import AddressableAreaLocation

from opentrons.protocol_engine.resources import labware_validation
from .types import MoveLidResult

from opentrons.drivers.types import AbsorbanceReaderLidStatus

if TYPE_CHECKING:
from opentrons.protocol_engine.state import StateView
from opentrons.protocol_engine.execution import (
Expand Down Expand Up @@ -55,59 +59,79 @@ async def execute(
mod_substate = self._state_view.modules.get_absorbance_reader_substate(
module_id=params.moduleId
)
# Make sure the lid is open
mod_substate.raise_if_lid_status_not_expected(lid_on_expected=False)

# Allow propagation of ModuleNotAttachedError.
_ = self._equipment.get_module_hardware_api(mod_substate.module_id)

# lid should currently be docked
# lid should currently be on the module
assert mod_substate.lid_id is not None
loaded_lid = self._state_view.labware.get(mod_substate.lid_id)
assert labware_validation.is_absorbance_reader_lid(loaded_lid.loadName)

current_location = loaded_lid.location
validated_current_location = (
self._state_view.geometry.ensure_valid_gripper_location(current_location)
)
hardware_lid_status = AbsorbanceReaderLidStatus.OFF
# If the lid is closed, if the lid is open No-op out
if not self._state_view.config.use_virtual_modules:
abs_reader = self._equipment.get_module_hardware_api(mod_substate.module_id)

if abs_reader is not None:
result = await abs_reader.get_current_lid_status()
hardware_lid_status = result
else:
raise CannotPerformModuleAction(
"Could not reach the Hardware API for Opentrons Plate Reader Module."
)

# If the lid is already ON, no-op losing lid
if hardware_lid_status is AbsorbanceReaderLidStatus.ON:
# The lid is already On, so we can no-op and return the lids current location data
assert isinstance(loaded_lid.location, AddressableAreaLocation)
new_location = loaded_lid.location
new_offset_id = self._equipment.find_applicable_labware_offset_id(
labware_definition_uri=loaded_lid.definitionUri,
labware_location=loaded_lid.location,
)
else:
# Allow propagation of ModuleNotAttachedError.
_ = self._equipment.get_module_hardware_api(mod_substate.module_id)

# we need to move the lid onto the module reader
absorbance_model = self._state_view.modules.get_requested_model(params.moduleId)
assert absorbance_model is not None
new_location = AddressableAreaLocation(
addressableAreaName=self._state_view.modules.ensure_and_convert_module_fixture_location(
deck_slot=self._state_view.modules.get_location(
params.moduleId
).slotName,
deck_type=self._state_view.config.deck_type,
model=absorbance_model,
current_location = self._state_view.modules.absorbance_reader_dock_location(
params.moduleId
)
)
validated_new_location = (
self._state_view.geometry.ensure_valid_gripper_location(new_location)
)

lid_gripper_offsets = self._state_view.labware.get_labware_gripper_offsets(
loaded_lid.id, None
)
if lid_gripper_offsets is None:
raise ValueError(
"Gripper Offset values for Absorbance Reader Lid labware must not be None."
# we need to move the lid onto the module reader
absorbance_model = self._state_view.modules.get_requested_model(
params.moduleId
)
assert absorbance_model is not None
new_location = AddressableAreaLocation(
addressableAreaName=self._state_view.modules.ensure_and_convert_module_fixture_location(
deck_slot=self._state_view.modules.get_location(
params.moduleId
).slotName,
deck_type=self._state_view.config.deck_type,
model=absorbance_model,
)
)

# Skips gripper moves when using virtual gripper
await self._labware_movement.move_labware_with_gripper(
labware_id=loaded_lid.id,
current_location=validated_current_location,
new_location=validated_new_location,
user_offset_data=lid_gripper_offsets,
post_drop_slide_offset=None,
)
lid_gripper_offsets = self._state_view.labware.get_labware_gripper_offsets(
loaded_lid.id, None
)
if lid_gripper_offsets is None:
raise ValueError(
"Gripper Offset values for Absorbance Reader Lid labware must not be None."
)

# Skips gripper moves when using virtual gripper
await self._labware_movement.move_labware_with_gripper(
labware_id=loaded_lid.id,
current_location=current_location,
new_location=new_location,
user_offset_data=lid_gripper_offsets,
post_drop_slide_offset=None,
)

new_offset_id = self._equipment.find_applicable_labware_offset_id(
labware_definition_uri=loaded_lid.definitionUri,
labware_location=new_location,
)

new_offset_id = self._equipment.find_applicable_labware_offset_id(
labware_definition_uri=loaded_lid.definitionUri,
labware_location=new_location,
)
return SuccessData(
public=CloseLidResult(
lidId=loaded_lid.id, newLocation=new_location, offsetId=new_offset_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@

from ..command import AbstractCommandImpl, BaseCommand, BaseCommandCreate, SuccessData
from ...errors.error_occurrence import ErrorOccurrence
from ...errors import CannotPerformModuleAction

from .types import MoveLidResult
from opentrons.protocol_engine.resources import labware_validation
from opentrons.protocol_engine.types import AddressableAreaLocation

from opentrons.drivers.types import AbsorbanceReaderLidStatus

if TYPE_CHECKING:
from opentrons.protocol_engine.state import StateView
Expand Down Expand Up @@ -50,54 +55,80 @@ async def execute(self, params: OpenLidParams) -> SuccessData[OpenLidResult, Non
mod_substate = self._state_view.modules.get_absorbance_reader_substate(
module_id=params.moduleId
)
# Make sure the lid is closed
mod_substate.raise_if_lid_status_not_expected(lid_on_expected=True)

# Allow propagation of ModuleNotAttachedError.
_ = self._equipment.get_module_hardware_api(mod_substate.module_id)

# lid should currently be on the module
assert mod_substate.lid_id is not None
loaded_lid = self._state_view.labware.get(mod_substate.lid_id)
assert labware_validation.is_absorbance_reader_lid(loaded_lid.loadName)

current_location = loaded_lid.location
validated_current_location = (
self._state_view.geometry.ensure_valid_gripper_location(current_location)
)
hardware_lid_status = AbsorbanceReaderLidStatus.ON
# If the lid is closed, if the lid is open No-op out
if not self._state_view.config.use_virtual_modules:
abs_reader = self._equipment.get_module_hardware_api(mod_substate.module_id)

if abs_reader is not None:
result = await abs_reader.get_current_lid_status()
hardware_lid_status = result
else:
raise CannotPerformModuleAction(
"Could not reach the Hardware API for Opentrons Plate Reader Module."
)

# If the lid is already OFF, no-op the lid removal
if hardware_lid_status is AbsorbanceReaderLidStatus.OFF:
assert isinstance(loaded_lid.location, AddressableAreaLocation)
new_location = loaded_lid.location
new_offset_id = self._equipment.find_applicable_labware_offset_id(
labware_definition_uri=loaded_lid.definitionUri,
labware_location=loaded_lid.location,
)
else:
# Allow propagation of ModuleNotAttachedError.
_ = self._equipment.get_module_hardware_api(mod_substate.module_id)

# we need to move the lid to the lid dock
new_location = self._state_view.modules.absorbance_reader_dock_location(
mod_substate.module_id
)
validated_new_location = (
self._state_view.geometry.ensure_valid_gripper_location(new_location)
)
absorbance_model = self._state_view.modules.get_requested_model(
params.moduleId
)
assert absorbance_model is not None
current_location = AddressableAreaLocation(
addressableAreaName=self._state_view.modules.ensure_and_convert_module_fixture_location(
deck_slot=self._state_view.modules.get_location(
params.moduleId
).slotName,
deck_type=self._state_view.config.deck_type,
model=absorbance_model,
)
)

lid_gripper_offsets = self._state_view.labware.get_labware_gripper_offsets(
loaded_lid.id, None
)
if lid_gripper_offsets is None:
raise ValueError(
"Gripper Offset values for Absorbance Reader Lid labware must not be None."
# we need to move the lid to the lid dock
new_location = self._state_view.modules.absorbance_reader_dock_location(
mod_substate.module_id
)

lid_gripper_offsets = self._state_view.labware.get_labware_gripper_offsets(
loaded_lid.id, None
)
if lid_gripper_offsets is None:
raise ValueError(
"Gripper Offset values for Absorbance Reader Lid labware must not be None."
)

# Skips gripper moves when using virtual gripper
await self._labware_movement.move_labware_with_gripper(
labware_id=loaded_lid.id,
current_location=current_location,
new_location=new_location,
user_offset_data=lid_gripper_offsets,
post_drop_slide_offset=None,
)
new_offset_id = self._equipment.find_applicable_labware_offset_id(
labware_definition_uri=loaded_lid.definitionUri,
labware_location=new_location,
)

# Skips gripper moves when using virtual gripper
await self._labware_movement.move_labware_with_gripper(
labware_id=loaded_lid.id,
current_location=validated_current_location,
new_location=validated_new_location,
user_offset_data=lid_gripper_offsets,
post_drop_slide_offset=None,
)
new_offset_id = self._equipment.find_applicable_labware_offset_id(
labware_definition_uri=loaded_lid.definitionUri,
labware_location=new_location,
)
return SuccessData(
public=OpenLidResult(
lidId=loaded_lid.id,
newLocation=validated_new_location,
newLocation=new_location,
offsetId=new_offset_id,
),
private=None,
Expand Down
Loading

0 comments on commit b9daa39

Please sign in to comment.