Skip to content

Commit

Permalink
add some changes for FPY
Browse files Browse the repository at this point in the history
* modified get serial number method for pass test device id
* adjust humidity and temperature threthold
* modified test method for pressure, adjust pressure threthold
* test 5ul in test droplets
* check jaws aligned while doing tip motors assembly
  • Loading branch information
Andiiiiiiyy committed Jul 1, 2024
1 parent ff46b4d commit b2a8545
Show file tree
Hide file tree
Showing 6 changed files with 436 additions and 47 deletions.
139 changes: 139 additions & 0 deletions hardware-testing/hardware_testing/drivers/sealed_pressure_fixture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# encoding:utf-8

import time
from typing import Union
import serial
import serial.tools.list_ports

ReceiveBuffer = 100


class SerialDriver:

@classmethod
def get_com_list(cls):
port_list = serial.tools.list_ports.comports()
return port_list

def __init__(self):
self.device = None
self.com = None

def get_device(self):
"""
select device
:return:
"""
port_list = SerialDriver.get_com_list()
print("=" * 5 + "PORT LIST" + "=" * 5)
for index, p in enumerate(port_list):
print(f"{index + 1} >>{p.device}")
select = input("Select Port Number(输入串口号对应的数字):")
self.device = port_list[int(select.strip()) - 1].device

def init_serial(self, baud):

"""
init connection
:param baud:
:return:
"""
self.com = serial.Serial(self.device, baud, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS, timeout=1)
if self.com.isOpen():
print(f"{self.device} Opened! \n")
# settings
self.com.bytesize = serial.EIGHTBITS # 数据位 8
self.com.parity = serial.PARITY_NONE # 无校验
self.com.stopbits = serial.STOPBITS_ONE # 停止位 1

def close(self):
"""
close com
:return:
"""
self.com.close()
print(f"{self.device} Closed! \n")

def init(self, baud):
"""
main
:return:
"""
self.get_device()
try:
self.init_serial(baud)
except:
print("Can't find device")

def write_and_get_buffer(self, send: Union[str, int, bytes], only_write=False, delay=None, times=30):
"""
send cmd
:return:
"""
if self.com is None:
return
if type(send) is not bytes:
send = (send + "\r\n").encode('utf-8')
self.com.flushInput()
self.com.flushOutput()
self.com.write(send)
time.sleep(0.1)
if delay is None:
pass
else:
time.sleep(delay)
if only_write is True:
return
for i in range(times):
data = self.com.read(ReceiveBuffer)
if type(data) is not bytes:
if "OK" not in data.decode('utf-8') or "busy" in data.decode('utf-8'):
time.sleep(1)
continue
else:
return data
return data.decode('utf-8')

def read_buffer(self):
"""
读取缓存
:return:
"""
try:
self.com.flushInput()
self.com.flushOutput()
except:
pass
time.sleep(3)
length = ReceiveBuffer
data = self.com.read(length)
self.com.flushInput()
self.com.flushOutput()
return data.decode('utf-8')

def get_pressure(self):
"""
analyze pressure value
"""
for _i in range(5):
try:
respond = self.read_buffer()
respond_list = respond.split('|')
respond_value = respond_list[1]

average_value = respond_value.split('\r\n')[0].split('\t')[1].strip()
average_value = float(average_value)
return average_value
except:
print(f"get pressure fail at {_i} times")
pass



if __name__ == '__main__':
s = SerialDriver()
s.init(9600)
for i in range(100):
result = s.get_pressure()
print(result)
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,12 @@ def get_pipette_serial_ot3(pipette: Union[PipetteOT2, PipetteOT3]) -> str:
model = pipette.model
volume = model.split("_")[0].replace("p", "")
volume = "1K" if volume == "1000" else volume
channels = "S" if "single" in model else "M"
if "single" in model:
channels = "S"
elif "96" in model:
channels = "H"
else:
channels = "M"
version = model.split("v")[-1].strip().replace(".", "")
assert pipette.pipette_id, f"no pipette_id found for pipette: {pipette}"
if "P" in pipette.pipette_id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from hardware_testing.opentrons_api.types import OT3Mount, Point, Axis

