Skip to content

Commit

Permalink
Merge pull request #29 from iory/refactor
Browse files Browse the repository at this point in the history
Refactoring to support the RCB4-mini board.
  • Loading branch information
iory authored Feb 7, 2024
2 parents 2282292 + 1af678f commit f2896d8
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 81 deletions.
14 changes: 6 additions & 8 deletions rcb4/armh7interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def open(self, port='/dev/ttyUSB1', baudrate=1000000, timeout=0.01):
self.copy_worm_params_from_flash()
self.search_worm_ids()
self.search_servo_ids()
self.all_jointbase_sensors()
return True

def auto_open(self):
Expand Down Expand Up @@ -413,11 +414,9 @@ def servo_error(self, servo_ids=None):
ServoStruct, slot_name='error_angle')[servo_ids]
return error_angles

def servo_id_to_index(self, servo_ids=None):
if servo_ids is None:
servo_ids = self.search_servo_ids()
return {id: i
for i, id in enumerate(servo_ids)}
def servo_id_to_index(self, servo_id):
if self.valid_servo_ids([servo_id]):
return self.sequentialized_servo_ids([servo_id])[0]

def sequentialized_servo_ids(self, servo_ids):
if len(servo_ids) == 0:
Expand Down Expand Up @@ -462,12 +461,11 @@ def angle_vector(self, av=None, servo_ids=None, velocity=127):
return np.empty(shape=0)
av = np.append(self._angle_vector()[all_servo_ids], 1)
av = np.matmul(av.T, self.actuator_to_joint_matrix.T)[:-1]
id_to_index = self.servo_id_to_index(all_servo_ids)
worm_av = self.read_cstruct_slot_vector(
WormmoduleStruct, slot_name='present_angle')
for worm_idx in self.search_worm_ids():
av[id_to_index[
self.worm_id_to_servo_id[worm_idx]]] = worm_av[worm_idx]
av[self.servo_id_to_index(
self.worm_id_to_servo_id[worm_idx])] = worm_av[worm_idx]
if servo_ids is not None:
if len(servo_ids) == 0:
return np.empty(shape=0)
Expand Down
141 changes: 68 additions & 73 deletions ros/kxr_controller/node_scripts/rcb4_ros_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,80 +130,36 @@ def __init__(self):
JointState,
queue_size=1)

self.arm = ARMH7Interface()
arm = self.arm
ret = arm.auto_open()
self.interface = ARMH7Interface()
ret = self.interface.auto_open()
if ret is not True:
rospy.logerr('Could not open port!')
sys.exit(1)
arm.all_jointbase_sensors()
# arm.send_stretch(40)
self.id_to_index = self.arm.servo_id_to_index()
self._prev_velocity_command = None

print(arm.joint_to_actuator_matrix)
print(arm._actuator_to_joint_matrix)
for _, info in servo_infos.items():
if isinstance(info, int):
continue
servo_id = info['id']
direction = info['direction']
if servo_id not in self.id_to_index:
idx = self.interface.servo_id_to_index(servo_id)
if idx is None:
continue
idx = self.id_to_index[servo_id]
arm._joint_to_actuator_matrix[idx, idx] = \
direction * arm._joint_to_actuator_matrix[idx, idx]
print(arm.joint_to_actuator_matrix)
self.interface._joint_to_actuator_matrix[idx, idx] = \
direction * self.interface._joint_to_actuator_matrix[idx, idx]

self.fullbody_jointnames = []
for jn in self.joint_names:
if jn not in self.joint_name_to_id:
continue
servo_id = self.joint_name_to_id[jn]
if servo_id in arm.wheel_servo_sorted_ids:
continue
self.fullbody_jointnames.append(jn)
set_fullbody_controller(self.fullbody_jointnames)
# set_fullbody_controller(self.joint_names)
print('Fullbody jointnames')
print(self.fullbody_jointnames)
self.servo_id_to_worm_id = self.interface.servo_id_to_worm_id
self.set_fullbody_controller(clean_namespace)
self.set_initial_positions(clean_namespace)
self.check_servo_states()

self.servo_id_to_worm_id = self.arm.servo_id_to_worm_id

self.joint_servo_on = {jn: False for jn in self.joint_names}
servo_on_states = arm.servo_states()
for jn in self.joint_names:
if jn not in self.joint_name_to_id:
continue
idx = self.joint_name_to_id[jn]
if idx in servo_on_states:
self.joint_servo_on[jn] = True
else:
self.joint_servo_on[jn] = False
print(self.joint_servo_on)
print(self.id_to_index)

initial_positions = {}
init_av = arm.angle_vector()
arm.servo_id_to_index()
for jn in self.joint_names:
if jn not in self.joint_name_to_id:
continue
servo_id = self.joint_name_to_id[jn]
if servo_id in arm.wheel_servo_sorted_ids:
continue
if servo_id not in self.id_to_index:
continue
initial_positions[jn] = float(
np.deg2rad(init_av[self.id_to_index[servo_id]]))
rospy.loginfo('run kxr_controller')
set_initial_position(initial_positions, namespace=clean_namespace)
self.proc_kxr_controller = run_kxr_controller(
namespace=clean_namespace)

self.worm_servo_ids = [
arm.memory_cstruct(WormmoduleStruct, idx).servo_id
for idx in arm.worm_sorted_ids]
self.interface.memory_cstruct(WormmoduleStruct, idx).servo_id
for idx in self.interface.worm_sorted_ids]

