Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support test for RCB4 mini board #32

Merged
merged 3 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 304 additions & 0 deletions rcb4/rcb4interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,67 @@ class SubMoveCmd(Enum):
ComToDevice = 0x12


class RamAddr(Enum):
# System configuration RAM address
ConfigRamAddress = 0x0000
# Program counter configuration address
ProgramCounterRamAddress = 0x0002
# Address of the value of ADC converter 0
AdcRamAddress = 0x0022
# PIO input/output configuration
PioModeAddress = 0x0038
# PIO port value
PioAddress = 0x003A
# Address where KRR button data is recorded
KrrButtonDataAddress = 0x0350
# PA1 data (followed by data in increments of 2 bytes)
KrrPa1Address = 0x0352
# (Data continues in increments of 2 bytes)
CounterRamAddress = 0x0457
# (Data continues in increments of 2 bytes)
UserParameterRamAddress = 0x0462


class RomAddr(Enum):
# Address of the command to play the startup motion
StartupCmdRomAddress = 0x0444
# The first address where the main loop starts running
MainLoopCmd = 0x044B
# The starting address where motion data is stored
MotionRomAddress = 0x0b80


class ConfigData(Enum):
EnableSio = 0x0001 # ICS switch
EnableRunEeprom = 0x0002 # EEPROM program execution switch
# Interpolation operation completion message switch
EnableServoResponse = 0x0004
EnableReferenceTable = 0x0008 # Vector jump switch
Frame = 0x0030 # Output cycle register [4:5]
Baudrate = 0x00c0 # COM baud rate register [6:7] (check for conflicts)
ZeroFlag = 0x0100 # Zero flag
CarrayFlag = 0x0200 # Carry flag
ProgramError = 0x0400 # Program error flag
RFU = 0x1800 # Reserved for future use [11:12]
# ICS switch baud rate register [13:14] (check for conflicts)
IcsBaudrate = 0x6000
GreenLED = 0x8000 # LED register (green)


rcb4_dof = 36 # servo 35 + 1


class RCB4Interface(object):

# The number of data bytes per motion
MotionSingleDataCount = 2048

# The maximum number of motions
MaxMotionCount = 120

# The maximum total byte count of motion data
MotionDataCount = 2048 * 120