TIP_VOLUME = 1000
ASPIRATE_VOLUME = 1000
ASPIRATE_VOLUME = [1000, 5]
NUM_SECONDS_TO_WAIT = 30
HOVER_HEIGHT_MM = 50
DEPTH_INTO_RESERVOIR_FOR_ASPIRATE = -24
Expand All @@ -26,7 +26,7 @@
TIP_RACK_LABWARE = f"opentrons_flex_96_tiprack_{TIP_VOLUME}ul"
RESERVOIR_LABWARE = "nest_1_reservoir_195ml"

TIP_RACK_96_SLOT = 4
TIP_RACK_96_SLOT = 10
TIP_RACK_PARTIAL_SLOT = 5
RESERVOIR_SLOT = 2
TRASH_SLOT = 12
Expand Down Expand Up @@ -103,14 +103,14 @@ def get_tiprack_partial_nominal() -> Point:


async def aspirate_and_wait(
api: OT3API, reservoir: Point, seconds: int = 30
api: OT3API, reservoir: Point, aspirate_value, seconds: int = 30
) -> Tuple[bool, float]:
"""Aspirate and wait."""
await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, reservoir)
await api.move_to(
OT3Mount.LEFT, reservoir + Point(z=DEPTH_INTO_RESERVOIR_FOR_ASPIRATE)
)
await api.aspirate(OT3Mount.LEFT, ASPIRATE_VOLUME)
await api.aspirate(OT3Mount.LEFT, aspirate_value)
await api.move_to(OT3Mount.LEFT, reservoir + Point(z=HOVER_HEIGHT_MM))

start_time = time()
Expand Down Expand Up @@ -143,9 +143,9 @@ async def _drop_tip(api: OT3API, trash: Point) -> None:
await api.drop_tip(OT3Mount.LEFT)
# NOTE: a FW bug (as of v14) will sometimes not fully drop tips.
# so here we ask if the operator needs to try again
while not api.is_simulator and ui.get_user_answer("try dropping again"):
await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME))
await api.drop_tip(OT3Mount.LEFT)
# while not api.is_simulator and ui.get_user_answer("try dropping again"):
# await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME))
# await api.drop_tip(OT3Mount.LEFT)
await api.home_z(OT3Mount.LEFT)


Expand Down Expand Up @@ -188,8 +188,8 @@ async def run(api: OT3API, report: CSVReport, section: str) -> None:

async def _find_reservoir_pos() -> None:
nonlocal reservoir_a1_actual
if reservoir_a1_actual:
return
# if reservoir_a1_actual: # re-find reservoir position for 5ul
# return
# SAVE RESERVOIR POSITION
ui.print_header("JOG to TOP of RESERVOIR")
print("jog tips to the TOP of the RESERVOIR")
Expand All @@ -199,29 +199,37 @@ async def _find_reservoir_pos() -> None:
await helpers_ot3.jog_mount_ot3(api, OT3Mount.LEFT)
reservoir_a1_actual = await api.gantry_position(OT3Mount.LEFT)

# PICK-UP 96 TIPS
ui.print_header("JOG to 96-Tip RACK")
if not api.is_simulator:
ui.get_user_ready(f"ADD 96 tip-rack to slot #{TIP_RACK_96_SLOT}")
await helpers_ot3.move_to_arched_ot3(
api, OT3Mount.LEFT, tip_rack_96_a1_nominal + Point(z=30)
)
await helpers_ot3.jog_mount_ot3(api, OT3Mount.LEFT)
print("picking up tips")
await api.pick_up_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME))
await api.home_z(OT3Mount.LEFT)
if not api.is_simulator:
ui.get_user_ready("about to move to RESERVOIR")

