Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
sanni-t committed Feb 7, 2024
1 parent 8622a64 commit be6111a
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 136 deletions.
24 changes: 21 additions & 3 deletions api/src/opentrons/motion_planning/adjacent_slots_getters.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Getters for specific adjacent slots."""

from dataclasses import dataclass
from typing import Optional, List, Dict, Union

from opentrons_shared_data.robot.dev_types import RobotType

from opentrons.types import DeckSlotName, StagingSlotName


Expand Down Expand Up @@ -73,7 +75,13 @@ def get_south_east_slot(slot: int) -> Optional[int]:
return south_slot + 1 if south_slot else None


def get_surrounding_slots(slot: int) -> List[int]:
@dataclass
class _MixedTypeSlots:
regular_slots: List[DeckSlotName]
staging_slots: List[StagingSlotName]


def get_surrounding_slots(slot: int, robot_type: RobotType) -> _MixedTypeSlots:
"""Get all the surrounding slots, i.e., adjacent slots as well as corner slots."""
corner_slots: List[Union[int, None]] = [
get_north_west_slot(slot),
Expand All @@ -82,9 +90,19 @@ def get_surrounding_slots(slot: int) -> List[int]:
get_south_east_slot(slot),
]

return get_adjacent_slots(slot) + [
surrounding_regular_slots_int = get_adjacent_slots(slot) + [
maybe_slot for maybe_slot in corner_slots if maybe_slot is not None
]
surrounding_regular_slots = [
DeckSlotName.from_primitive(slot_int).to_equivalent_for_robot_type(robot_type)
for slot_int in surrounding_regular_slots_int
]
surrounding_staging_slots = _SURROUNDING_STAGING_SLOTS_MAP.get(
DeckSlotName.from_primitive(slot).to_equivalent_for_robot_type(robot_type), []
)
return _MixedTypeSlots(
regular_slots=surrounding_regular_slots, staging_slots=surrounding_staging_slots
)


_WEST_OF_STAGING_SLOT_MAP: Dict[StagingSlotName, DeckSlotName] = {
Expand Down
62 changes: 34 additions & 28 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,15 @@
overload,
Union,
TYPE_CHECKING,
List,
)

from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError

from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from opentrons.hardware_control.modules.types import ModuleType
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict
from opentrons.motion_planning.adjacent_slots_getters import (
get_surrounding_slots,
get_surrounding_staging_slots,
)
from opentrons.motion_planning import adjacent_slots_getters

from opentrons.protocol_engine import (
StateView,
DeckSlotLocation,
Expand Down Expand Up @@ -198,7 +195,7 @@ def check(
)


def check_safe_for_pipette_movement(
def check_safe_for_pipette_movement( # noqa: C901
engine_state: StateView,
pipette_id: str,
labware_id: str,
Expand Down Expand Up @@ -237,7 +234,7 @@ def check_safe_for_pipette_movement(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
raise PartialTipMovementNotAllowedError(
f"Requested motion with the {primary_nozzle} nozzle column configuration"
f"Requested motion with the {primary_nozzle} nozzle partial configuration"
f" is outside of robot bounds for the pipette."
)

Expand All @@ -248,14 +245,14 @@ def check_safe_for_pipette_movement(
destination_position=well_location_point,
)
)

surrounding_regular_slots = get_surrounding_slots(labware_slot.as_int())
surrounding_staging_slots = get_surrounding_staging_slots(labware_slot)
surrounding_slots = adjacent_slots_getters.get_surrounding_slots(
slot=labware_slot.as_int(), robot_type=engine_state.config.robot_type
)

def _check_conflict_with_slot_item(
surrounding_slot: Union[DeckSlotName, StagingSlotName],
) -> None:
"""Raises error if the pipette is expected to collide with adjacent slot items."""
"""Raises error if the pipette is expected to collide with surrounding slot items."""
# Check if slot overlaps with pipette position
slot_pos = engine_state.addressable_areas.get_addressable_area_position(
addressable_area_name=surrounding_slot.id,
Expand All @@ -266,37 +263,44 @@ def _check_conflict_with_slot_item(
do_compatibility_check=False,
)
for bound_vertex in pipette_bounds_at_well_location:
if not (
slot_pos.x < bound_vertex.x < slot_pos.x + slot_bounds.x
and slot_pos.y < bound_vertex.y < slot_pos.y + slot_bounds.y
if not _point_overlaps_with_slot(
slot_pos, slot_bounds, nozzle_point=bound_vertex
):
continue

slot_location: Union[DeckSlotLocation, StagingSlotLocation]
# Check z-height of items in overlapping slot
if isinstance(surrounding_slot, DeckSlotName):
slot_location = DeckSlotLocation(slotName=surrounding_slot)
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=surrounding_slot)
)
else:
slot_location = StagingSlotLocation(slotName=surrounding_slot)
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(slot_location)

slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
StagingSlotLocation(slotName=surrounding_slot)
)
if slot_highest_z + Z_SAFETY_MARGIN > pipette_bounds_at_well_location[0].z:
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with pipette column {primary_nozzle} nozzle configuration"
f" {labware_slot} with {primary_nozzle} nozzle partial configuration"
f" will result in collision with items in deck slot {surrounding_slot}."
)

for slot in surrounding_regular_slots:
_check_conflict_with_slot_item(
DeckSlotName.from_primitive(slot).to_equivalent_for_robot_type(
engine_state.config.robot_type
)
)
for staging_slot in surrounding_staging_slots:
for regular_slot in surrounding_slots.regular_slots:
_check_conflict_with_slot_item(regular_slot)
for staging_slot in surrounding_slots.staging_slots:
_check_conflict_with_slot_item(staging_slot)


def _point_overlaps_with_slot(
slot_position: Point,
slot_dimensions: Dimensions,
nozzle_point: Point,
) -> bool:
"""Check if the given nozzle point overlaps with any slot area in x & y"""
return (
slot_position.x <= nozzle_point.x <= slot_position.x + slot_dimensions.x
and slot_position.y <= nozzle_point.y <= slot_position.y + slot_dimensions.y
)


def check_safe_for_tip_pickup_and_return(
engine_state: StateView,
pipette_id: str,
Expand Down Expand Up @@ -351,6 +355,8 @@ def check_safe_for_tip_pickup_and_return(
)


# TODO (spp, 2023-02-06): update the extents check to use all nozzle bounds instead of
# just position of primary nozzle when checking if the pipette is out-of-bounds
def _is_within_pipette_extents(
engine_state: StateView,
pipette_id: str,
Expand Down
Loading

0 comments on commit be6111a

Please sign in to comment.