Skip to content

Commit

Permalink
Merge pull request #97 from iory/rcb4-ros-bridge
Browse files Browse the repository at this point in the history
Refactor Rcb4 ros bridge
  • Loading branch information
iory authored Oct 31, 2024
2 parents ce474d7 + ae9155d commit 47c677b
Show file tree
Hide file tree
Showing 15 changed files with 748 additions and 523 deletions.
45 changes: 45 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[tool.ruff]
target-version = "py38"
line-length = 90

# See https://github.com/charliermarsh/ruff#rules for error code definitions.

[tool.ruff.lint]
select = [
# "ANN", # annotations
"B", # bugbear
"C", # comprehensions
"E", # style errors
"F", # flakes
"I", # import sorting
"RUF", # ruff specific rules
"UP", # upgrade
"W", # style warnings
"YTT", # sys.version
"ISC002",
"NPY201",
"TID251"
]

ignore = [
"C901", # Comprehension is too complex (11 > 10)
"N802", # Function name should be lowercase
"N806", # Variable in function should be lowercase
"E501", # Line too long ({width} > {limit} characters)
"B904", # raise ... from err
"B905", # zip() without an explicit strict= parameter
"RUF005", # recommends non-type-aware expansion
"UP008",
]

# don't allow implicit string concatenation
flake8-implicit-str-concat = {"allow-multiline" = false}

[tool.ruff.lint.isort]
force-single-line = true
force-sort-within-sections = true
order-by-type = false

[tool.ruff.lint.flake8-tidy-imports.banned-api]
"IPython.embed".msg = "you forgot to remove a debug embed ;)"
"numpy.empty".msg = "uninitialized arrays are haunted try numpy.zeros"
81 changes: 50 additions & 31 deletions rcb4/armh7interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ def adjust_angle_vector(self, servo_ids=None, error_threshold=None):
# Ignore free servo
servo_ids = servo_ids[self.reference_angle_vector(servo_ids=servo_ids) != 32768]
if len(servo_ids) == 0:
return
return False

# Calculate error threshold[deg]
if error_threshold is None:
Expand Down Expand Up @@ -566,9 +566,7 @@ def _send_angle_vector(self, av, servo_ids=None, velocity=127):
dtype=np.float32,
)
self._worm_ref_angle[np.array(worm_indices)] = np.array(worm_av)
self.write_cstruct_slot_v(
WormmoduleStruct, "ref_angle", self._worm_ref_angle
)
self.write_cstruct_slot_v(WormmoduleStruct, "ref_angle", self._worm_ref_angle)
svs = self.angle_vector_to_servo_angle_vector(av, servo_ids)
return self.servo_angle_vector(servo_ids, svs, velocity=velocity)

Expand Down Expand Up @@ -633,7 +631,7 @@ def _set_trim_vector(self, trim_vector=None, servo_ids=None):
return
current_trim_vector = self._trim_servo_vector()
current_trim_vector[np.array(servo_ids)] = trim_vector
self.write_cstruct_slot_v(ServoStruct, "trim", current_trim_vector)
return self.write_cstruct_slot_v(ServoStruct, "trim", current_trim_vector)

def trim_vector(self, av=None, servo_ids=None):
if av is not None:
Expand All @@ -656,14 +654,10 @@ def clear_trim_vector(self, write_to_flash=True):
self.write_to_flash()

def ics_start(self):
return self.set_cstruct_slot(
SystemStruct, 0, "ics_comm_stop", [0, 0, 0, 0, 0, 0]
)
return self.set_cstruct_slot(SystemStruct, 0, "ics_comm_stop", [0, 0, 0, 0, 0, 0])

def ics_stop(self):
return self.set_cstruct_slot(
SystemStruct, 0, "ics_comm_stop", [1, 1, 1, 1, 1, 1]
)
return self.set_cstruct_slot(SystemStruct, 0, "ics_comm_stop", [1, 1, 1, 1, 1, 1])

def idmode_scan(self):
self.ics_stop()
Expand Down Expand Up @@ -703,6 +697,13 @@ def search_servo_ids(self):
self._servo_id_to_sequentialized_servo_id[servo_indices] = np.arange(
len(servo_indices)
)
servo_on_states = self.servo_states()
self.servo_on_states_dict = {}
for index in servo_indices:
if index in servo_on_states:
self.servo_on_states_dict[index] = True
else:
self.servo_on_states_dict[index] = False
return servo_indices

def search_air_board_ids(self):
Expand All @@ -716,9 +717,7 @@ def search_air_board_ids(self):
return np.array(air_board_ids)

