Skip to content

Commit

Permalink
fix(api): update the byonoy library to 2024.07.0
Browse files Browse the repository at this point in the history
  • Loading branch information
vegano1 committed Aug 7, 2024
1 parent ed46e8f commit 025e54e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 69 deletions.
46 changes: 20 additions & 26 deletions api/src/opentrons/drivers/absorbance_reader/async_byonoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ async def create(
loop = loop or asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=1)

import pybyonoy_device_library as byonoy # type: ignore[import-not-found]
import byonoy_devices as byonoy # type: ignore[import-not-found]

interface: AbsProtocol = byonoy

device_sn = cls.serial_number_from_port(usb_port.name)
found: List[AbsProtocol.Device] = await loop.run_in_executor(
executor=executor, func=byonoy.byonoy_available_devices
executor=executor, func=byonoy.available_devices
)
device = cls.match_device_with_sn(device_sn, found)

Expand Down Expand Up @@ -121,7 +121,7 @@ async def open(self) -> bool:

err, device_handle = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_open_device, self._device),
func=partial(self._interface.open_device, self._device),
)
self._raise_if_error(err.name, f"Error opening device: {err}")
self._device_handle = device_handle
Expand All @@ -132,7 +132,7 @@ async def close(self) -> None:
handle = self._verify_device_handle()
await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_free_device, handle),
func=partial(self._interface.free_device, handle),
)
self._device_handle = None

Expand All @@ -143,15 +143,15 @@ async def is_open(self) -> bool:
handle = self._verify_device_handle()
return await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_device_open, handle),
func=partial(self._interface.device_open, handle),
)

async def get_device_information(self) -> Dict[str, str]:
"""Get serial number and version info."""
handle = self._verify_device_handle()
err, device_info = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_information, handle),
func=partial(self._interface.get_device_information, handle),
)
self._raise_if_error(err.name, f"Error getting device information: {err}")
serial_match = SERIAL_PARSER.match(device_info.sn)
Expand All @@ -170,7 +170,7 @@ async def get_device_status(self) -> AbsorbanceReaderDeviceState:
handle = self._verify_device_handle()
err, status = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_status, handle),
func=partial(self._interface.get_device_status, handle),
)
self._raise_if_error(err.name, f"Error getting device status: {err}")
return self.convert_device_state(status.name)
Expand All @@ -182,11 +182,9 @@ async def update_firmware(self, firmware_file_path: str) -> Tuple[bool, str]:
return False, f"Firmware file not found: {firmware_file_path}"
err = await self._loop.run_in_executor(
executor=self._executor,
func=partial(
self._interface.byonoy_update_device, handle, firmware_file_path
),
func=partial(self._interface.update_device, handle, firmware_file_path),
)
if err.name != "BYONOY_ERROR_NO_ERROR":
if err.name != "ERROR_NO_ERROR":
return False, f"Byonoy update failed with error: {err}"
return True, ""

Expand All @@ -195,7 +193,7 @@ async def get_device_uptime(self) -> int:
handle = self._verify_device_handle()
err, uptime = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_uptime, handle),
func=partial(self._interface.get_device_uptime, handle),
)
self._raise_if_error(err.name, "Error getting device uptime: ")
return uptime
Expand All @@ -205,7 +203,7 @@ async def get_lid_status(self) -> AbsorbanceReaderLidStatus:
handle = self._verify_device_handle()
err, lid_info = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_parts_aligned, handle),
func=partial(self._interface.get_device_parts_aligned, handle),
)
self._raise_if_error(err.name, f"Error getting lid status: {err}")
return (
Expand All @@ -217,9 +215,7 @@ async def get_supported_wavelengths(self) -> list[int]:
handle = self._verify_device_handle()
err, wavelengths = await self._loop.run_in_executor(
executor=self._executor,
func=partial(
self._interface.byonoy_abs96_get_available_wavelengths, handle
),
func=partial(self._interface.abs96_get_available_wavelengths, handle),
)
self._raise_if_error(err.name, "Error getting available wavelengths: ")
self._supported_wavelengths = wavelengths
Expand All @@ -235,7 +231,7 @@ async def get_single_measurement(self, wavelength: int) -> List[float]:
err, measurements = await self._loop.run_in_executor(
executor=self._executor,
func=partial(
self._interface.byonoy_abs96_single_measure,
self._interface.abs96_single_measure,
handle,
self._current_config,
),
Expand All @@ -248,24 +244,22 @@ async def get_plate_presence(self) -> AbsorbanceReaderPlatePresence:
handle = self._verify_device_handle()
err, presence = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_slot_status, handle),
func=partial(self._interface.get_device_slot_status, handle),
)
self._raise_if_error(err.name, f"Error getting slot status: {err}")
return self.convert_plate_presence(presence.name)

