Skip to content

Commit

Permalink
fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
ahiuchingau committed May 7, 2024
1 parent 64658ab commit be74b9b
Show file tree
Hide file tree
Showing 21 changed files with 336 additions and 197 deletions.
6 changes: 5 additions & 1 deletion api/src/opentrons/drivers/absorbance_reader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@
from .driver import AbsorbanceReaderDriver
from .simulator import SimulatingDriver

__all__ = ["AbstractAbsorbanceReaderDriver", "AbsorbanceReaderDriver", "SimulatingDriver"]
__all__ = [
"AbstractAbsorbanceReaderDriver",
"AbsorbanceReaderDriver",
"SimulatingDriver",
]
6 changes: 3 additions & 3 deletions api/src/opentrons/drivers/absorbance_reader/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ async def get_available_wavelengths(self) -> List[int]:
@abstractmethod
async def get_single_measurement(self, wavelength: int) -> List[float]:
...

@abstractmethod
async def set_sample_wavelength(self, wavelength: int) -> None:
...

@abstractmethod
async def get_status(self) -> None:
...

@abstractmethod
async def get_device_info(self) -> Dict[str, str]:
"""Get device info"""
...
...
169 changes: 93 additions & 76 deletions api/src/opentrons/drivers/absorbance_reader/async_byonoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
import contextlib
import hid
from dataclasses import dataclass
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial
Expand All @@ -12,22 +11,25 @@

from opentrons.drivers.rpi_drivers.types import USBPort

import pybyonoy_device_library as byonoy # type: ignore[import-not-found]
import hid # type: ignore[import-not-found]
import pybyonoy_device_library as byonoy # type: ignore[import-not-found]

TimeoutProperties = Union[Literal["write_timeout"], Literal["timeout"]]


def get_byonoy_device_from_sn(sn: str) -> byonoy.ByonoyDevice:
found = byonoy.byonoy_available_devices()
for device in found:
if device.sn == sn:
return device


def get_device_number_at_port(usb_port: USBPort) -> str:
full_path = usb_port.name + ':' + usb_port.device_path.split('/')[0]
hid_port = [i for i in hid.enumerate() if full_path in i['path'].decode()]
full_path = usb_port.name + ":" + usb_port.device_path.split("/")[0]
hid_port = [i for i in hid.enumerate() if full_path in i["path"].decode()]
assert len(hid_port) == 1, f"Expected 1 device, found {len(hid_port)}"
return hid_port[0]['serial_number']
sn = hid_port[0]["serial_number"]
return str(sn)


