Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Design highlevel API #17

Merged
merged 9 commits into from
Mar 10, 2025
2 changes: 1 addition & 1 deletion schunk_gripper_driver/launch/driver.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

port = DeclareLaunchArgument(
"port",
default_value="/dev/ur-ttylink/ttyTool",
default_value="/dev/ttyUSB0",
description="The gripper's serial port",
)
device_id = DeclareLaunchArgument(
Expand Down
1 change: 1 addition & 0 deletions schunk_gripper_driver/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<depend>launch</depend>
<depend>launch_ros</depend>
<depend>schunk_gripper_library</depend>
<depend>std_srvs</depend>

<test_depend>launch_pytest</test_depend>
<test_depend>ament_copyright</test_depend>
Expand Down
50 changes: 3 additions & 47 deletions schunk_gripper_driver/schunk_gripper_driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from rclpy.lifecycle import TransitionCallbackReturn
from schunk_gripper_library.driver import Driver as GripperDriver
from std_srvs.srv import Trigger
import time
import asyncio


class Driver(Node):
Expand Down Expand Up @@ -87,62 +87,18 @@ def on_shutdown(self, state: State) -> TransitionCallbackReturn:
return TransitionCallbackReturn.SUCCESS

def status_update(self):
self.gripper.receive_plc_input()
self.get_logger().info(f"---> Status update: {self.gripper.get_plc_input()}")

# Service callbacks
def _acknowledge_cb(self, request: Trigger.Request, response: Trigger.Response):
self.get_logger().info("---> Acknowledge")
# Reset
self.gripper.clear_plc_output()
self.gripper.set_control_bit(bit=0, value=True)
self.gripper.send_plc_output()
time.sleep(0.5)
# Acknowledge
self.gripper.set_control_bit(bit=2, value=True)
for bit in self.gripper.valid_control_bits:
self.get_logger().info(
f"---> Control bit {bit}: {self.gripper.get_control_bit(bit=bit)}"
)
cmd_before = self.gripper.get_status_bit(bit=5)
self.gripper.send_plc_output()
time.sleep(0.5)
self.gripper.receive_plc_input()
cmd_after = self.gripper.get_status_bit(bit=5)
self.get_logger().info(f"---> PLC input: {self.gripper.get_plc_input()}")
for bit in self.gripper.valid_status_bits:
self.get_logger().info(
f"---> Status bit {bit}: {self.gripper.get_status_bit(bit=bit)}"
)

response.success = self.gripper.get_status_bit(bit=0) == 1 and (
cmd_after != cmd_before
)
response.success = asyncio.run(self.gripper.acknowledge())
response.message = self.gripper.get_status_diagnostics()
return response

def _fast_stop_cb(self, request: Trigger.Request, response: Trigger.Response):
self.get_logger().info("---> Fast stop")
self.gripper.clear_plc_output()
self.gripper.set_control_bit(bit=0, value=False)
for bit in self.gripper.valid_control_bits:
self.get_logger().info(
f"---> Control bit {bit}: {self.gripper.get_control_bit(bit=bit)}"
)
cmd_before = self.gripper.get_status_bit(bit=5)
self.gripper.send_plc_output()
time.sleep(0.5)
self.gripper.receive_plc_input()
cmd_after = self.gripper.get_status_bit(bit=5)
self.get_logger().info(f"---> PLC input: {self.gripper.get_plc_input()}")
for bit in self.gripper.valid_status_bits:
self.get_logger().info(
f"---> Status bit {bit}: {self.gripper.get_status_bit(bit=bit)}"
)

response.success = self.gripper.get_status_bit(bit=4) == 1 and (
cmd_after != cmd_before
)
response.success = asyncio.run(self.gripper.fast_stop())
response.message = self.gripper.get_status_diagnostics()
return response

Expand Down
10 changes: 6 additions & 4 deletions schunk_gripper_driver/test/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ def test_driver_implements_acknowledge(lifecycle_interface):
client.wait_for_service(timeout_sec=2)
future = client.call_async(Trigger.Request())
rclpy.spin_until_future_complete(node, future)
lifecycle_interface.change_state(Transition.TRANSITION_CLEANUP)

assert future.result().success
expected_msg = "error_code: 0, warning_code: 0, additional_code: 0" # everything ok
expected_msg = (
"error_code: 0x0, warning_code: 0x0, additional_code: 0x0" # everything ok
)
assert future.result().message == expected_msg
lifecycle_interface.change_state(Transition.TRANSITION_CLEANUP)


@skip_without_gripper
Expand All @@ -65,8 +67,8 @@ def test_driver_implements_fast_stop(lifecycle_interface):
client.wait_for_service(timeout_sec=2)
future = client.call_async(Trigger.Request())
rclpy.spin_until_future_complete(node, future)
lifecycle_interface.change_state(Transition.TRANSITION_CLEANUP)

assert future.result().success
expected_msg = "error_code: 0, warning_code: 0, additional_code: 0" # everything ok
expected_msg = "error_code: 0xD9, warning_code: 0x0, additional_code: 0x0"
assert future.result().message == expected_msg
lifecycle_interface.change_state(Transition.TRANSITION_CLEANUP)
88 changes: 83 additions & 5 deletions schunk_gripper_library/schunk_gripper_library/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from pymodbus.client import ModbusSerialClient
from pymodbus.pdu import ModbusPDU
import re
from threading import Thread
import asyncio
import time


class Driver(object):
Expand Down Expand Up @@ -31,8 +34,16 @@ def __init__(self) -> None:
self.mb_client: ModbusSerialClient | None = None
self.mb_device_id: int | None = None
self.connected: bool = False

def connect(self, protocol: str, port: str, device_id: int | None = None) -> bool:
self.polling_thread: Thread = Thread()
self.update_cycle: float = 0.05 # sec

def connect(
self,
protocol: str,
port: str,
device_id: int | None = None,
update_cycle: float = 0.05,
) -> bool:
if protocol not in ["modbus"]:
return False
if not isinstance(port, str):
Expand All @@ -43,6 +54,8 @@ def connect(self, protocol: str, port: str, device_id: int | None = None) -> boo
return False
if isinstance(device_id, int) and device_id < 0:
return False
if update_cycle < 0.001:
return False
if self.connected:
return False

Expand All @@ -58,14 +71,55 @@ def connect(self, protocol: str, port: str, device_id: int | None = None) -> boo
trace_pdu=self._trace_pdu,
)
self.connected = self.mb_client.connect()

