From 02839f60ad7d187c00356145698d8aa2b07053d3 Mon Sep 17 00:00:00 2001 From: Iori Yanokura Date: Wed, 7 Feb 2024 23:46:51 +0900 Subject: [PATCH 1/3] Enable motion play --- rcb4/rcb4interface.py | 301 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 301 insertions(+) diff --git a/rcb4/rcb4interface.py b/rcb4/rcb4interface.py index 74a8a07d..21cd7e0b 100644 --- a/rcb4/rcb4interface.py +++ b/rcb4/rcb4interface.py @@ -41,11 +41,68 @@ class SubMoveCmd(Enum): ComToDevice = 0x12 +class RamAddr(Enum): + # System configuration RAM address + ConfigRamAddress = 0x0000 + # Program counter configuration address + ProgramCounterRamAddress = 0x0002 + # Address of the value of ADC converter 0 + AdcRamAddress = 0x0022 + # PIO input/output configuration + PioModeAddress = 0x0038 + # PIO port value + PioAddress = 0x003A + # Address where KRR button data is recorded + KrrButtonDataAddress = 0x0350 + # PA1 data (followed by data in increments of 2 bytes) + KrrPa1Address = 0x0352 + # (Data continues in increments of 2 bytes) + CounterRamAddress = 0x0457 + # (Data continues in increments of 2 bytes) + UserParameterRamAddress = 0x0462 + + +class RomAddr(Enum): + # Address of the command to play the startup motion + StartupCmdRomAddress = 0x0444 + # The first address where the main loop starts running + MainLoopCmd = 0x044B + # The starting address where motion data is stored + MotionRomAddress = 0x0b80 + + +class ConfigData(Enum): + EnableSio = 0x0001 # ICS switch + EnableRunEeprom = 0x0002 # EEPROM program execution switch + # Interpolation operation completion message switch + EnableServoResponse = 0x0004 + EnableReferenceTable = 0x0008 # Vector jump switch + Frame = 0x0030 # Output cycle register [4:5] + Baudrate = 0x00c0 # COM baud rate register [6:7] (check for conflicts) + ZeroFlag = 0x0100 # Zero flag + CarrayFlag = 0x0200 # Carry flag + ProgramError = 0x0400 # Program error flag + RFU = 0x1800 # Reserved for future use [11:12] + # ICS switch baud rate register [13:14] (check for conflicts) + IcsBaudrate = 0x6000 + GreenLED = 0x8000 # LED register (green) + + rcb4_dof = 36 # servo 35 + 1 class RCB4Interface(object): + # The number of data bytes per motion + MotionSingleDataCount = 2048 + + # The maximum number of motions + MaxMotionCount = 120 + + # The maximum total byte count of motion data + MotionDataCount = 2048 * 120 + + def __init__(self): self.lock = Lock() self.serial = None @@ -92,6 +149,7 @@ def open(self, port='/dev/ttyUSB0', if ack is not True: return False self.check_firmware_version() + self._config_data = self.get_config() self.search_servo_ids() return True @@ -212,6 +270,34 @@ def move_ram_to_com_command(scr_addr, src_data_size): return_data_size = src_data_size + 3 # Total bytes of received data return return_data_size, byte_list + def write_move_ram_to_com_command(self, src_addr, src_data_size): + _, byte_list = self.move_ram_to_com_command(src_addr, src_data_size) + return self.serial_write(byte_list)[1:] + + @staticmethod + def move_com_to_ram_command(dest_addr, dest_data): + if len(dest_data) > 249: + return 0, [] + txbuf = [] + txbuf.append((len(dest_data) + 7) & 0xff) + txbuf.append(CommandTypes.Move.value) + txbuf.append(SubMoveCmd.ComToRam.value) + txbuf.append(dest_addr & 0xff) + txbuf.append((dest_addr >> 8 ) & 0xff) + txbuf.append(0x00) + if len(dest_data) == 1: + txbuf.append(dest_data[0]) + else: + for i in range(len(dest_data)): + txbuf.append(dest_data[i]) + txbuf.append(rcb4_checksum(txbuf)) + return_data_size = 4 # return ACK + return return_data_size, txbuf + + def write_move_com_to_ram_command(self, dest_addr, dest_data): + _, byte_list = self.move_com_to_ram_command(dest_addr, dest_data) + return self.serial_write(byte_list)[1:] + def reference_angle_vector(self, servo_ids=None): if servo_ids is None: servo_ids = self.search_servo_ids() @@ -533,6 +619,221 @@ def servo_states(self): return servo_on_ids return [] + def get_config(self): + rxbuf = self.write_move_ram_to_com_command( + RamAddr.ConfigRamAddress.value, 2) + if len(rxbuf) != 2: + return 0xFFFF + else: + return rxbuf[1] * 256 + rxbuf[0] + + def call_command(self, rom_addr): + """Creates a command to call a specified ROM address. + + This method generates a command array to be sent to a device, instructing it + to perform a jump to the specified ROM address. This function is static and + can be accessed externally without an instance of the class. + + Parameters + ---------- + rom_addr : int + The target ROM address to call. + + Returns + ------- + tuple + return_data_size : int + The size of the data expected to be returned by the command. + txbuf : list of int + The list representing the entire command to be sent, including the + command to call the ROM address, the address itself, and the checksum. + + Notes + ----- + The command created by this function does not check conditions before calling + the specified address. + + Warnings + -------- + The function calls the specified address unconditionally. + """ + buf = [0x07, CommandTypes.Call.value, rom_addr & 0xff, + (rom_addr >> 8) & 0xff, (rom_addr >> 16) & 0xff, 0x00] + buf.append(rcb4_checksum(buf)) + return 4, buf + + def get_motion_play_number(self): + """Retrieves the number of the motion currently being played. + + Returns + ------- + int + - The number of the motion currently being played. + - 0 if no motion is being played. + - -1 in case of communication failure. + - -2 if the playback location is abnormal. + + Notes + ----- + The current motion number is calculated from the current program + counter and flags. If a jump to a different motion has occurred + within a motion, the jumped motion number is returned. + """ + rx_len, byte_list = self.move_ram_to_com_command( + RamAddr.ProgramCounterRamAddress.value, 10) + byte_list = self.serial_write(byte_list)[1:] + + if not rx_len or len(byte_list) != 10: + return -1 + + eflfg = sum(byte_list[3:8]) + pcunt = byte_list[0] + (byte_list[1] << 8) + (byte_list[2] << 16) + mno = int(((pcunt - RomAddr.MotionRomAddress.value) + / self.MotionSingleDataCount) + 1) + if eflfg == 0 or pcunt < RomAddr.MotionRomAddress.value: + return 0 + if not (0 <= mno <= 120): + return -2 + return mno + + def suspend_motion(self): + """Suspends the currently playing motion. + + Returns + ------- + bool + True if the operation was successful, False otherwise. + + Notes + ----- + Modifies the config data to suspend the motion. + Direct manipulation of internal config data is involved, + hence the initial configuration needs to be read. + """ + self._config_data &= ~ConfigData.EnableRunEeprom.value + self._config_data &= ~ConfigData.EnableServoResponse.value + self._config_data &= ~ConfigData.EnableReferenceTable.value + self._config_data &= ~ConfigData.EnableSio.value + txbuf = [self._config_data & 0xff, (self._config_data >> 8) & 0xff] + return self.write_move_com_to_ram_command( + RamAddr.ConfigRamAddress.value, txbuf) + + def motion_number_to_address(self, motion_num): + """Calculates the ROM address for the given motion number. + + Parameters + ---------- + motion_num : int + The motion number. + + Returns + ------- + int + The ROM address of the specified motion, + or -1 if the motion address does not exist. + + Notes + ----- + The address calculation is based on the motion number. + """ + if 0 < motion_num <= self.MaxMotionCount: + return (motion_num - 1) * self.MotionSingleDataCount \ + + RomAddr.MotionRomAddress.value + else: + return -1 + + def set_motion_number(self, motion_num): + """Jumps to the address of the specified motion. + + Parameters + ---------- + motion_num : int + The motion number. + + Returns + ------- + bool + True if the operation was successful, False otherwise. + + Notes + ----- + Utilizes the Call command to jump to the ROM address of + the specified motion. + """ + if 0 < motion_num <= self.MaxMotionCount: + rx_size, buf = self.call_command( + self.motion_number_to_address(motion_num)) + return self.serial_write(buf) + else: + return False + + def reset_program_counter(self): + """Resets the program counter. + + Returns + ------- + bool + True if the operation was successful, False otherwise. + + Notes + ----- + This method effectively changes the program counter to its + initial settings after a reset, including resetting various flags. + """ + buf = [RomAddr.MainLoopCmd.value & 0xff, + (RomAddr.MainLoopCmd.value >> 8) & 0xff] + [0] * 8 + return self.write_move_com_to_ram_command( + RamAddr.ProgramCounterRamAddress.value, buf) + + def resume_motion(self): + """Resumes the motion that was suspended. + + Returns + ------- + bool + True if the operation was successful, False otherwise. + + Notes + ----- + Modifies the config data to resume the motion. + Direct manipulation of internal config data (`self._config_data`) is involved. + """ + self._config_data |= ConfigData.EnableRunEeprom.value + self._config_data &= ~ConfigData.EnableServoResponse.value + self._config_data |= ConfigData.EnableReferenceTable.value + self._config_data |= ConfigData.EnableSio.value + buf = [self._config_data & 0xff, (self._config_data >> 8) & 0xff] + return self.write_move_com_to_ram_command(RamAddr.ConfigRamAddress.value, buf) + + def play_motion(self, motion_num): + """Plays the specified motion. + + Parameters + ---------- + motion_num : int + The motion number. + + Returns + ------- + bool + True if the motion is successfully played, False otherwise. + + Notes + ----- + The steps to play a motion include suspending the current motion, + resetting the program counter, jumping to the motion address, + and then resuming motion. + """ + if not (0 < motion_num < self.MaxMotionCount): + return False + if not self.suspend_motion(): + return False + if not self.reset_program_counter(): + return False + if not self.set_motion_number(motion_num): + return False + return self.resume_motion() + if __name__ == '__main__': interface = RCB4Interface() From 92910e228ca5e13d4a920265dbcd317db08144fb Mon Sep 17 00:00:00 2001 From: Iori Yanokura Date: Thu, 8 Feb 2024 16:18:48 +0900 Subject: [PATCH 2/3] Loose test condition --- ros/kxr_controller/test/test_kxr_controller.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ros/kxr_controller/test/test_kxr_controller.py b/ros/kxr_controller/test/test_kxr_controller.py index d2d6be5a..b940fd64 100644 --- a/ros/kxr_controller/test/test_kxr_controller.py +++ b/ros/kxr_controller/test/test_kxr_controller.py @@ -39,9 +39,13 @@ def test_follow_joint_trajectory(self): self.ri.wait_interpolation() rospy.sleep(1.0) current_angles = self.ri.angle_vector() - testing.assert_array_almost_equal( - np.rad2deg(current_angles), [45, 45], - decimal=0) + expected_angles = 45 * np.ones(len(current_angles)) + self.assertTrue( + np.all(np.abs(np.rad2deg(current_angles) + - expected_angles) <= 2.0), + f"The difference in angles exceeds 2 degrees: " + f"Current angles = {current_angles}, " + f"Expected angles = {expected_angles}") self.ri.angle_vector(self.robot_model.init_pose(), 1.0) self.ri.wait_interpolation() self.ri.servo_off() From c8daf4c22f06f967392e2f709ab28294b8e70c55 Mon Sep 17 00:00:00 2001 From: Iori Yanokura Date: Thu, 8 Feb 2024 16:19:06 +0900 Subject: [PATCH 3/3] Add rcb4 mini board test --- rcb4/rcb4interface.py | 25 ++--- ros/kxr_controller/CMakeLists.txt | 2 + .../launch/kxr_controller.launch | 3 + .../node_scripts/rcb4_ros_bridge.py | 9 +- ros/kxr_controller/test/kxr_controller.test | 55 +++++++---- .../test/test_kxr_controller.py | 1 - tests/rcb4_tests/test_armh7interface.py | 7 +- tests/rcb4_tests/test_rcb4interface.py | 92 +++++++++++++++++++ 8 files changed, 163 insertions(+), 31 deletions(-) create mode 100644 tests/rcb4_tests/test_rcb4interface.py diff --git a/rcb4/rcb4interface.py b/rcb4/rcb4interface.py index 21cd7e0b..3bbe6793 100644 --- a/rcb4/rcb4interface.py +++ b/rcb4/rcb4interface.py @@ -102,7 +102,6 @@ class RCB4Interface(object): # The maximum total byte count of motion data MotionDataCount = 2048 * 120 - def __init__(self): self.lock = Lock() self.serial = None @@ -283,7 +282,7 @@ def move_com_to_ram_command(dest_addr, dest_data): txbuf.append(CommandTypes.Move.value) txbuf.append(SubMoveCmd.ComToRam.value) txbuf.append(dest_addr & 0xff) - txbuf.append((dest_addr >> 8 ) & 0xff) + txbuf.append((dest_addr >> 8) & 0xff) txbuf.append(0x00) if len(dest_data) == 1: txbuf.append(dest_data[0]) @@ -630,9 +629,10 @@ def get_config(self): def call_command(self, rom_addr): """Creates a command to call a specified ROM address. - This method generates a command array to be sent to a device, instructing it - to perform a jump to the specified ROM address. This function is static and - can be accessed externally without an instance of the class. + This method generates a command array to be sent to a device, + instructing it to perform a jump to the specified ROM address. + This function is static and can be accessed externally without + an instance of the class. Parameters ---------- @@ -645,13 +645,14 @@ def call_command(self, rom_addr): return_data_size : int The size of the data expected to be returned by the command. txbuf : list of int - The list representing the entire command to be sent, including the - command to call the ROM address, the address itself, and the checksum. + The list representing the entire command to be sent, + including the command to call the ROM address, the address + itself, and the checksum. Notes ----- - The command created by this function does not check conditions before calling - the specified address. + The command created by this function does not check conditions before + calling the specified address. Warnings -------- @@ -796,14 +797,16 @@ def resume_motion(self): Notes ----- Modifies the config data to resume the motion. - Direct manipulation of internal config data (`self._config_data`) is involved. + Direct manipulation of internal config data + (`self._config_data`) is involved. """ self._config_data |= ConfigData.EnableRunEeprom.value self._config_data &= ~ConfigData.EnableServoResponse.value self._config_data |= ConfigData.EnableReferenceTable.value self._config_data |= ConfigData.EnableSio.value buf = [self._config_data & 0xff, (self._config_data >> 8) & 0xff] - return self.write_move_com_to_ram_command(RamAddr.ConfigRamAddress.value, buf) + return self.write_move_com_to_ram_command( + RamAddr.ConfigRamAddress.value, buf) def play_motion(self, motion_num): """Plays the specified motion. diff --git a/ros/kxr_controller/CMakeLists.txt b/ros/kxr_controller/CMakeLists.txt index 778f6ef6..eb981805 100644 --- a/ros/kxr_controller/CMakeLists.txt +++ b/ros/kxr_controller/CMakeLists.txt @@ -114,4 +114,6 @@ if(CATKIN_ENABLE_TESTING) endif() add_rostest(test/kxr_controller.test ARGS namespace:="") add_rostest(test/kxr_controller.test ARGS namespace:="robot_a") + add_rostest(test/kxr_controller.test ARGS namespace:="" use_rcb4:=true) + add_rostest(test/kxr_controller.test ARGS namespace:="robot_a" use_rcb4:=true) endif() diff --git a/ros/kxr_controller/launch/kxr_controller.launch b/ros/kxr_controller/launch/kxr_controller.launch index 0aa8d751..4ad02281 100644 --- a/ros/kxr_controller/launch/kxr_controller.launch +++ b/ros/kxr_controller/launch/kxr_controller.launch @@ -6,6 +6,7 @@ + @@ -19,6 +20,7 @@ servo_config_path: $(arg servo_config_path) publish_imu: $(arg publish_imu) imu_frame_id: $(arg namespace)/$(arg imu_frame_id) + use_rcb4: $(arg use_rcb4) @@ -36,6 +38,7 @@ servo_config_path: $(arg servo_config_path) publish_imu: $(arg publish_imu) imu_frame_id: $(arg imu_frame_id) + use_rcb4: $(arg use_rcb4) diff --git a/ros/kxr_controller/node_scripts/rcb4_ros_bridge.py b/ros/kxr_controller/node_scripts/rcb4_ros_bridge.py index d831d8f9..08225ba1 100644 --- a/ros/kxr_controller/node_scripts/rcb4_ros_bridge.py +++ b/ros/kxr_controller/node_scripts/rcb4_ros_bridge.py @@ -18,6 +18,7 @@ import yaml from rcb4.armh7interface import ARMH7Interface +from rcb4.rcb4interface import RCB4Interface np.set_printoptions(precision=0, suppress=True) @@ -129,7 +130,11 @@ def __init__(self): JointState, queue_size=1) - self.interface = ARMH7Interface.from_port() + if rospy.get_param('~use_rcb4'): + self.interface = RCB4Interface() + self.interface.auto_open() + else: + self.interface = ARMH7Interface.from_port() if self.interface is None: rospy.logerr('Could not open port!') sys.exit(1) @@ -185,6 +190,8 @@ def __init__(self): clean_namespace) self.publish_imu = rospy.get_param('~publish_imu', True) + if self.interface.__class__.__name__ == 'RCB4Interface': + self.publish_imu = False if self.publish_imu: self.imu_frame_id = rospy.get_param( '~imu_frame_id', clean_namespace + '/' + r.root_link.name) diff --git a/ros/kxr_controller/test/kxr_controller.test b/ros/kxr_controller/test/kxr_controller.test index 29209365..88971b74 100644 --- a/ros/kxr_controller/test/kxr_controller.test +++ b/ros/kxr_controller/test/kxr_controller.test @@ -1,11 +1,13 @@ + + @@ -13,23 +15,34 @@ name="test_ros_topics" pkg="jsk_tools" type="test_topic_published.py" retry="1" > - - topic_0: current_joint_states - timeout_0: 30 - topic_1: joint_states - timeout_1: 30 - topic_2: kxr_fullbody_controller/follow_joint_trajectory/status - timeout_2: 30 - topic_3: kxr_fullbody_controller/servo_on_off_real_interface/status - timeout_3: 30 - topic_4: kxr_fullbody_controller/servo_on_off/status - timeout_4: 30 - topic_5: imu - timeout_5: 30 - + + topic_0: current_joint_states + timeout_0: 30 + topic_1: joint_states + timeout_1: 30 + topic_2: kxr_fullbody_controller/follow_joint_trajectory/status + timeout_2: 30 + topic_3: kxr_fullbody_controller/servo_on_off_real_interface/status + timeout_3: 30 + topic_4: kxr_fullbody_controller/servo_on_off/status + timeout_4: 30 + + + + + + topic_0: imu + timeout_0: 30 + + + + + + + + + topic_0: imu + timeout_0: 30 + + + 2.0): + raise AssertionError( + "Servo angles are beyond the acceptable range.") + + time.sleep(1.0) + self.interface.neutral() + time.sleep(4.0) + self.interface.free()