Skip to content

Commit

Permalink
feat(api,hardware): add hepa/uv config/control commands to api.
Browse files Browse the repository at this point in the history
  • Loading branch information
vegano1 committed Feb 13, 2024
1 parent c142da1 commit 754c78c
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 3 deletions.
18 changes: 17 additions & 1 deletion api/src/opentrons/hardware_control/backends/flex_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
EstopState,
HardwareEventHandler,
HardwareEventUnsubscriber,
HepaFanState,
HepaUVState,
StatusBarState,
)
from opentrons.hardware_control.module_control import AttachedModulesControl
from ..dev_types import OT3AttachedInstruments
from ..types import StatusBarState
from .types import HWStopCondition

Cls = TypeVar("Cls")
Expand Down Expand Up @@ -417,3 +419,17 @@ def check_gripper_position_within_bounds(
hard_limit_upper: float,
) -> None:
...

async def set_hepa_fan_state(self, fan_on: bool, duty_cycle: int) -> bool:
"""Sets the state and duty cycle of the Hepa/UV module."""
...

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
...

async def set_hepa_uv_state(self, light_on: bool, timeout_s: int) -> bool:
"""Sets the state and timeout in seconds of the Hepa/UV module."""
...

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
...
37 changes: 36 additions & 1 deletion api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@
from opentrons_hardware.hardware_control.gripper_settings import (
get_gripper_jaw_state,
)
from opentrons_hardware.hardware_control.hepa_uv_settings import (
set_hepa_fan_state as set_hepa_fan_state_fw,
get_hepa_fan_state as get_hepa_fan_state_fw,
set_hepa_uv_state as set_hepa_uv_state_fw,
get_hepa_uv_state as get_hepa_uv_state_fw,
)

from opentrons_hardware.drivers.gpio import OT3GPIO, RemoteOT3GPIO
from opentrons_shared_data.pipette.dev_types import PipetteName
Expand All @@ -193,7 +199,7 @@
AttachedGripper,
OT3AttachedInstruments,
)
from ..types import StatusBarState
from ..types import HepaFanState, HepaUVState, StatusBarState

from .types import HWStopCondition
from .flex_protocol import FlexBackend
Expand Down Expand Up @@ -1570,3 +1576,32 @@ def check_gripper_position_within_bounds(
"actual-jaw-width": current_gripper_position,
},
)

async def set_hepa_fan_state(self, fan_on: bool, duty_cycle: int) -> bool:
return await set_hepa_fan_state_fw(self._messenger, fan_on, duty_cycle)

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
res = await get_hepa_fan_state_fw(self._messenger)
return (
HepaFanState(
fan_on=res.fan_on,
duty_cycle=res.duty_cycle,
)
if res
else None
)

async def set_hepa_uv_state(self, light_on: bool, timeout_s: int) -> bool:
return await set_hepa_uv_state_fw(self._messenger, light_on, timeout_s)

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
res = await get_hepa_uv_state_fw(self._messenger)
return (
HepaUVState(
light_on=res.uv_light_on,
config_timeout=res.timeout_s,
remaining_time_s=res.remaining_time_s,
)
if res
else None
)
20 changes: 20 additions & 0 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
HardwareEvent,
HardwareEventHandler,
HardwareAction,
HepaFanState,
HepaUVState,
MotionChecks,
SubSystem,
PauseType,
Expand Down Expand Up @@ -2685,3 +2687,21 @@ def estop_acknowledge_and_clear(self) -> EstopOverallStatus:

def get_estop_state(self) -> EstopState:
return self._backend.get_estop_state()

async def set_hepa_fan_state(
self, turn_on: bool = False, duty_cycle: int = 75
) -> bool:
"""Sets the state and duty cycle of the Hepa/UV module."""
return await self._backend.set_hepa_fan_state(turn_on, duty_cycle)

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
return await self._backend.get_hepa_fan_state()

async def set_hepa_uv_state(
self, turn_on: bool = False, timeout_s: int = 900
) -> bool:
"""Sets the state and timeout in seconds of the Hepa/UV module."""
return await self._backend.set_hepa_uv_state(turn_on, timeout_s)

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
return await self._backend.get_hepa_uv_state()
13 changes: 13 additions & 0 deletions api/src/opentrons/hardware_control/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,19 @@ class EstopOverallStatus:
right_physical_state: EstopPhysicalStatus


