Skip to content

Commit

Permalink
Addition of full instrument critical points
Browse files Browse the repository at this point in the history
  • Loading branch information
CaseyBatten committed Feb 5, 2024
1 parent 6957feb commit a6e9429
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 31 deletions.
45 changes: 42 additions & 3 deletions api/src/opentrons/hardware_control/nozzle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class NozzleMap:
configuration: NozzleConfigurationType
#: The kind of configuration this is

full_instrument_map_store: Dict[str, Point]
#: A map of all of the nozzles of an instrument
full_instrument_rows: Dict[str, List[str]]
#: A map of all the rows of an instrument

def __str__(self) -> str:
return f"back_left_nozzle: {self.back_left} front_right_nozzle: {self.front_right} configuration: {self.configuration}"

Expand All @@ -123,6 +128,23 @@ def front_right(self) -> str:
Note: This is the value relevant for this configuration, not the physical pipette. See the note on back_left.
"""
return next(reversed(list(self.rows.values())))[-1]

@property
def full_instrument_back_left(self) -> str:
"""The backest, leftest (i.e. back if it's a column, left if it's a row) nozzle of the full instrument.
Note: This value represents the back left nozzle of the underlying physical pipette. For instance,
the back-left nozzle of a 96-Channel pipette is A1.
"""
return next(iter(self.full_instrument_rows.values()))[0]

@property
def full_instrument_front_right(self) -> str:
"""The frontest, rightest (i.e. front if it's a column, right if it's a row) nozzle of the full instrument.
Note: This value represents the front right nozzle of the physical pipette. See the note on full_instrument_back_left.
"""
return next(reversed(list(self.full_instrument_rows.values())))[-1]

@property
def starting_nozzle_offset(self) -> Point:
Expand All @@ -133,13 +155,26 @@ def starting_nozzle_offset(self) -> Point:
def xy_center_offset(self) -> Point:
"""The position of the geometrical center of all nozzles in the configuration.
Note: This is the value relevant fro this configuration, not the physical pipette. See the note on back_left.
Note: This is the value relevant for this configuration, not the physical pipette. See the note on back_left.
"""
difference = self.map_store[self.front_right] - self.map_store[self.back_left]
difference = self.full_instrument_map_store[self.front_right] - self.full_instrument_map_store[self.back_left]
return self.map_store[self.back_left] + Point(
difference[0] / 2, difference[1] / 2, 0
)

@property
def instrument_xy_center_offset(self) -> Point:
"""The position of the geometrical center of all nozzles for the entire instrument.
Note: This the value reflects the center of the maximum number of nozzles of the physical pipette.
This would be the same as a full configuration.
"""
difference = self.full_instrument_map_store[self.full_instrument_front_right] - self.full_instrument_map_store[self.full_instrument_back_left]
return self.full_instrument_map_store[self.full_instrument_back_left] + Point(
difference[0] / 2, difference[1] / 2, 0
)


@property
def y_center_offset(self) -> Point:
"""The position in the center of the primary column of the map."""
Expand Down Expand Up @@ -169,6 +204,7 @@ def build(
starting_nozzle: str,
back_left_nozzle: str,
front_right_nozzle: str,

) -> "NozzleMap":
try:
back_left_row_index, back_left_column_index = _row_col_indices_for_nozzle(
Expand Down Expand Up @@ -216,6 +252,7 @@ def build(
(nozzle, physical_nozzles[nozzle]) for nozzle in chain(*rows.values())
)


return cls(
starting_nozzle=starting_nozzle,
map_store=map_store,
Expand Down Expand Up @@ -324,7 +361,9 @@ def critical_point_with_tip_length(
cp_override: Optional[CriticalPoint],
tip_length: float = 0.0,
) -> Point:
if cp_override == CriticalPoint.XY_CENTER:
if cp_override == CriticalPoint.INSTRUMENT_XY_CENTER:
current_nozzle = self._current_nozzle_configuration.
elif cp_override == CriticalPoint.XY_CENTER:
current_nozzle = self._current_nozzle_configuration.xy_center_offset
elif cp_override == CriticalPoint.Y_CENTER:
current_nozzle = self._current_nozzle_configuration.y_center_offset
Expand Down
7 changes: 7 additions & 0 deletions api/src/opentrons/hardware_control/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,13 @@ class CriticalPoint(enum.Enum):
point. This is the same as the GRIPPER_JAW_CENTER for grippers.
"""

INSTRUMENT_XY_CENTER = enum.auto()
"""
The INSTRUMENT_XY_CENTER means the critical point under consideration is
the XY center of the entire pipette, regardless of configuration.
No pipettes, single or multi, will change their instrument center point.
"""