self.command_joint_state_sub = rospy.Subscriber(
clean_namespace + '/command_joint_state',
Expand Down Expand Up @@ -251,6 +207,45 @@ def unsubscribe(self):
self.command_joint_state_sub.unregister()
self.velocity_command_joint_state_sub.unregister()

def check_servo_states(self):
self.joint_servo_on = {jn: False for jn in self.joint_names}
servo_on_states = self.interface.servo_states()
for jn in self.joint_names:
if jn not in self.joint_name_to_id:
continue
idx = self.joint_name_to_id[jn]
if idx in servo_on_states:
self.joint_servo_on[jn] = True
else:
self.joint_servo_on[jn] = False

def set_fullbody_controller(self, clean_namespace):
self.fullbody_jointnames = []
for jn in self.joint_names:
if jn not in self.joint_name_to_id:
continue
servo_id = self.joint_name_to_id[jn]
if servo_id in self.interface.wheel_servo_sorted_ids:
continue
self.fullbody_jointnames.append(jn)
set_fullbody_controller(self.fullbody_jointnames)

def set_initial_positions(self, clean_namespace):
initial_positions = {}
init_av = self.interface.angle_vector()
for jn in self.joint_names:
if jn not in self.joint_name_to_id:
continue
servo_id = self.joint_name_to_id[jn]
if servo_id in self.interface.wheel_servo_sorted_ids:
continue
idx = self.interface.servo_id_to_index(servo_id)
if idx is None:
continue
initial_positions[jn] = float(
np.deg2rad(init_av[idx]))
set_initial_position(initial_positions, namespace=clean_namespace)

def _msg_to_angle_vector_and_servo_ids(
self, msg,
velocity_control=False):
Expand All @@ -263,10 +258,10 @@ def _msg_to_angle_vector_and_servo_ids(
continue
idx = self.joint_name_to_id[name]
if velocity_control:
if idx not in self.arm.wheel_servo_sorted_ids:
if idx not in self.interface.wheel_servo_sorted_ids:
continue
else:
if idx in self.arm.wheel_servo_sorted_ids:
if idx in self.interface.wheel_servo_sorted_ids:
continue
# should ignore duplicated index.
if idx in used_servo_id:
Expand All @@ -276,7 +271,7 @@ def _msg_to_angle_vector_and_servo_ids(
servo_ids.append(idx)
angle_vector = np.array(angle_vector)
servo_ids = np.array(servo_ids, dtype=np.int32)
valid_indices = self.arm.valid_servo_ids(servo_ids)
valid_indices = self.interface.valid_servo_ids(servo_ids)
return angle_vector[valid_indices], servo_ids[valid_indices]

def velocity_command_joint_state_callback(self, msg):
Expand All @@ -289,7 +284,7 @@ def velocity_command_joint_state_callback(self, msg):
return
self._prev_velocity_command = av
try:
self.arm.angle_vector(av, servo_ids, velocity=0)
self.interface.angle_vector(av, servo_ids, velocity=0)
except RuntimeError as e:
self.unsubscribe()
rospy.signal_shutdown('Disconnected {}.'.format(e))
Expand All @@ -300,7 +295,7 @@ def command_joint_state_callback(self, msg):
if len(av) == 0:
return
try:
self.arm.angle_vector(av, servo_ids, velocity=1)
self.interface.angle_vector(av, servo_ids, velocity=1)
except RuntimeError as e:
self.unsubscribe()
rospy.signal_shutdown('Disconnected {}.'.format(e))
Expand All @@ -320,7 +315,8 @@ def servo_on_off_callback(self, goal):
servo_vector.append(32768)
self.joint_servo_on[joint_name] = False
try:
self.arm.servo_angle_vector(servo_ids, servo_vector, velocity=1)
self.interface.servo_angle_vector(
servo_ids, servo_vector, velocity=1)
except RuntimeError as e:
self.unsubscribe()
rospy.signal_shutdown('Disconnected {}.'.format(e))
Expand All @@ -330,7 +326,7 @@ def create_imu_message(self):
msg = sensor_msgs.msg.Imu()
msg.header.frame_id = self.imu_frame_id
msg.header.stamp = rospy.Time.now()
q_wxyz, acc, gyro = self.arm.read_imu_data()
q_wxyz, acc, gyro = self.interface.read_imu_data()
(msg.orientation.w, msg.orientation.x,
msg.orientation.y, msg.orientation.z) = q_wxyz
(msg.angular_velocity.x, msg.angular_velocity.y,
Expand All @@ -343,13 +339,13 @@ def run(self):
rate = rospy.Rate(100)

while not rospy.is_shutdown():
if self.arm.is_opened() is False:
if self.interface.is_opened() is False:
self.unsubscribe()
rospy.signal_shutdown('Disconnected.')
break
try:
av = self.arm.angle_vector()
torque_vector = self.arm.servo_error()
av = self.interface.angle_vector()
torque_vector = self.interface.servo_error()
except RuntimeError as e:
self.unsubscribe()
rospy.signal_shutdown('Disconnected {}.'.format(e))
Expand All @@ -359,13 +355,12 @@ def run(self):
for name in self.joint_names:
if name in self.joint_name_to_id:
servo_id = self.joint_name_to_id[name]
if servo_id in self.id_to_index:
msg.position.append(
np.deg2rad(
av[self.id_to_index[servo_id]]))
msg.effort.append(
torque_vector[self.id_to_index[servo_id]])
msg.name.append(name)
idx = self.interface.servo_id_to_index(servo_id)
if idx is None:
continue
msg.position.append(np.deg2rad(av[idx]))
msg.effort.append(torque_vector[idx])
msg.name.append(name)
self.current_joint_states_pub.publish(msg)

if self.publish_imu and self.imu_publisher.get_num_connections():
Expand Down

0 comments on commit f2896d8

Please sign in to comment.