# TEST DROPLETS for 96 TIPS
ui.print_header("96 Tips: ASPIRATE and WAIT")
await _find_reservoir_pos()
assert reservoir_a1_actual
result, duration = await aspirate_and_wait(
api, reservoir_a1_actual, seconds=NUM_SECONDS_TO_WAIT
)
result = True
for test_volume in ASPIRATE_VOLUME:
answer = ui.get_user_answer(f"Test {test_volume}uL")
if not answer:
continue
tip_volume = 50 if test_volume<=50 else 1000
# PICK-UP 96 TIPS
ui.print_header("JOG to 96-Tip RACK")
if not api.is_simulator:
ui.get_user_ready(f"picking up tips, place tip-rack {tip_volume} on slot {TIP_RACK_96_SLOT}")
await helpers_ot3.move_to_arched_ot3(
api, OT3Mount.LEFT, tip_rack_96_a1_nominal + Point(z=30)
)
await helpers_ot3.jog_mount_ot3(api, OT3Mount.LEFT)

await api.pick_up_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(tip_volume))
await api.home_z(OT3Mount.LEFT)
if not api.is_simulator:
ui.get_user_ready("about to move to RESERVOIR")
# TEST DROPLETS for 96 TIPS
ui.print_header("96 Tips: ASPIRATE and WAIT")
await _find_reservoir_pos()
assert reservoir_a1_actual
ret, duration = await aspirate_and_wait(
api, reservoir_a1_actual, test_volume, seconds=NUM_SECONDS_TO_WAIT
)
result = result&ret
await api.home_z(OT3Mount.LEFT)
await _drop_tip(api, trash_nominal)
report(section, "droplets-96-tips", [duration, CSVResult.from_bool(result)])
await _drop_tip(api, trash_nominal)


# if not api.is_simulator:
# ui.get_user_ready(f"REMOVE 96 tip-rack from slot #{TIP_RACK_96_SLOT}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@
from hardware_testing.opentrons_api.types import OT3Mount
from hardware_testing.data.csv_report import (
CSVReport,
CSVResult,
CSVLine,
CSVLineRepeating,
)


NUM_SAMPLES = 10
INTER_SAMPLE_DELAY_SECONDS = 0.25
TEMPERATURE_THRESHOLD = [10, 40]
HUMIDITY_THRESHOLD = [10, 90]


def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]:
"""Build CSV Lines."""
return [
CSVLine(f"environment-{sensor_id.name}-celsius-humidity", [float, float])
CSVLine(f"environment-{sensor_id.name}-celsius-humidity", [float, float, CSVResult])
for sensor_id in [SensorId.S0, SensorId.S1]
]

Expand All @@ -43,6 +46,7 @@ async def run(api: OT3API, report: CSVReport, section: str) -> None:
ui.print_header(sensor_id.name.upper())
celsius_samples = []
humidity_samples = []
air_params = True
print(f"averaging {NUM_SAMPLES} samples:")
print("\tc\th")
for _ in range(NUM_SAMPLES):
Expand All @@ -60,8 +64,10 @@ async def run(api: OT3API, report: CSVReport, section: str) -> None:
humidity = _remove_outliers_and_average(humidity_samples)
print(f"[{sensor_id.name}] Celsius = {round(celsius, 2)} degrees")
print(f"[{sensor_id.name}] Humidity = {round(humidity, 2)} percent")
air_params = air_params & True if TEMPERATURE_THRESHOLD[0] <= celsius <= TEMPERATURE_THRESHOLD[1] and HUMIDITY_THRESHOLD[0] <= humidity <= HUMIDITY_THRESHOLD[1] else False

report(
section,
f"environment-{sensor_id.name}-celsius-humidity",
[celsius, humidity],
[celsius, humidity, CSVResult.from_bool(air_params)]
)
Loading

0 comments on commit b2a8545

Please sign in to comment.