@dataclass
class HepaFanState:
fan_on: bool
duty_cycle: int


@dataclass
class HepaUVState:
light_on: bool
config_timeout: int
remaining_time_s: int


@dataclass(frozen=True)
class DoorStateNotification:
event: Literal[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ class SetHepaFanStateRequestPayload(EmptyPayload):
"""A request to set the state and pwm of a the hepa fan."""

duty_cycle: utils.UInt32Field
fan_on: utils.Int8Field
fan_on: utils.UInt8Field


@dataclass(eq=False)
Expand Down
149 changes: 149 additions & 0 deletions hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""Utilities for controlling the hepa/uv extension module."""
import logging
import asyncio
from typing import Optional
from dataclasses import dataclass
from opentrons_hardware.drivers.can_bus.can_messenger import CanMessenger
from opentrons_hardware.firmware_bindings.arbitration_id import ArbitrationId

Check warning on line 7 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L2-L7

Added lines #L2 - L7 were not covered by tests

from opentrons_hardware.firmware_bindings.messages import payloads
from opentrons_hardware.firmware_bindings.messages.messages import MessageDefinition
from opentrons_hardware.firmware_bindings.messages.message_definitions import (

Check warning on line 11 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L9-L11

Added lines #L9 - L11 were not covered by tests
SetHepaFanStateRequest,
GetHepaFanStateRequest,
GetHepaFanStateResponse,
SetHepaUVStateRequest,
GetHepaUVStateRequest,
GetHepaUVStateResponse,
)
from opentrons_hardware.firmware_bindings.utils import (

Check warning on line 19 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L19

Added line #L19 was not covered by tests
UInt8Field,
UInt32Field,
)
from opentrons_hardware.firmware_bindings.constants import (

Check warning on line 23 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L23

Added line #L23 was not covered by tests
MessageId,
NodeId,
ErrorCode,
)

log = logging.getLogger(__name__)

Check warning on line 29 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L29

Added line #L29 was not covered by tests


@dataclass(frozen=True)
class HepaFanState:

Check warning on line 33 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L32-L33

Added lines #L32 - L33 were not covered by tests
"""Hepa Fan Config."""

fan_on: bool
duty_cycle: int

Check warning on line 37 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L36-L37

Added lines #L36 - L37 were not covered by tests


@dataclass(frozen=True)
class HepaUVState:

Check warning on line 41 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L40-L41

Added lines #L40 - L41 were not covered by tests
"""Hepa UV Light Config."""

uv_light_on: bool
timeout_s: int
remaining_time_s: int

Check warning on line 46 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L44-L46

Added lines #L44 - L46 were not covered by tests


async def set_hepa_fan_state(

Check warning on line 49 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L49

Added line #L49 was not covered by tests
can_messenger: CanMessenger,
fan_on: bool,
duty_cycle: int,
) -> bool:
"""Set the Hepa fan state and duty cycle."""
error = await can_messenger.ensure_send(

Check warning on line 55 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L55

Added line #L55 was not covered by tests
node_id=NodeId.hepa_uv,
message=SetHepaFanStateRequest(
payload=payloads.SetHepaFanStateRequestPayload(
duty_cycle=UInt32Field(duty_cycle), fan_on=UInt8Field(fan_on)
),
),
expected_nodes=[NodeId.hepa_uv],
)
if error != ErrorCode.ok:
log.error(f"recieved error trying to set hepa fan state {str(error)}")
return error == ErrorCode.ok

Check warning on line 66 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L64-L66

Added lines #L64 - L66 were not covered by tests


async def get_hepa_fan_state(can_messenger: CanMessenger) -> Optional[HepaFanState]:

Check warning on line 69 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L69

Added line #L69 was not covered by tests
"""Gets the state of the Hepa fan."""
fan_state: Optional[HepaFanState] = None

Check warning on line 71 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L71

Added line #L71 was not covered by tests

event = asyncio.Event()

Check warning on line 73 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L73

Added line #L73 was not covered by tests

def _listener(message: MessageDefinition, arb_id: ArbitrationId) -> None:

Check warning on line 75 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L75

Added line #L75 was not covered by tests
nonlocal fan_state
if isinstance(message, GetHepaFanStateResponse):
event.set()
fan_state = HepaFanState(

Check warning on line 79 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L77-L79

Added lines #L77 - L79 were not covered by tests
fan_on=bool(message.payload.fan_on.value),
duty_cycle=int(message.payload.duty_cycle.value),
)

def _filter(arb_id: ArbitrationId) -> bool:
return (NodeId(arb_id.parts.originating_node_id) == NodeId.hepa_uv) and (

Check warning on line 85 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L84-L85

Added lines #L84 - L85 were not covered by tests
MessageId(arb_id.parts.message_id) == MessageId.get_hepa_fan_state_response
)

can_messenger.add_listener(_listener, _filter)
await can_messenger.send(node_id=NodeId.hepa_uv, message=GetHepaFanStateRequest())
try:
await asyncio.wait_for(event.wait(), 1.0)
except asyncio.TimeoutError:
log.warning("hepa fan state request timed out")

Check warning on line 94 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L89-L94

Added lines #L89 - L94 were not covered by tests
finally:
can_messenger.remove_listener(_listener)
return fan_state

Check warning on line 97 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L96-L97

Added lines #L96 - L97 were not covered by tests


async def set_hepa_uv_state(

Check warning on line 100 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L100

Added line #L100 was not covered by tests
can_messenger: CanMessenger,
uv_light_on: bool,
timeout_s: int,
) -> bool:
"""Set the Hepa UV state and timeout in seconds."""
error = await can_messenger.ensure_send(

Check warning on line 106 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L106

Added line #L106 was not covered by tests
node_id=NodeId.hepa_uv,
message=SetHepaUVStateRequest(
payload=payloads.SetHepaUVStateRequestPayload(
timeout_s=UInt32Field(timeout_s), uv_light_on=UInt8Field(uv_light_on)
),
),
expected_nodes=[NodeId.hepa_uv],
)
if error != ErrorCode.ok:
log.error(f"recieved error trying to set hepa uv light state {str(error)}")
return error == ErrorCode.ok

Check warning on line 117 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L115-L117

Added lines #L115 - L117 were not covered by tests


async def get_hepa_uv_state(can_messenger: CanMessenger) -> Optional[HepaUVState]:

Check warning on line 120 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L120

Added line #L120 was not covered by tests
"""Gets the state of the Hepa uv light."""
uv_state: Optional[HepaUVState] = None

Check warning on line 122 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L122

Added line #L122 was not covered by tests

event = asyncio.Event()

Check warning on line 124 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L124

Added line #L124 was not covered by tests

def _listener(message: MessageDefinition, arb_id: ArbitrationId) -> None:

Check warning on line 126 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L126

Added line #L126 was not covered by tests
nonlocal uv_state
if isinstance(message, GetHepaUVStateResponse):
event.set()
uv_state = HepaUVState(

Check warning on line 130 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L128-L130

Added lines #L128 - L130 were not covered by tests
uv_light_on=bool(message.payload.uv_light_on.value),
timeout_s=int(message.payload.timeout_s.value),
remaining_time_s=int(message.payload.remaining_time_s.value),
)

def _filter(arb_id: ArbitrationId) -> bool:
return (NodeId(arb_id.parts.originating_node_id) == NodeId.hepa_uv) and (

Check warning on line 137 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L136-L137

Added lines #L136 - L137 were not covered by tests
MessageId(arb_id.parts.message_id) == MessageId.get_hepa_uv_state_response
)

can_messenger.add_listener(_listener, _filter)
await can_messenger.send(node_id=NodeId.hepa_uv, message=GetHepaUVStateRequest())
try:
await asyncio.wait_for(event.wait(), 1.0)
except asyncio.TimeoutError:
log.warning("hepa uv light state request timed out")

Check warning on line 146 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L141-L146

Added lines #L141 - L146 were not covered by tests
finally:
can_messenger.remove_listener(_listener)
return uv_state

Check warning on line 149 in hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py#L148-L149

Added lines #L148 - L149 were not covered by tests

0 comments on commit 754c78c

Please sign in to comment.