def _get_supported_wavelengths(self) -> List[int]:
handle = self._verify_device_handle()
wavelengths: List[int]
err, wavelengths = self._interface.byonoy_abs96_get_available_wavelengths(
handle
)
err, wavelengths = self._interface.abs96_get_available_wavelengths(handle)
self._raise_if_error(err.name, f"Error getting available wavelengths: {err}")
self._supported_wavelengths = wavelengths
return wavelengths

def _initialize_measurement(self, conf: AbsProtocol.MeasurementConfig) -> None:
handle = self._verify_device_handle()
err = self._interface.byonoy_abs96_initialize_single_measurement(handle, conf)
err = self._interface.abs96_initialize_single_measurement(handle, conf)
self._raise_if_error(err.name, f"Error initializing measurement: {err}")
self._current_config = conf

Expand Down Expand Up @@ -304,12 +298,12 @@ def _raise_if_error(
msg: str = "Error occurred: ",
) -> None:
if err_name in [
"BYONOY_ERROR_DEVICE_CLOSED",
"BYONOY_ERROR_DEVICE_COMMUNICATION_FAILURE",
"BYONOY_ERROR_UNSUPPORTED_OPERATION",
"ERROR_DEVICE_CLOSED",
"ERROR_DEVICE_COMMUNICATION_FAILURE",
"ERROR_UNSUPPORTED_OPERATION",
]:
raise AbsorbanceReaderDisconnectedError(self._device.sn)
if err_name != "BYONOY_ERROR_NO_ERROR":
if err_name != "ERROR_NO_ERROR":
raise RuntimeError(msg, err_name)

@staticmethod
Expand Down
78 changes: 35 additions & 43 deletions api/src/opentrons/drivers/absorbance_reader/hid_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@
Response = TypeVar("Response")

ErrorCodeNames = Literal[
"BYONOY_ERROR_NO_ERROR",
"BYONOY_ERROR_UNKNOWN_ERROR",
"BYONOY_ERROR_DEVICE_CLOSED",
"BYONOY_ERROR_INVALID_ARGUMENT",
"BYONOY_ERROR_NO_MEMORY",
"BYONOY_ERROR_UNSUPPORTED_OPERATION",
"BYONOY_ERROR_DEVICE_COMMUNICATION_FAILURE",
"BYONOY_ERROR_DEVICE_OPERATION_FAILED",
"BYONOY_ERROR_DEVICE_OPEN_PREFIX",
"BYONOY_ERROR_DEVICE_NOT_FOUND",
"BYONOY_ERROR_DEVICE_TOO_NEW",
"BYONOY_ERROR_DEVICE_ALREADY_OPEN",
"BYONOY_ERROR_FIRMWARE_UPDATE_ERROR_PREFIX",
"BYONOY_ERROR_FIRMWARE_UPDATE_FILE_NOT_FOUND",
"BYONOY_ERROR_FIRMWARE_UPDATE_FILE_NOT_VALID",
"BYONOY_ERROR_FIRMWARE_UPDATE_FAILED",
"BYONOY_ERROR_FILE_ERROR_PREFIX",
"BYONOY_ERROR_FILE_WRITE_ERROR",
"BYONOY_ERROR_MEASUTEMNT_ERROR_PREFIX",
"BYONOY_ERROR_MEASUTEMNT_SLOT_NOT_EMPTY",
"BYONOY_ERROR_NOT_INITIALIZED",
"BYONOY_ERROR_INTERNAL",
"ERROR_NO_ERROR",
"ERROR_UNKNOWN_ERROR",
"ERROR_DEVICE_CLOSED",
"ERROR_INVALID_ARGUMENT",
"ERROR_NO_MEMORY",
"ERROR_UNSUPPORTED_OPERATION",
"ERROR_DEVICE_COMMUNICATION_FAILURE",
"ERROR_DEVICE_OPERATION_FAILED",
"ERROR_DEVICE_OPEN_PREFIX",
"ERROR_DEVICE_NOT_FOUND",
"ERROR_DEVICE_TOO_NEW",
"ERROR_DEVICE_ALREADY_OPEN",
"ERROR_FIRMWARE_UPDATE_ERROR_PREFIX",
"ERROR_FIRMWARE_UPDATE_FILE_NOT_FOUND",
"ERROR_FIRMWARE_UPDATE_FILE_NOT_VALID",
"ERROR_FIRMWARE_UPDATE_FAILED",
"ERROR_FILE_ERROR_PREFIX",
"ERROR_FILE_WRITE_ERROR",
"ERROR_MEASUTEMNT_ERROR_PREFIX",
"ERROR_MEASUTEMNT_SLOT_NOT_EMPTY",
"ERROR_NOT_INITIALIZED",
"ERROR_INTERNAL",
]