def valid_servo_ids(self, servo_ids):
return np.isfinite(
self._servo_id_to_sequentialized_servo_id[np.array(servo_ids)]
)
return np.isfinite(self._servo_id_to_sequentialized_servo_id[np.array(servo_ids)])

def hold(self, servo_ids=None):
if servo_ids is None:
Expand Down Expand Up @@ -777,10 +776,34 @@ def servo_angle_vector(self, servo_ids, servo_vector, velocity=127):
if len(servo_ids) != len(servo_vector):
raise ValueError("Length of servo_ids and servo_vector must be the same.")

# Update servo on/off states based on 32767 and 32768 values in servo_vector
for servo_id, angle in zip(servo_ids, servo_vector):
if angle == 32767:
self.servo_on_states_dict[servo_id] = True
elif angle == 32768:
self.servo_on_states_dict[servo_id] = False

# Filter servo IDs based on their on state in servo_on_states_dict
active_ids = []
active_angles = []
for servo_id, angle in zip(servo_ids, servo_vector):
if self.servo_on_states_dict.get(servo_id, False) or angle in (
32767,
32768,
):
# Only include active servos
active_ids.append(servo_id)
active_angles.append(angle)

# If no active servos, skip command sending
if not active_ids:
# print("[servo_angle_vector] No active servos to send commands.")
return

# Sort the servo vectors based on servo IDs
sorted_indices = np.argsort(servo_ids)
sorted_servo_ids = np.array(servo_ids)[sorted_indices]
sorted_servo_vector = np.array(servo_vector)[sorted_indices]
sorted_indices = np.argsort(active_ids)
sorted_servo_ids = np.array(active_ids)[sorted_indices]
sorted_servo_vector = np.array(active_angles)[sorted_indices]