def __init__(self):
self.lock = Lock()
self.serial = None
Expand Down Expand Up @@ -92,6 +148,7 @@ def open(self, port='/dev/ttyUSB0',
if ack is not True:
return False
self.check_firmware_version()
self._config_data = self.get_config()
self.search_servo_ids()
return True

Expand Down Expand Up @@ -212,6 +269,34 @@ def move_ram_to_com_command(scr_addr, src_data_size):
return_data_size = src_data_size + 3 # Total bytes of received data
return return_data_size, byte_list

def write_move_ram_to_com_command(self, src_addr, src_data_size):
_, byte_list = self.move_ram_to_com_command(src_addr, src_data_size)
return self.serial_write(byte_list)[1:]

@staticmethod
def move_com_to_ram_command(dest_addr, dest_data):
if len(dest_data) > 249:
return 0, []
txbuf = []
txbuf.append((len(dest_data) + 7) & 0xff)
txbuf.append(CommandTypes.Move.value)
txbuf.append(SubMoveCmd.ComToRam.value)
txbuf.append(dest_addr & 0xff)
txbuf.append((dest_addr >> 8) & 0xff)
txbuf.append(0x00)
if len(dest_data) == 1:
txbuf.append(dest_data[0])
else:
for i in range(len(dest_data)):
txbuf.append(dest_data[i])
txbuf.append(rcb4_checksum(txbuf))
return_data_size = 4 # return ACK
return return_data_size, txbuf

def write_move_com_to_ram_command(self, dest_addr, dest_data):
_, byte_list = self.move_com_to_ram_command(dest_addr, dest_data)
return self.serial_write(byte_list)[1:]

def reference_angle_vector(self, servo_ids=None):
if servo_ids is None:
servo_ids = self.search_servo_ids()
Expand Down Expand Up @@ -533,6 +618,225 @@ def servo_states(self):
return servo_on_ids
return []

def get_config(self):
rxbuf = self.write_move_ram_to_com_command(
RamAddr.ConfigRamAddress.value, 2)
if len(rxbuf) != 2:
return 0xFFFF
else:
return rxbuf[1] * 256 + rxbuf[0]

def call_command(self, rom_addr):
"""Creates a command to call a specified ROM address.

This method generates a command array to be sent to a device,
instructing it to perform a jump to the specified ROM address.
This function is static and can be accessed externally without
an instance of the class.

Parameters
----------
rom_addr : int
The target ROM address to call.

Returns
-------
tuple
return_data_size : int
The size of the data expected to be returned by the command.
txbuf : list of int
The list representing the entire command to be sent,
including the command to call the ROM address, the address
itself, and the checksum.

Notes
-----
The command created by this function does not check conditions before
calling the specified address.

Warnings
--------
The function calls the specified address unconditionally.
"""
buf = [0x07, CommandTypes.Call.value, rom_addr & 0xff,
(rom_addr >> 8) & 0xff, (rom_addr >> 16) & 0xff, 0x00]
buf.append(rcb4_checksum(buf))
return 4, buf

def get_motion_play_number(self):
"""Retrieves the number of the motion currently being played.

Returns
-------
int
- The number of the motion currently being played.
- 0 if no motion is being played.
- -1 in case of communication failure.
- -2 if the playback location is abnormal.

Notes
-----
The current motion number is calculated from the current program
counter and flags. If a jump to a different motion has occurred
within a motion, the jumped motion number is returned.
"""
rx_len, byte_list = self.move_ram_to_com_command(
RamAddr.ProgramCounterRamAddress.value, 10)
byte_list = self.serial_write(byte_list)[1:]

if not rx_len or len(byte_list) != 10:
return -1

eflfg = sum(byte_list[3:8])
pcunt = byte_list[0] + (byte_list[1] << 8) + (byte_list[2] << 16)
mno = int(((pcunt - RomAddr.MotionRomAddress.value)
/ self.MotionSingleDataCount) + 1)
if eflfg == 0 or pcunt < RomAddr.MotionRomAddress.value:
return 0
if not (0 <= mno <= 120):
return -2
return mno

def suspend_motion(self):
"""Suspends the currently playing motion.

Returns
-------
bool
True if the operation was successful, False otherwise.

Notes
-----
Modifies the config data to suspend the motion.
Direct manipulation of internal config data is involved,
hence the initial configuration needs to be read.
"""
self._config_data &= ~ConfigData.EnableRunEeprom.value
self._config_data &= ~ConfigData.EnableServoResponse.value
self._config_data &= ~ConfigData.EnableReferenceTable.value
self._config_data &= ~ConfigData.EnableSio.value
txbuf = [self._config_data & 0xff, (self._config_data >> 8) & 0xff]
return self.write_move_com_to_ram_command(
RamAddr.ConfigRamAddress.value, txbuf)

def motion_number_to_address(self, motion_num):
"""Calculates the ROM address for the given motion number.

Parameters
----------
motion_num : int
The motion number.

Returns
-------
int
The ROM address of the specified motion,
or -1 if the motion address does not exist.

Notes
-----
The address calculation is based on the motion number.
"""
if 0 < motion_num <= self.MaxMotionCount:
return (motion_num - 1) * self.MotionSingleDataCount \
+ RomAddr.MotionRomAddress.value
else:
return -1

def set_motion_number(self, motion_num):
"""Jumps to the address of the specified motion.

Parameters
----------
motion_num : int
The motion number.

Returns
-------
bool
True if the operation was successful, False otherwise.

Notes
-----
Utilizes the Call command to jump to the ROM address of
the specified motion.
"""
if 0 < motion_num <= self.MaxMotionCount:
rx_size, buf = self.call_command(
self.motion_number_to_address(motion_num))
return self.serial_write(buf)
else:
return False

def reset_program_counter(self):
"""Resets the program counter.

Returns
-------
bool
True if the operation was successful, False otherwise.

Notes
-----
This method effectively changes the program counter to its
initial settings after a reset, including resetting various flags.
"""
buf = [RomAddr.MainLoopCmd.value & 0xff,
(RomAddr.MainLoopCmd.value >> 8) & 0xff] + [0] * 8
return self.write_move_com_to_ram_command(
RamAddr.ProgramCounterRamAddress.value, buf)

def resume_motion(self):
"""Resumes the motion that was suspended.

Returns
-------
bool
True if the operation was successful, False otherwise.

Notes
-----
Modifies the config data to resume the motion.
Direct manipulation of internal config data
(`self._config_data`) is involved.
"""
self._config_data |= ConfigData.EnableRunEeprom.value
self._config_data &= ~ConfigData.EnableServoResponse.value
self._config_data |= ConfigData.EnableReferenceTable.value
self._config_data |= ConfigData.EnableSio.value
buf = [self._config_data & 0xff, (self._config_data >> 8) & 0xff]
return self.write_move_com_to_ram_command(
RamAddr.ConfigRamAddress.value, buf)

def play_motion(self, motion_num):
"""Plays the specified motion.

Parameters
----------
motion_num : int
The motion number.

Returns
-------
bool
True if the motion is successfully played, False otherwise.

Notes
-----
The steps to play a motion include suspending the current motion,
resetting the program counter, jumping to the motion address,
and then resuming motion.
"""
if not (0 < motion_num < self.MaxMotionCount):
return False
if not self.suspend_motion():
return False
if not self.reset_program_counter():
return False
if not self.set_motion_number(motion_num):
return False
return self.resume_motion()


if __name__ == '__main__':
interface = RCB4Interface()
Expand Down
2 changes: 2 additions & 0 deletions ros/kxr_controller/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,6 @@ if(CATKIN_ENABLE_TESTING)
endif()
add_rostest(test/kxr_controller.test ARGS namespace:="")
add_rostest(test/kxr_controller.test ARGS namespace:="robot_a")
add_rostest(test/kxr_controller.test ARGS namespace:="" use_rcb4:=true)
add_rostest(test/kxr_controller.test ARGS namespace:="robot_a" use_rcb4:=true)
endif()
3 changes: 3 additions & 0 deletions ros/kxr_controller/launch/kxr_controller.launch
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<arg name="publish_imu" default="true" />
<arg name="imu_frame_id" default="/bodyset94472077639384" />
<arg name="control_loop_rate" default="20" />
<arg name="use_rcb4" default="false" doc="Flag to use RCB4 mini board"/>

<group if="$(eval len(arg('namespace')) > 0)" ns="$(arg namespace)" >
<param name="control_loop_rate" value="$(arg control_loop_rate)" />
Expand All @@ -19,6 +20,7 @@
servo_config_path: $(arg servo_config_path)
publish_imu: $(arg publish_imu)
imu_frame_id: $(arg namespace)/$(arg imu_frame_id)
use_rcb4: $(arg use_rcb4)
</rosparam>
</node>

Expand All @@ -36,6 +38,7 @@
servo_config_path: $(arg servo_config_path)
publish_imu: $(arg publish_imu)
imu_frame_id: $(arg imu_frame_id)
use_rcb4: $(arg use_rcb4)
</rosparam>
</node>

Expand Down
9 changes: 8 additions & 1 deletion ros/kxr_controller/node_scripts/rcb4_ros_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import yaml

from rcb4.armh7interface import ARMH7Interface
from rcb4.rcb4interface import RCB4Interface


np.set_printoptions(precision=0, suppress=True)
Expand Down Expand Up @@ -129,7 +130,11 @@ def __init__(self):
JointState,
queue_size=1)

self.interface = ARMH7Interface.from_port()
if rospy.get_param('~use_rcb4'):
self.interface = RCB4Interface()
self.interface.auto_open()
else:
self.interface = ARMH7Interface.from_port()
if self.interface is None:
rospy.logerr('Could not open port!')
sys.exit(1)
Expand Down Expand Up @@ -185,6 +190,8 @@ def __init__(self):
clean_namespace)

self.publish_imu = rospy.get_param('~publish_imu', True)
if self.interface.__class__.__name__ == 'RCB4Interface':
self.publish_imu = False
if self.publish_imu:
self.imu_frame_id = rospy.get_param(
'~imu_frame_id', clean_namespace + '/' + r.root_link.name)
Expand Down
Loading
Loading