Skip to content

Commit

Permalink
Merge pull request #32 from iory/rcb4-mini
Browse files Browse the repository at this point in the history
Support test for RCB4 mini board
  • Loading branch information
iory authored Feb 8, 2024
2 parents 0c0a363 + c8daf4c commit 3fd83d3
Show file tree
Hide file tree
Showing 8 changed files with 460 additions and 23 deletions.
304 changes: 304 additions & 0 deletions rcb4/rcb4interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,67 @@ class SubMoveCmd(Enum):
ComToDevice = 0x12


class RamAddr(Enum):
# System configuration RAM address
ConfigRamAddress = 0x0000
# Program counter configuration address
ProgramCounterRamAddress = 0x0002
# Address of the value of ADC converter 0
AdcRamAddress = 0x0022
# PIO input/output configuration
PioModeAddress = 0x0038
# PIO port value
PioAddress = 0x003A
# Address where KRR button data is recorded
KrrButtonDataAddress = 0x0350
# PA1 data (followed by data in increments of 2 bytes)
KrrPa1Address = 0x0352
# (Data continues in increments of 2 bytes)
CounterRamAddress = 0x0457
# (Data continues in increments of 2 bytes)
UserParameterRamAddress = 0x0462


class RomAddr(Enum):
# Address of the command to play the startup motion
StartupCmdRomAddress = 0x0444
# The first address where the main loop starts running
MainLoopCmd = 0x044B
# The starting address where motion data is stored
MotionRomAddress = 0x0b80


class ConfigData(Enum):
EnableSio = 0x0001 # ICS switch
EnableRunEeprom = 0x0002 # EEPROM program execution switch
# Interpolation operation completion message switch
EnableServoResponse = 0x0004
EnableReferenceTable = 0x0008 # Vector jump switch
Frame = 0x0030 # Output cycle register [4:5]
Baudrate = 0x00c0 # COM baud rate register [6:7] (check for conflicts)
ZeroFlag = 0x0100 # Zero flag
CarrayFlag = 0x0200 # Carry flag
ProgramError = 0x0400 # Program error flag
RFU = 0x1800 # Reserved for future use [11:12]
# ICS switch baud rate register [13:14] (check for conflicts)
IcsBaudrate = 0x6000
GreenLED = 0x8000 # LED register (green)


rcb4_dof = 36 # servo 35 + 1


class RCB4Interface(object):

# The number of data bytes per motion
MotionSingleDataCount = 2048

# The maximum number of motions
MaxMotionCount = 120

# The maximum total byte count of motion data
MotionDataCount = 2048 * 120