FRONT_NOZZLE = enum.auto()
"""
The end of the front-most nozzle of a multipipette with a tip attached.
Expand Down
14 changes: 4 additions & 10 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,10 @@ def _check_deck_conflict_for_96_channel( # noqa: C901
# TODO (spp, 2023-12-18): change this eventually to "column 1"/"column 12"
# via the column mappings in the pipette geometry definitions.
# if we are handling commands in the trash or in the waste chute, skip these checks
addressable_area = (
engine_state.addressable_areas.get_addressable_area_by_deck_slot_name(
labware_slot
)
)
if "movableTrash" not in addressable_area.area_name:
if primary_nozzle == "A12":
adjacent_slot_num = get_west_slot(destination_slot_num)
elif primary_nozzle == "A1":
adjacent_slot_num = get_east_slot(destination_slot_num)
if primary_nozzle == "A12":
adjacent_slot_num = get_west_slot(destination_slot_num)
elif primary_nozzle == "A1":
adjacent_slot_num = get_east_slot(destination_slot_num)

def _check_conflict_with_slot_item(
adjacent_slot: DeckSlotName,
Expand Down
1 change: 1 addition & 0 deletions api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ def _move_to_disposal_location(
speed=speed,
minimum_z_height=None,
alternate_drop_location=alternate_tip_drop,
ignore_tip_configuration=True,
)

if isinstance(disposal_location, WasteChute):
Expand Down
2 changes: 2 additions & 0 deletions api/src/opentrons/protocol_engine/clients/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def move_to_addressable_area_for_drop_tip(
force_direct: bool,
speed: Optional[float],
alternate_drop_location: Optional[bool],
ignore_tip_configuration: Optional[bool] = False,
) -> commands.MoveToAddressableAreaForDropTipResult:
"""Execute a MoveToAddressableArea command and return the result."""
request = commands.MoveToAddressableAreaForDropTipCreate(
Expand All @@ -231,6 +232,7 @@ def move_to_addressable_area_for_drop_tip(
minimumZHeight=minimum_z_height,
speed=speed,
alternateDropLocation=alternate_drop_location,
ignoreTipConfiguration=ignore_tip_configuration,
)
)
result = self._transport.execute_command(request=request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ class MoveToAddressableAreaForDropTipParams(PipetteIdMixin, MovementMixin):
" If False, the tip will be dropped at the top center of the area."
),
)
ignoreTipConfiguration: Optional[bool] = Field(
False,
description=(
"Whether to utilize the critical point of the tip configuraiton when moving to an addressable area."
" If True, this command will ignore the tip configuration and use the center of the entire instrument"
" of the entire instrument as the critical point for movement."
" If False, this command will use the critical point provided by the current tip configuration."
),
)


class MoveToAddressableAreaForDropTipResult(DestinationPositionResult):
Expand Down
2 changes: 2 additions & 0 deletions api/src/opentrons/protocol_engine/execution/movement.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ async def move_to_addressable_area(
minimum_z_height: Optional[float] = None,
speed: Optional[float] = None,
stay_at_highest_possible_z: bool = False,
ignore_tip_configuration: Optional[bool] = False,
) -> Point:
"""Move to a specific addressable area."""
# Check for presence of heater shakers on deck, and if planned
Expand Down Expand Up @@ -193,6 +194,7 @@ async def move_to_addressable_area(
force_direct=force_direct,
minimum_z_height=minimum_z_height,
stay_at_max_travel_z=stay_at_highest_possible_z,
ignore_tip_configuration=ignore_tip_configuration,
)

speed = self._state_store.pipettes.get_movement_speed(
Expand Down
16 changes: 0 additions & 16 deletions api/src/opentrons/protocol_engine/state/addressable_areas.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,22 +331,6 @@ def get_addressable_area(self, addressable_area_name: str) -> AddressableArea:
else:
return self._get_addressable_area_from_deck_data(addressable_area_name)

def get_addressable_area_by_deck_slot_name(
self, slot_name: DeckSlotName
) -> AddressableArea:
"""Get addressable area by Deck Slot Name."""
areas = self.get_all()
for area in areas:
if self.get_addressable_area_base_slot(area) == slot_name:
if not self._state.use_simulated_deck_config:
return self._get_loaded_addressable_area(area)
else:
return self._get_addressable_area_from_deck_data(area)

raise AddressableAreaDoesNotExistError(
f"No Addressable Area could be found for provided slot {slot_name}"
)

def get_all(self) -> List[str]:
"""Get a list of all loaded addressable area names."""
return list(self._state.loaded_addressable_areas_by_name)
Expand Down
5 changes: 3 additions & 2 deletions api/src/opentrons/protocol_engine/state/motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def get_movement_waypoints_to_addressable_area(
force_direct: bool = False,
minimum_z_height: Optional[float] = None,
stay_at_max_travel_z: bool = False,
ignore_tip_configuration: Optional[bool] = False,
) -> List[motion_planning.Waypoint]:
"""Calculate waypoints to a destination that's specified as an addressable area."""
location = self._pipettes.get_current_location()
Expand Down Expand Up @@ -177,8 +178,8 @@ def get_movement_waypoints_to_addressable_area(
destination = base_destination + Point(offset.x, offset.y, offset.z)

# TODO(jbl 11-28-2023) This may need to change for partial tip configurations on a 96
if "movableTrash" in addressable_area_name:
destination_cp = None
if ignore_tip_configuration:
destination_cp = CriticalPoint.INSTRUMENT_XY_CENTER
else:
destination_cp = CriticalPoint.XY_CENTER

Expand Down

0 comments on commit a6e9429

Please sign in to comment.