if self.connected:
self.update_cycle = update_cycle
self.polling_thread = Thread(
target=asyncio.run,
args=(self._module_update(self.update_cycle),),
daemon=True,
)
self.polling_thread.start()

return self.connected

def disconnect(self) -> bool:
self.connected = False
if self.polling_thread.is_alive():
self.polling_thread.join()

if self.mb_client and self.mb_client.connected:
self.mb_client.close()
self.connected = False
return True

async def acknowledge(self) -> bool:
if not self.connected:
return False
self.clear_plc_output()
self.send_plc_output()

cmd_toggle_before = self.get_status_bit(bit=5)
self.set_control_bit(bit=2, value=True)
self.send_plc_output()

desired_bits = {"0": 1, "5": cmd_toggle_before ^ 1}
return await self.wait_for_status(bits=desired_bits)

async def fast_stop(self) -> bool:
if not self.connected:
return False
self.clear_plc_output()
self.send_plc_output()

cmd_toggle_before = self.get_status_bit(bit=5)
self.set_control_bit(
bit=0, value=False
) # activate fast stop (inverted behavior)
self.send_plc_output()
desired_bits = {"5": cmd_toggle_before ^ 1, "7": 1}
print(f"hello stefan: {self.get_plc_input()}")
return await self.wait_for_status(bits=desired_bits)

def send_plc_output(self) -> bool:
if self.mb_client and self.mb_client.connected:
with self.output_buffer_lock:
Expand Down Expand Up @@ -102,6 +156,23 @@ def receive_plc_input(self) -> bool:
return True
return False

async def wait_for_status(
self, bits: dict[str, int] = {}, timeout_sec: float = 1.0
) -> bool:
if not timeout_sec > 0.0:
return False
if not bits:
return False
max_duration = time.time() + timeout_sec
while not all(
[self.get_status_bit(int(bit)) == value for bit, value in bits.items()]
):
await asyncio.sleep(0.001)
self.receive_plc_input()
if time.time() > max_duration:
return False
return True

def contains_non_hex_chars(self, buffer: str) -> bool:
return bool(re.search(r"[^0-9a-fA-F]", buffer))

Expand Down Expand Up @@ -132,8 +203,10 @@ def get_plc_output(self) -> str:
return self.plc_output_buffer.hex().upper()

def clear_plc_output(self) -> None:
with self.output_buffer_lock:
self.plc_output_buffer = bytearray(bytes.fromhex("00" * 16))
self.set_plc_output("00" * 16)
self.set_control_bit(
bit=0, value=True
) # deactivate fast stop (inverted behavior)

def set_control_bit(self, bit: int, value: bool) -> bool:
with self.output_buffer_lock:
Expand Down Expand Up @@ -251,6 +324,11 @@ def _set_status_bit(self, bit: int, value: bool) -> bool:
self.plc_input_buffer[byte_index] &= ~(1 << bit_index)
return True

async def _module_update(self, update_cycle: float) -> None:
while self.connected:
self.receive_plc_input()
await asyncio.sleep(update_cycle)

def _trace_packet(self, sending: bool, data: bytes) -> bytes:
txt = "REQUEST stream" if sending else "RESPONSE stream"
print(f"---> {txt}: {data!r}")
Expand Down
43 changes: 43 additions & 0 deletions schunk_gripper_library/tests/test_behavior.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from ..schunk_gripper_library.driver import Driver
from ..tests.conftest import skip_without_gripper
import asyncio


@skip_without_gripper
def test_acknowledge():
driver = Driver()

# Not connected
assert not asyncio.run(driver.acknowledge())

# Connected
driver.connect("modbus", "/dev/ttyUSB0", 12)
assert asyncio.run(driver.acknowledge())

# Repetitive
for _ in range(5):
assert asyncio.run(driver.acknowledge())

driver.disconnect()


@skip_without_gripper
def test_fast_stop():
driver = Driver()

# Not connected
assert not asyncio.run(driver.fast_stop())

# After fresh start
driver.connect("modbus", "/dev/ttyUSB0", 12)
assert asyncio.run(driver.fast_stop())

# From operational
assert asyncio.run(driver.acknowledge())
assert asyncio.run(driver.fast_stop())

# Repetitive
for _ in range(5):
assert asyncio.run(driver.fast_stop())

driver.disconnect()
Loading