SlotStateNames = Literal[
Expand Down Expand Up @@ -87,57 +87,49 @@ class DeviceState(Protocol):
def ByonoyAbs96SingleMeasurementConfig(self) -> MeasurementConfig:
...

def byonoy_open_device(self, device: Device) -> Tuple[ErrorCode, int]:
def open_device(self, device: Device) -> Tuple[ErrorCode, int]:
...

def byonoy_free_device(self, device_handle: int) -> Tuple[ErrorCode, bool]:
def free_device(self, device_handle: int) -> Tuple[ErrorCode, bool]:
...

def byonoy_device_open(self, device_handle: int) -> bool:
def device_open(self, device_handle: int) -> bool:
...

def byonoy_get_device_information(
def get_device_information(
self, device_handle: int
) -> Tuple[ErrorCode, DeviceInfo]:
...

def byonoy_update_device(
self, device_handle: int, firmware_file_path: str
) -> ErrorCode:
def update_device(self, device_handle: int, firmware_file_path: str) -> ErrorCode:
...

def byonoy_get_device_status(
self, device_handle: int
) -> Tuple[ErrorCode, DeviceState]:
def get_device_status(self, device_handle: int) -> Tuple[ErrorCode, DeviceState]:
...

def byonoy_get_device_uptime(self, device_handle: int) -> Tuple[ErrorCode, int]:
def get_device_uptime(self, device_handle: int) -> Tuple[ErrorCode, int]:
...

def byonoy_get_device_slot_status(
self, device_handle: int
) -> Tuple[ErrorCode, SlotState]:
def get_device_slot_status(self, device_handle: int) -> Tuple[ErrorCode, SlotState]:
...

def byonoy_get_device_parts_aligned(
self, device_handle: int
) -> Tuple[ErrorCode, bool]:
def get_device_parts_aligned(self, device_handle: int) -> Tuple[ErrorCode, bool]:
...

def byonoy_abs96_get_available_wavelengths(
def abs96_get_available_wavelengths(
self, device_handle: int
) -> Tuple[ErrorCode, List[int]]:
...

def byonoy_abs96_initialize_single_measurement(
def abs96_initialize_single_measurement(
self, device_handle: int, conf: MeasurementConfig
) -> ErrorCode:
...

def byonoy_abs96_single_measure(
def abs96_single_measure(
self, device_handle: int, conf: MeasurementConfig
) -> Tuple[ErrorCode, List[float]]:
...

def byonoy_available_devices(self) -> List[Device]:
def available_devices(self) -> List[Device]:
...

0 comments on commit 025e54e

Please sign in to comment.