@dataclass
Expand Down Expand Up @@ -69,11 +71,7 @@ async def create(
executor = ThreadPoolExecutor(max_workers=1)
device_sn = get_device_number_at_port(usb_port)
device = await loop.run_in_executor(
executor=executor,
func=partial(
get_byonoy_device_from_sn,
device_sn
)
executor=executor, func=partial(get_byonoy_device_from_sn, device_sn)
)
return cls(
device=device,
Expand Down Expand Up @@ -113,72 +111,92 @@ def _open(self) -> None:
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error opening device: {err}")
self._device_handle = device_handle

def _free(self) -> None:
if self._device_handle:
byonoy.byonoy_free_device(self._device_handle)
self._cleanup()

def verify_device_handle(self) -> None:
assert self._device_handle, RuntimeError("Device handle not initialized")

def _get_device_information(self) -> byonoy.ByonoyDeviceInfo:
if self._device_handle:
err, device_info = byonoy.byonoy_get_device_information(self._device_handle)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting device information: {err}")
return device_info
self.verify_device_handle()
err, device_info = byonoy.byonoy_get_device_information(self._device_handle)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting device information: {err}")
return device_info

def _get_device_status(self) -> byonoy.ByonoyDeviceStatus:
if self._device_handle:
err, status = byonoy.byonoy_get_device_status(self._device_handle)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting device status: {err}")
return status
self.verify_device_handle()
err, status = byonoy.byonoy_get_device_status(self._device_handle)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting device status: {err}")
return status

def _get_slot_status(self) -> byonoy.ByonoyDeviceSlotStatus:
if self._device_handle:
err, slot_status = byonoy.byonoy_get_device_slot_status(self._device_handle)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting slot status: {err}")
return slot_status
self.verify_device_handle()
err, slot_status = byonoy.byonoy_get_device_slot_status(self._device_handle)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting slot status: {err}")
return slot_status

def _get_lid_status(self) -> bool:
if self._device_handle:
err, lid_on = byonoy.byonoy_get_device_parts_aligned(self._device_handle)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting slot status: {err}")
return lid_on

self.verify_device_handle()
lid_on: bool
err, lid_on = byonoy.byonoy_get_device_parts_aligned(self._device_handle)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting slot status: {err}")
return lid_on

def _get_supported_wavelengths(self) -> List[int]:
if self._device_handle:
err, wavelengths = byonoy.byonoy_abs96_get_available_wavelengths(self._device_handle)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting supported wavelengths: {err}")
self._supported_wavelengths = wavelengths
return wavelengths

def _initialize_measurement(self, conf: byonoy.ByonoyAbs96SingleMeasurementConfig) -> None:
if self._device_handle:
err = byonoy.byonoy_abs96_initialize_single_measurement(self._device_handle, conf)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error initializing measurement: {err}")

def _single_measurement(self, conf: byonoy.ByonoyAbs96SingleMeasurementConfig) -> List[float]:
if self._device_handle:
err, measurements = byonoy.byonoy_abs96_single_measure(self._device_handle, conf)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting single measurement: {err}")
return measurements
self.verify_device_handle()
wavelengths: List[int]
err, wavelengths = byonoy.byonoy_abs96_get_available_wavelengths(
self._device_handle
)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting supported wavelengths: {err}")
self._supported_wavelengths = wavelengths
return wavelengths

def _set_sample_wavelength(self, wavelength: int) -> byonoy.ByonoyAbs96SingleMeasurementConfig:
if self._device_handle:
if not self._supported_wavelengths:
self._get_supported_wavelengths()
assert self._supported_wavelengths
if wavelength in self._supported_wavelengths:
conf = byonoy.ByonoyAbs96SingleMeasurementConfig()
conf.sample_wavelength = wavelength
return conf
else:
raise ValueError(f"Unsupported wavelength: {wavelength}, expected: {self._supported_wavelengths}")
def _initialize_measurement(
self, conf: byonoy.ByonoyAbs96SingleMeasurementConfig
) -> None:
self.verify_device_handle()
err = byonoy.byonoy_abs96_initialize_single_measurement(
self._device_handle, conf
)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error initializing measurement: {err}")

def _single_measurement(
self, conf: byonoy.ByonoyAbs96SingleMeasurementConfig
) -> List[float]:
self.verify_device_handle()
measurements: List[float]
err, measurements = byonoy.byonoy_abs96_single_measure(
self._device_handle, conf
)
if err != byonoy.ByonoyErrorCode.BYONOY_ERROR_NO_ERROR:
raise RuntimeError(f"Error getting single measurement: {err}")
return measurements

def _set_sample_wavelength(
self, wavelength: int
) -> byonoy.ByonoyAbs96SingleMeasurementConfig:
self.verify_device_handle()
if not self._supported_wavelengths:
self._get_supported_wavelengths()
assert self._supported_wavelengths
if wavelength in self._supported_wavelengths:
conf = byonoy.ByonoyAbs96SingleMeasurementConfig()
conf.sample_wavelength = wavelength
return conf
else:
raise ValueError(
f"Unsupported wavelength: {wavelength}, expected: {self._supported_wavelengths}"
)

def _initialize(self, wavelength: int) -> None:
conf = self._set_sample_wavelength(wavelength)
Expand All @@ -204,9 +222,7 @@ async def close(self) -> None:
Returns: None
"""
await self._loop.run_in_executor(
executor=self._executor, func=self._free
)
await self._loop.run_in_executor(executor=self._executor, func=self._free)

async def is_open(self) -> bool:
"""
Expand All @@ -215,23 +231,25 @@ async def is_open(self) -> bool:
Returns: boolean
"""
return self._device_handle is not None

async def get_device_information(self) -> DeviceInfo:
device_info = await self._loop.run_in_executor(
executor=self._executor, func=self._get_device_information
)
return DeviceInfo(
serial_number=device_info.sn,
reference_number=device_info.ref_no,
version=device_info.version
version=device_info.version,
)

async def get_lid_status(self) -> AbsorbanceReaderLidStatus:
lid_info = await self._loop.run_in_executor(
executor=self._executor, func=self._get_lid_status
)
return AbsorbanceReaderLidStatus.ON if lid_info else AbsorbanceReaderLidStatus.OFF

return (
AbsorbanceReaderLidStatus.ON if lid_info else AbsorbanceReaderLidStatus.OFF
)

async def get_supported_wavelengths(self) -> list[int]:
return await self._loop.run_in_executor(
executor=self._executor, func=self._get_supported_wavelengths
Expand All @@ -244,7 +262,6 @@ async def initialize(self, wavelength: int) -> None:

async def get_single_measurement(self, wavelength: int) -> List[float]:
return await self._loop.run_in_executor(
executor=self._executor, func=partial(self._get_single_measurement, wavelength)
executor=self._executor,
func=partial(self._get_single_measurement, wavelength),
)


9 changes: 4 additions & 5 deletions api/src/opentrons/drivers/absorbance_reader/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ async def create(
loop: Optional[asyncio.AbstractEventLoop],
) -> AbsorbanceReaderDriver:
"""Create an absorbance reader driver."""
connection = await AsyncByonoy.create(
port=port, usb_port=usb_port, loop=loop)
connection = await AsyncByonoy.create(port=port, usb_port=usb_port, loop=loop)
return cls(connection=connection)

def __init__(self, connection: AsyncByonoy) -> None:
self._connection = connection

Expand Down Expand Up @@ -51,9 +50,9 @@ async def get_available_wavelengths(self) -> List[int]:

async def get_single_measurement(self, wavelength: int) -> List[float]:
return await self._connection.get_single_measurement(wavelength)

async def set_sample_wavelength(self, wavelength: int) -> None:
await self._connection.initialize(wavelength)

async def get_status(self) -> None:
pass
9 changes: 4 additions & 5 deletions api/src/opentrons/drivers/absorbance_reader/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@


class SimulatingDriver(AbstractAbsorbanceReaderDriver):

def __init__(self, serial_number: Optional[str] = None) -> None:
self._serial_number = serial_number

Expand Down Expand Up @@ -45,13 +44,13 @@ async def get_available_wavelengths(self) -> List[int]:
return [450, 570, 600, 650]

@ensure_yield
async def get_single_measurement(self, wavelength: int) -> None:
pass
async def get_single_measurement(self, wavelength: int) -> List[float]:
return [0.0]

@ensure_yield
async def set_sample_wavelength(self, wavelength: int) -> None:
pass

@ensure_yield
async def get_status(self) -> None:
pass
12 changes: 7 additions & 5 deletions api/src/opentrons/drivers/rpi_drivers/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ class PortGroup:
(tty/tty(\w{4})/dev | [\w:\.]+?/hidraw/hidraw\d/dev)
)
""",
re.VERBOSE
re.VERBOSE,
)

HUB_PATTERN = re.compile(r"(\d-[\d.]+\d?)[\/:]")


@dataclass(frozen=True)
class USBPort:
name: str
Expand All @@ -58,7 +59,9 @@ class USBPort:
device_path: str = ""

@classmethod
def build(cls, full_path: str, board_revision: BoardRevision) -> Optional["USBPort"]:
def build(
cls, full_path: str, board_revision: BoardRevision
) -> Optional["USBPort"]:
"""
Build a USBPort dataclass.
Expand Down Expand Up @@ -127,9 +130,9 @@ def find_hub(
port_nodes = [node for node in port_nodes if "." in node]
if len(port_nodes) > 2:
port_info = port_nodes[2].split(".")
hub: Optional[int] = int(port_info[1])
hub = int(port_info[1])
port = int(port_info[2])
hub_port: Optional[int] = int(port_info[3])
hub_port = int(port_info[3])
name = port_nodes[2]
elif len(port_nodes) > 1:
if board_revision == BoardRevision.OG:
Expand Down Expand Up @@ -202,7 +205,6 @@ def get_unique_nodes(full_name: str) -> List[str]:
match_set.append(match)
return match_set


@staticmethod
def map_to_revision(
board_revision: BoardRevision,
Expand Down
2 changes: 1 addition & 1 deletion api/src/opentrons/drivers/rpi_drivers/usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,5 @@ def match_virtual_ports(
vp.usb_port = p
sorted_virtual_ports.append(vp)
break

return sorted_virtual_ports or virtual_ports
2 changes: 1 addition & 1 deletion api/src/opentrons/drivers/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ class AbsorbanceReaderPlatePresence(str, Enum):

UNKNOWN = "unknown"
PRESENT = "present"
ABSENCE = "absence"
ABSENCE = "absence"
Loading

0 comments on commit be74b9b

Please sign in to comment.