def __init__(self):
self.lock = Lock()
self.serial = None
Expand Down Expand Up @@ -92,6 +148,7 @@ def open(self, port='/dev/ttyUSB0',
if ack is not True:
return False
self.check_firmware_version()
self._config_data = self.get_config()
self.search_servo_ids()
return True

Expand Down Expand Up @@ -212,6 +269,34 @@ def move_ram_to_com_command(scr_addr, src_data_size):
return_data_size = src_data_size + 3 # Total bytes of received data
return return_data_size, byte_list

def write_move_ram_to_com_command(self, src_addr, src_data_size):
_, byte_list = self.move_ram_to_com_command(src_addr, src_data_size)
return self.serial_write(byte_list)[1:]

@staticmethod
def move_com_to_ram_command(dest_addr, dest_data):
if len(dest_data) > 249:
return 0, []
txbuf = []
txbuf.append((len(dest_data) + 7) & 0xff)
txbuf.append(CommandTypes.Move.value)
txbuf.append(SubMoveCmd.ComToRam.value)
txbuf.append(dest_addr & 0xff)
txbuf.append((dest_addr >> 8) & 0xff)
txbuf.append(0x00)
if len(dest_data) == 1:
txbuf.append(dest_data[0])
else:
for i in range(len(dest_data)):
txbuf.append(dest_data[i])
txbuf.append(rcb4_checksum(txbuf))
return_data_size = 4 # return ACK
return return_data_size, txbuf

def write_move_com_to_ram_command(self, dest_addr, dest_data):
_, byte_list = self.move_com_to_ram_command(dest_addr, dest_data)
return self.serial_write(byte_list)[1:]

def reference_angle_vector(self, servo_ids=None):
if servo_ids is None:
servo_ids = self.search_servo_ids()
Expand Down Expand Up @@ -533,6 +618,225 @@ def servo_states(self):
return servo_on_ids
return []

def get_config(self):
rxbuf = self.write_move_ram_to_com_command(
RamAddr.ConfigRamAddress.value, 2)
if len(rxbuf) != 2:
return 0xFFFF
else:
return rxbuf[1] * 256 + rxbuf[0]

def call_command(self, rom_addr):
"""Creates a command to call a specified ROM address.
This method generates a command array to be sent to a device,
instructing it to perform a jump to the specified ROM address.
This function is static and can be accessed externally without
an instance of the class.
Parameters
----------
rom_addr : int
The target ROM address to call.
Returns
-------
tuple
return_data_size : int
The size of the data expected to be returned by the command.
txbuf : list of int
The list representing the entire command to be sent,
including the command to call the ROM address, the address
itself, and the checksum.
Notes
-----
The command created by this function does not check conditions before
calling the specified address.
Warnings
--------
The function calls the specified address unconditionally.
"""
buf = [0x07, CommandTypes.Call.value, rom_addr & 0xff,
(rom_addr >> 8) & 0xff, (rom_addr >> 16) & 0xff, 0x00]
buf.append(rcb4_checksum(buf))
return 4, buf

def get_motion_play_number(self):
"""Retrieves the number of the motion currently being played.
Returns
-------
int
- The number of the motion currently being played.
- 0 if no motion is being played.
- -1 in case of communication failure.
- -2 if the playback location is abnormal.
Notes
-----
The current motion number is calculated from the current program
counter and flags. If a jump to a different motion has occurred
within a motion, the jumped motion number is returned.
"""
rx_len, byte_list = self.move_ram_to_com_command(
RamAddr.ProgramCounterRamAddress.value, 10)
byte_list = self.serial_write(byte_list)[1:]

if not rx_len or len(byte_list) != 10:
return -1

eflfg = sum(byte_list[3:8])
pcunt = byte_list[0] + (byte_list[1] << 8) + (byte_list[2] << 16)
mno = int(((pcunt - RomAddr.MotionRomAddress.value)
/ self.MotionSingleDataCount) + 1)
if eflfg == 0 or pcunt < RomAddr.MotionRomAddress.value:
return 0
if not (0 <= mno <= 120):
return -2
return mno

def suspend_motion(self):
"""Suspends the currently playing motion.
Returns
-------
bool
True if the operation was successful, False otherwise.
Notes
-----
Modifies the config data to suspend the motion.
Direct manipulation of internal config data is involved,
hence the initial configuration needs to be read.
"""
self._config_data &= ~ConfigData.EnableRunEeprom.value
self._config_data &= ~ConfigData.EnableServoResponse.value
self._config_data &= ~ConfigData.EnableReferenceTable.value
self._config_data &= ~ConfigData.EnableSio.value
txbuf = [self._config_data & 0xff, (self._config_data >> 8) & 0xff]
return self.write_move_com_to_ram_command(
RamAddr.ConfigRamAddress.value, txbuf)

def motion_number_to_address(self, motion_num):
"""Calculates the ROM address for the given motion number.
Parameters
----------
motion_num : int
The motion number.
Returns
-------
int
The ROM address of the specified motion,
or -1 if the motion address does not exist.
Notes
-----
The address calculation is based on the motion number.
"""
if 0 < motion_num <= self.MaxMotionCount:
return (motion_num - 1) * self.MotionSingleDataCount \
+ RomAddr.MotionRomAddress.value
else:
return -1

def set_motion_number(self, motion_num):
"""Jumps to the address of the specified motion.
Parameters
----------
motion_num : int
The motion number.
Returns
-------
bool
True if the operation was successful, False otherwise.
Notes
-----
Utilizes the Call command to jump to the ROM address of
the specified motion.
"""
if 0 < motion_num <= self.MaxMotionCount:
rx_size, buf = self.call_command(
self.motion_number_to_address(motion_num))
return self.serial_write(buf)
else:
return False

def reset_program_counter(self):
"""Resets the program counter.
Returns
-------
bool
True if the operation was successful, False otherwise.
Notes
-----
This method effectively changes the program counter to its
initial settings after a reset, including resetting various flags.
"""
buf = [RomAddr.MainLoopCmd.value & 0xff,
(RomAddr.MainLoopCmd.value >> 8) & 0xff] + [0] * 8
return self.write_move_com_to_ram_command(
RamAddr.ProgramCounterRamAddress.value, buf)

def resume_motion(self):
"""Resumes the motion that was suspended.
Returns
-------
bool
True if the operation was successful, False otherwise.
Notes
-----
Modifies the config data to resume the motion.
Direct manipulation of internal config data
(`self._config_data`) is involved.
"""
self._config_data |= ConfigData.EnableRunEeprom.value
self._config_data &= ~ConfigData.EnableServoResponse.value
self._config_data |= ConfigData.EnableReferenceTable.value
self._config_data |= ConfigData.EnableSio.value
buf = [self._config_data & 0xff, (self._config_data >> 8) & 0xff]
return self.write_move_com_to_ram_command(
RamAddr.ConfigRamAddress.value, buf)

def play_motion(self, motion_num):
"""Plays the specified motion.
Parameters
----------
motion_num : int
The motion number.
Returns
-------
bool
True if the motion is successfully played, False otherwise.
Notes
-----
The steps to play a motion include suspending the current motion,
resetting the program counter, jumping to the motion address,
and then resuming motion.
"""
if not (0 < motion_num < self.MaxMotionCount):
return False
if not self.suspend_motion():
return False
if not self.reset_program_counter():
return False
if not self.set_motion_number(motion_num):
return False
return self.resume_motion()


if __name__ == '__main__':
interface = RCB4Interface()
Expand Down
2 changes: 2 additions & 0 deletions ros/kxr_controller/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,6 @@ if(CATKIN_ENABLE_TESTING)
endif()
add_rostest(test/kxr_controller.test ARGS namespace:="")
add_rostest(test/kxr_controller.test ARGS namespace:="robot_a")
add_rostest(test/kxr_controller.test ARGS namespace:="" use_rcb4:=true)
add_rostest(test/kxr_controller.test ARGS namespace:="robot_a" use_rcb4:=true)
endif()
3 changes: 3 additions & 0 deletions ros/kxr_controller/launch/kxr_controller.launch
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<arg name="publish_imu" default="true" />
<arg name="imu_frame_id" default="/bodyset94472077639384" />
<arg name="control_loop_rate" default="20" />
<arg name="use_rcb4" default="false" doc="Flag to use RCB4 mini board"/>

<group if="$(eval len(arg('namespace')) > 0)" ns="$(arg namespace)" >
<param name="control_loop_rate" value="$(arg control_loop_rate)" />
Expand All @@ -19,6 +20,7 @@
servo_config_path: $(arg servo_config_path)
publish_imu: $(arg publish_imu)
imu_frame_id: $(arg namespace)/$(arg imu_frame_id)
use_rcb4: $(arg use_rcb4)
</rosparam>
</node>

Expand All @@ -36,6 +38,7 @@
servo_config_path: $(arg servo_config_path)
publish_imu: $(arg publish_imu)
imu_frame_id: $(arg imu_frame_id)
use_rcb4: $(arg use_rcb4)
</rosparam>
</node>

Expand Down
9 changes: 8 additions & 1 deletion ros/kxr_controller/node_scripts/rcb4_ros_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import yaml

from rcb4.armh7interface import ARMH7Interface
from rcb4.rcb4interface import RCB4Interface


np.set_printoptions(precision=0, suppress=True)
Expand Down Expand Up @@ -129,7 +130,11 @@ def __init__(self):
JointState,
queue_size=1)

self.interface = ARMH7Interface.from_port()
if rospy.get_param('~use_rcb4'):
self.interface = RCB4Interface()
self.interface.auto_open()
else:
self.interface = ARMH7Interface.from_port()
if self.interface is None:
rospy.logerr('Could not open port!')
sys.exit(1)
Expand Down Expand Up @@ -185,6 +190,8 @@ def __init__(self):
clean_namespace)

self.publish_imu = rospy.get_param('~publish_imu', True)
if self.interface.__class__.__name__ == 'RCB4Interface':
self.publish_imu = False
if self.publish_imu:
self.imu_frame_id = rospy.get_param(
'~imu_frame_id', clean_namespace + '/' + r.root_link.name)
Expand Down
Loading

0 comments on commit 3fd83d3

Please sign in to comment.