# Prepare the command byte list
if (
Expand Down Expand Up @@ -1045,9 +1068,7 @@ def send_worm_calib_data(

# Write the parameters to the wormmodule vector cstruct
if module_type is not None:
self.write_cls_alist(
WormmoduleStruct, worm_idx, "module_type", [module_type]
)
self.write_cls_alist(WormmoduleStruct, worm_idx, "module_type", [module_type])
if servo_idx is not None:
self.write_cls_alist(WormmoduleStruct, worm_idx, "servo_id", [servo_idx])
if sensor_idx is not None:
Expand Down Expand Up @@ -1267,19 +1288,19 @@ def read_pressure_sensor(self, board_idx):

def start_pump(self):
"""Drive pump. There is supposed to be one pump for the entire system."""
self.cfunc_call("pump_switch", True)
return self.cfunc_call("pump_switch", True)

def stop_pump(self):
"""Stop driving pump. There should be one pump for the entire system."""
self.cfunc_call("pump_switch", False)
return self.cfunc_call("pump_switch", False)

def open_air_connect_valve(self):
"""Open valve to release air to atmosphere"""
self.cfunc_call("valve_switch", True)
return self.cfunc_call("valve_switch", True)

def close_air_connect_valve(self):
"""Close valve to shut off air from atmosphere"""
self.cfunc_call("valve_switch", False)
return self.cfunc_call("valve_switch", False)

def gpio_mode(self, board_idx):
"""Return current GPIO mode of air relay board
Expand All @@ -1297,7 +1318,7 @@ def open_work_valve(self, board_idx):
"""
gpio_mode = self.gpio_mode(board_idx)
command = gpio_mode | 0b00000001
self.cfunc_call("gpio_cmd", board_idx, command)
return self.cfunc_call("gpio_cmd", board_idx, command)

def close_work_valve(self, board_idx):
"""Close work valve
Expand All @@ -1306,7 +1327,7 @@ def close_work_valve(self, board_idx):
"""
gpio_mode = self.gpio_mode(board_idx)
command = gpio_mode & 0b11111110
self.cfunc_call("gpio_cmd", board_idx, command)
return self.cfunc_call("gpio_cmd", board_idx, command)

def open_relay_valve(self, board_idx):
"""Open valve to relay air to next work
Expand All @@ -1315,7 +1336,7 @@ def open_relay_valve(self, board_idx):
"""
gpio_mode = self.gpio_mode(board_idx)
command = gpio_mode | 0b00000010
self.cfunc_call("gpio_cmd", board_idx, command)
return self.cfunc_call("gpio_cmd", board_idx, command)

def close_relay_valve(self, board_idx):
"""Close valve to relay air to next work
Expand All @@ -1324,7 +1345,7 @@ def close_relay_valve(self, board_idx):
"""
gpio_mode = self.gpio_mode(board_idx)
command = gpio_mode & 0b11111101
self.cfunc_call("gpio_cmd", board_idx, command)
return self.cfunc_call("gpio_cmd", board_idx, command)

@property
def servo_id_to_worm_id(self):
Expand All @@ -1351,9 +1372,7 @@ def joint_to_actuator_matrix(self):
@property
def actuator_to_joint_matrix(self):
if self._actuator_to_joint_matrix is None:
self._actuator_to_joint_matrix = np.linalg.inv(
self.joint_to_actuator_matrix
)
self._actuator_to_joint_matrix = np.linalg.inv(self.joint_to_actuator_matrix)
return self._actuator_to_joint_matrix

@property
Expand Down
4 changes: 2 additions & 2 deletions rcb4/data/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from collections import namedtuple
from distutils.version import StrictVersion
import os
import os.path as osp
import subprocess
from distutils.version import StrictVersion

import pkg_resources
from colorama import Fore
from colorama import Style
import gdown
import pkg_resources

data_dir = osp.abspath(osp.dirname(__file__))
_default_cache_dir = osp.expanduser("~/.rcb4")
Expand Down
46 changes: 37 additions & 9 deletions rcb4/rcb4interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,12 +413,18 @@ def search_servo_ids(self):
self._servo_id_to_sequentialized_servo_id[servo_indices] = np.arange(
len(servo_indices)
)

servo_on_states = self.servo_states()
self.servo_on_states_dict = {}
for index in servo_indices:
if index in servo_on_states:
self.servo_on_states_dict[index] = True
else:
self.servo_on_states_dict[index] = False
return servo_indices

def valid_servo_ids(self, servo_ids):
return np.isfinite(
self._servo_id_to_sequentialized_servo_id[np.array(servo_ids)]
)
return np.isfinite(self._servo_id_to_sequentialized_servo_id[np.array(servo_ids)])

def hold(self, servo_ids=None):
if servo_ids is None:
Expand Down Expand Up @@ -473,10 +479,34 @@ def servo_angle_vector(self, servo_ids, servo_vector, velocity=127):
if len(servo_ids) != len(servo_vector):
raise ValueError("Length of servo_ids and servo_vector must be the same.")

# Update servo on/off states based on 32767 and 32768 values in servo_vector
for servo_id, angle in zip(servo_ids, servo_vector):
if angle == 32767:
self.servo_on_states_dict[servo_id] = True
elif angle == 32768:
self.servo_on_states_dict[servo_id] = False

# Filter servo IDs based on their on state in servo_on_states_dict
active_ids = []
active_angles = []
for servo_id, angle in zip(servo_ids, servo_vector):
if self.servo_on_states_dict.get(servo_id, False) or angle in (
32767,
32768,
):
# Only include active servos
active_ids.append(servo_id)
active_angles.append(angle)

# If no active servos, skip command sending
if not active_ids:
# print("[servo_angle_vector] No active servos to send commands.")
return

# Sort the servo vectors based on servo IDs
sorted_indices = np.argsort(servo_ids)
sorted_servo_ids = np.array(servo_ids)[sorted_indices]
sorted_servo_vector = np.array(servo_vector)[sorted_indices]
sorted_indices = np.argsort(active_ids)
sorted_servo_ids = np.array(active_ids)[sorted_indices]
sorted_servo_vector = np.array(active_angles)[sorted_indices]

# Prepare the command byte list
if (
Expand Down Expand Up @@ -609,9 +639,7 @@ def joint_to_actuator_matrix(self):
@property
def actuator_to_joint_matrix(self):
if self._actuator_to_joint_matrix is None:
self._actuator_to_joint_matrix = np.linalg.inv(
self.joint_to_actuator_matrix
)
self._actuator_to_joint_matrix = np.linalg.inv(self.joint_to_actuator_matrix)
return self._actuator_to_joint_matrix

def servo_states(self):
Expand Down
4 changes: 1 addition & 3 deletions rcb4/usb_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ def reset_usb_device(port):
vendor_id, product_id = get_vendor_id_and_product_id(port)
# Create a USB context and open the device by vendor ID and product ID
context = usb1.USBContext()
handle = context.openByVendorIDAndProductID(
vendor_id, product_id, skip_on_error=True
)
handle = context.openByVendorIDAndProductID(vendor_id, product_id, skip_on_error=True)
if handle is None:
raise ValueError("Device not found")
# Reset the USB device
Expand Down
Loading

0 comments on commit 47c677b

Please sign in to comment.