From cca7472b312b71f1ce623314f0e79a241c9bc970 Mon Sep 17 00:00:00 2001 From: caila-marashaj Date: Tue, 7 May 2024 13:14:27 -0400 Subject: [PATCH] add tc control script --- .../hardware_control/modules/tc_messenger.py | 41 ---------- .../hardware_control/scripts/tc_control.py | 79 +++++++++++++++++++ 2 files changed, 79 insertions(+), 41 deletions(-) delete mode 100644 api/src/opentrons/hardware_control/modules/tc_messenger.py create mode 100644 api/src/opentrons/hardware_control/scripts/tc_control.py diff --git a/api/src/opentrons/hardware_control/modules/tc_messenger.py b/api/src/opentrons/hardware_control/modules/tc_messenger.py deleted file mode 100644 index e396bf4b3295..000000000000 --- a/api/src/opentrons/hardware_control/modules/tc_messenger.py +++ /dev/null @@ -1,41 +0,0 @@ -import sys -from serial import Serial - -_READ_ALL = "readall" -_READ_LINE = "read" -_LID_AND_SEAL_STATUS = "status" -_MOVE_SEAL = "ms" -_DONE = "done" - - -def comms_loop(dev): - exit = False - command = input("enter a command\n") - if command == _READ_ALL: - print(dev.readlines()) - elif command == _READ_LINE: - print(dev.readline()) - elif command == _LID_AND_SEAL_STATUS: - dev.write("M119\n".encode()) - print(dev.readline()) - elif command == _MOVE_SEAL: - distance = input("enter distance in steps") - dev.write(f"M241.D {distance}\n") - print(dev.readline()) - elif command == _DONE: - exit = True - else: - dev.write(f"{command}\n") - print(dev.readline()) - - return exit - - -def _main(): - dev = Serial('/dev/ot_module_thermocycler0', 9600, timeout=2) - run = True - while run: - run = comms_loop(dev) - -if __name__ == '__main__': - sys.exit(_main()) \ No newline at end of file diff --git a/api/src/opentrons/hardware_control/scripts/tc_control.py b/api/src/opentrons/hardware_control/scripts/tc_control.py new file mode 100644 index 000000000000..d2fd058957b9 --- /dev/null +++ b/api/src/opentrons/hardware_control/scripts/tc_control.py @@ -0,0 +1,79 @@ +from serial import Serial +import asyncio + +_READ_ALL = "readall" +_READ_LINE = "read" +_DONE = "done" +_MOVE_SEAL = "ms" +_MOVE_LID = "ml" + +gcode_shortcuts = { + "status": "M119", + _MOVE_SEAL: "M241.D", # move seal motor + _MOVE_LID: "M240.D", # move lid stepper motor + "ol": "M126", # open lid + "cl": "M127", # close lid +} + + +async def message_read(dev): + response = dev.readline().decode() + while not response: + await asyncio.sleep(1) + response = dev.readline().decode() + return response + + +async def message_return(dev): + try: + response = await asyncio.wait_for(message_read(dev), timeout=20) + return response + except asyncio.exceptions.TimeoutError: + print("response timed out.") + return '' + + +async def handle_gcode_shortcut(dev, command): + # handle debugging commands that require followup + if command == _MOVE_SEAL: + distance = input("enter distance in steps => ") + dev.write(f"{gcode_shortcuts[command]} {distance}\n".encode()) # (+) -> retract, (-) -> engage + print(await message_return(dev)) + elif command == _MOVE_LID: + distance = input("enter angular distance in degrees => ") # (+) -> open, (-) -> close + dev.write(f"{gcode_shortcuts[command]} {distance}\n".encode()) + print(await message_return(dev)) + # everything else + else: + dev.write(f"{gcode_shortcuts[command]}\n".encode()) + print(await message_return(dev)) + + +async def comms_loop(dev): + _exit = False + command = input("\n>>> ") + if command == _READ_ALL: + print(dev.readlines()) + elif command == _READ_LINE: + print(dev.readline()) + elif command == _DONE: + _exit = True + elif command in gcode_shortcuts: + await handle_gcode_shortcut(dev, command) + else: + try: + dev.write(f"{command}\n") + print(await message_return(dev)) + except TypeError: + print("Invalid input.") + return _exit + + +async def _main(): + dev = Serial('/dev/ot_module_thermocycler1', 9600, timeout=2) + _exit = False + while not _exit: + _exit = await comms_loop(dev) + +if __name__ == '__main__': + asyncio.run(_main())