diff --git a/README.md b/README.md index 01343f1c..88999daf 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,14 @@ Connect st-link and run the following command. rcb4-write-firmware ``` +## Change Servo ID + +You can use ics-manager command to change servo id. + +```bash +ics-manager +``` + ## Contributing ### Automatic Formatting diff --git a/rcb4/apps/ics_manager.py b/rcb4/apps/ics_manager.py new file mode 100644 index 00000000..b1c6709c --- /dev/null +++ b/rcb4/apps/ics_manager.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python + +import argparse +import sys + +import yaml + +from rcb4.ics import ICSServoController + + +def load_yaml(yaml_path): + try: + with open(yaml_path) as file: + data = yaml.safe_load(file) + print("YAML configuration loaded successfully.") + return data + except FileNotFoundError: + print(f"Error: YAML file not found at path: {yaml_path}") + sys.exit(1) + except yaml.YAMLError as exc: + print(f"Error parsing YAML file: {exc}") + sys.exit(1) + + +def parse_args(): + """Parse command-line arguments.""" + parser = argparse.ArgumentParser(description="ICS Servo Controller CLI Tool") + parser.add_argument( + "--yaml-path", + default=None, + help="Path to YAML configuration file for servo settings" + ) + parser.add_argument( + "--baudrate", + type=int, + default=1250000, + choices=[1250000, 625000, 115200], + help="Baud rate for servo connection" + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Enable verbose output" + ) + return parser.parse_args() + + +def main(): + args = parse_args() + + # Verbose output if enabled + if args.verbose: + print(f"Using baud rate: {args.baudrate}") + if args.yaml_path: + print(f"Loading configuration from: {args.yaml_path}") + else: + print("No YAML configuration file specified.") + + if args.yaml_path: + load_yaml(args.yaml_path) + + # Initialize the ICS Servo Controller + servo_controller = ICSServoController( + baudrate=args.baudrate, + yaml_path=args.yaml_path + ) + + try: + servo_controller.display_status() + except Exception as e: + print(f"An error occurred while displaying status: {e}") + finally: + servo_controller.close_connection() + print("Connection closed.") + + +if __name__ == "__main__": + main() diff --git a/rcb4/armh7interface.py b/rcb4/armh7interface.py index f8dee192..2fc148f7 100644 --- a/rcb4/armh7interface.py +++ b/rcb4/armh7interface.py @@ -177,9 +177,12 @@ def open(self, port="/dev/ttyUSB1", baudrate=1000000, timeout=0.01): except serial.SerialException as e: print(f"Error opening serial port: {e}") raise serial.SerialException(e) - ack = self.check_ack() - if ack is not True: - return False + # After powering on, the ACK value becomes unstable for some reason, + # so the process is repeated several times. + for _ in range(10): + ack = self.check_ack() + if ack is True: + break self.check_firmware_version() self.copy_worm_params_from_flash() self.search_worm_ids() diff --git a/rcb4/ics.py b/rcb4/ics.py new file mode 100644 index 00000000..1a1b2901 --- /dev/null +++ b/rcb4/ics.py @@ -0,0 +1,653 @@ +import sys +import threading +import time + +from colorama import Fore +from colorama import Style +import readchar +import serial +import serial.tools.list_ports +import yaml + +degree_to_pulse = 29.633 + + +class KeyListener(threading.Thread): + def __init__(self): + super().__init__() + self.key = None + self.lock = threading.Lock() + self.running = True # Add a running flag + + def run(self): + """Continuously read keys and store the latest key in self.key while running is True.""" + while self.running: + try: + key = readchar.readkey() + if key == 'q': + with self.lock: + self.key = key + break + except KeyboardInterrupt: + self.stop() + break + with self.lock: + self.key = key + + def get_key(self): + """Return the latest key and reset it.""" + with self.lock: + key = self.key + self.key = None + return key + + def stop(self): + """Stop the listener thread.""" + self.running = False + + +def load_and_process_yaml(yaml_path): + with open(yaml_path) as f: + joint_data = yaml.safe_load(f) + servo_candidates = [] + servo_id_to_rotation = {} + + for _joint_name, properties in joint_data.get("joint_name_to_servo_id", {}).items(): + candidate_servo_id = properties["id"] // 2 + rotation = False + if properties.get("type") == "continuous": + rotation = True + servo_id_to_rotation[candidate_servo_id] = rotation + servo_candidates.append(candidate_servo_id) + servo_candidates = sorted(set(servo_candidates)) + return servo_candidates, servo_id_to_rotation + + +def format_baud(baud): + """Convert baud rate to a more readable format.""" + if baud >= 1_000_000: + return f"{baud / 1_000_000:.2f}M" + elif baud >= 1_000: + return f"{baud / 1_000:.0f}k" + else: + return str(baud) + + +class ICSServoController: + def __init__(self, baudrate=1250000, yaml_path=None): + if baudrate not in [1250000, 625000, 115200]: + print(f";; baud={baudrate} is wrong.") + print(";; baud should be one of 1250000, 625000, 115200") + print("Use default baudrate 1250000") + baudrate = 1250000 + if yaml_path is not None: + self.servo_candidates, self.servo_id_to_rotation = load_and_process_yaml( + yaml_path + ) + else: + self.servo_id_to_rotation = None + self.servo_candidates = list(range(18)) + self.servo_id_index = 0 + self.servo_id = 0 + self.selected_index = 0 + self.baudrate = baudrate + self.timeout = 0.1 + self.ics = None + self.servo_eeprom_params64 = [ + ("fix-header", [1, 2]), + ("stretch-gain", [3, 4]), + ("speed", [5, 6]), + ("punch", [7, 8]), + ("dead-band", [9, 10]), + ("dumping", [11, 12]), + ("safe-timer", [13, 14]), + ("mode-flag-b7slave-b4rotation-b3pwm-b1free-b0reverse", [15, 16]), + ("pulse-max-limit", [17, 18, 19, 20]), + ("pulse-min-limit", [21, 22, 23, 24]), + ("fix-dont-change-25", [25, 26]), + ("ics-baud-rate-10-115200-00-1250000", [27, 28]), + ("temperature-limit", [29, 30]), + ("current-limit", [31, 32]), + ("fix-dont-change-33", [33, 34]), + ("fix-dont-change-35", [35, 36]), + ("fix-dont-change-37", [37, 38]), + ("fix-dont-change-39", [39, 40]), + ("fix-dont-change-41", [41, 42]), + ("fix-dont-change-43", [43, 44]), + ("fix-dont-change-45", [45, 46]), + ("fix-dont-change-47", [47, 48]), + ("fix-dont-change-49", [49, 50]), + ("response", [51, 52]), + ("user-offset", [53, 54]), + ("fix-dont-change-55", [55, 56]), + ("servo-id", [57, 58]), + ("stretch-1", [59, 60]), + ("stretch-2", [61, 62]), + ("stretch-3", [63, 64]), + ] + + def open_connection(self): + ports = serial.tools.list_ports.comports() + if len(ports) == 0: + print(f"{Fore.RED}No USB Found.{Style.RESET_ALL}") + print(f"{Fore.RED}May Dual USB Adapter is wrong.{Style.RESET_ALL}") + for p in ports: + if p.vid == 0x165C and p.pid == 0x08: + for baudrate in [115200, 625000, 1250000]: + try: + self.ics = serial.Serial( + f"/dev/{p.name}", + baudrate, + timeout=self.timeout, + parity=serial.PARITY_EVEN, + ) + current_baudrate = self.baud() + if current_baudrate != self.baudrate: + self.baud(self.baudrate) + return True + except IndexError: + continue + return False + + def set_default_eeprom_param(self): + self.set_param( + [5, 10, 11, 4, 7, 15, 0, 0, 0, 2, 1, 14, + 15, 10, 0, 6, 2, 12, 14, 12, 0, 13, 10, + 12, 0, 0, 0, 0, 0, 10, 1, 14, 1, 2, 9, 8, + 14, 13, 9, 13, 6, 14, 9, 11, 10, 12, 9, 5, + 0, 14, 0, 1, 0, 0, 13, 2, 0, 0, 3, 12, 7, 8, 15, 14],) + + def read_baud(self): + _, result = self.read_param() + return result["baud"] + + def baud(self, baud=None, servo_id=None): + if baud is None: + return self.read_baud() + if baud not in [1250000, 625000, 115200]: + print(f";; baud={baud} is wrong.") + print(";; baud should be one of 1250000, 625000, 115200") + return None + + if servo_id is None: + servo_id = self.get_servo_id() + + ics_param64, _ = self.read_param() + if baud == 1250000: + ics_param64[27] = 0 + elif baud == 625000: + ics_param64[27] = 1 + elif baud == 115200: + ics_param64[27] = 10 + + # Send updated parameters to the device + self.set_param(ics_param64, servo_id) + # Re-open the connection with the updated baud rate + self.open_connection() + return self.read_baud() + + def get_servo_id(self): + self.ics.write(bytes([0xFF, 0x00, 0x00, 0x00])) + time.sleep(0.1) + ret = self.ics.read(5) + servo_id = ret[4] & 0x1F + return servo_id + + def set_servo_id(self, servo_id): + self.ics.write(bytes([0xE0 | (0x1F & servo_id), 0x01, 0x01, 0x01])) + time.sleep(0.1) + ret = self.ics.read(5) + return 0x1F & ret[4] + + def set_response(self, value, servo_id=None): + """ + Set the response parameter to the specified value. + """ + if servo_id is None: + servo_id = self.get_servo_id() + + # Read the current parameters + ics_param64, _ = self.read_param(servo_id=servo_id) + + # Update the 'response' field + indices = [51, 52] # Indices for the 'response' parameter + ics_param64[indices[0] - 1] = (value >> 4) & 0x0F # High 4 bits + ics_param64[indices[1] - 1] = value & 0x0F # Low 4 bits + + # Write back the updated parameters + self.set_param(ics_param64, servo_id=servo_id) + + # Confirm the change + _, result = self.read_param(servo_id=servo_id) + print(f"Response set to {result['response']}") + + def reset_servo_position(self): + self.set_angle(7500) + print(f"{Fore.YELLOW}Servo reset to zero position.{Fore.RESET}") + + def toggle_rotation_mode(self): + rotation_mode = self.read_rotation() + self.set_rotation(not rotation_mode) + rotation_mode = self.read_rotation() + mode_text = "Enabled" if rotation_mode else "Disabled" + print(f"{Fore.CYAN}Rotation mode set to {mode_text}{Fore.RESET}") + + def set_free_mode(self): + free_mode = self.read_free() + self.set_free(not free_mode) + time.sleep(0.1) + free_mode = self.read_free() + mode_text = "Enabled" if free_mode else "Disabled" + print(f"{Fore.MAGENTA}Free mode set to {mode_text}{Fore.RESET}") + + def increase_angle(self): + angle = self.read_angle() + angle = min(11500, angle + degree_to_pulse * 15) + self.set_angle(angle) + print(f"{Fore.BLUE}Angle increased to {angle}{Fore.RESET}") + + def decrease_angle(self): + angle = self.read_angle() + angle = max(3500, angle - degree_to_pulse * 15) + self.set_angle(angle) + print(f"{Fore.RED}Angle decreased to {angle}{Fore.RESET}") + + def parse_param64_key_value(self, v): + alist = {} + for param in self.servo_eeprom_params64: + param_name, indices = param[0], param[1] + alist[param_name] = self._4bit2num(indices, v) + + baud_value = alist.get("ics-baud-rate-10-115200-00-1250000", 0) + baud_rate = {10: 115200, 1: 625000, 0: 1250000}.get(baud_value, None) + mode_flag_value = alist.get( + "mode-flag-b7slave-b4rotation-b3pwm-b1free-b0reverse", 0 + ) + alist.update(self.ics_flag_dict(mode_flag_value)) + alist.update({"servo-id": alist.get("servo-id", 0), "baud": baud_rate}) + return alist + + def _4bit2num(self, lst, v): + sum_val = 0 + for i in lst: + sum_val = (sum_val << 4) + (v[i - 1] & 0x0F) + return sum_val + + def ics_flag_dict(self, v): + return { + "slave": (v & 0xF0) >> 4 & 0x8 == 0x8, + "rotation": (v & 0xF0) >> 4 & 0x1 == 0x1, + "free": v & 0xF & 0x2 == 0x2, + "reverse": v & 0xF & 0x1 == 0x1, + "serial": v & 0xF & 0x8 == 0x8, + } + + def set_flag(self, flag_name, value, servo_id=None): + if servo_id is None: + servo_id = self.get_servo_id() + ics_param64, _ = self.read_param(servo_id=servo_id) + + # Calculate byte and bit for manipulation + byte_idx = 14 if flag_name in ["slave", "rotation"] else 15 + bit_position = 3 if flag_name in ["slave", "serial"] else 0 + if flag_name == 'b2': + bit_position = 2 + mask = 1 << bit_position + + # Set or clear the bit based on the `value` argument + if value: + ics_param64[byte_idx] |= mask # Set the bit + else: + ics_param64[byte_idx] &= ~mask + # Clear the bit + # Set updated parameters to the servo + self.set_param(ics_param64, servo_id=servo_id) + + def set_slave(self, slave=None, servo_id=None): + return self.set_flag("slave", slave, servo_id=servo_id) + + def set_rotation(self, rotation=None, servo_id=None): + return self.set_flag("rotation", rotation, servo_id=servo_id) + + def set_stretch(self, stretch_values, servo_id=None): + if servo_id is None: + servo_id = self.get_servo_id() + + # Read the current parameters + ics_param64, _ = self.read_param(servo_id=servo_id) + + # Update the 'stretch-1', 'stretch-2', 'stretch-3' fields + for stretch_key, value in stretch_values.items(): + if stretch_key == "stretch-1": + indices = [59, 60] + elif stretch_key == "stretch-2": + indices = [61, 62] + elif stretch_key == "stretch-3": + indices = [63, 64] + else: + raise ValueError(f"Unsupported stretch parameter: {stretch_key}") + + # Split the value into two 4-bit chunks and store them in the corresponding indices + ics_param64[indices[0] - 1] = (value >> 4) & 0x0F # High 4 bits + ics_param64[indices[1] - 1] = value & 0x0F # Low 4 bits + + # Write back the updated parameters + self.set_param(ics_param64, servo_id=servo_id) + + # Confirm the change + _, result = self.read_param(servo_id=servo_id) + print(f"Stretch parameters set to: {stretch_values}") + + def set_serial(self, serial=None, servo_id=None): + return self.set_flag("serial", serial, servo_id=servo_id) + + def set_reverse(self, reverse=None, servo_id=None): + return self.set_flag("reverse", reverse, servo_id=servo_id) + + def set_free(self, free, servo_id=None): + if servo_id is None: + servo_id = self.get_servo_id() + + ics_param64, _ = self.read_param() + if free is None or free == 0: + ics_param64[15] = ics_param64[15] & 0xD + else: + ics_param64[15] = ics_param64[15] | 0x2 + self.set_param(ics_param64, servo_id) + if servo_id != self.read_param()[1].get("servo-id"): + return self.read_free(servo_id) + + def read_free(self, servo_id=None): + if servo_id is None: + servo_id = self.get_servo_id() + _, result = self.read_param() + return result.get("free", None) + + def read_rotation(self, servo_id=None): + _, result = self.read_param(servo_id=servo_id) + return result["rotation"] + + def set_param(self, ics_param64, servo_id=None): + if servo_id is None: + servo_id = self.get_servo_id() + self.ics.write(bytes([0xC0 | servo_id, 0x00] + ics_param64)) + time.sleep(0.5) + ret = self.ics.read(68) + ret_ics_param64 = ret[4:] + self.parse_param64_key_value(ret_ics_param64) + + def close_connection(self): + if self.ics and self.ics.is_open: + self.ics.close() + self.ics = None + + def display_status(self): + options = [ + "Current Servo ID", + "Angle", + "Baud Rate", + "Rotation Mode", + "Slave Mode", + "Reverse Mode", + "Serial Mode", + "Free", + ] + selectable_options = ["Current Servo ID", "Angle"] + + key_listener = KeyListener() + key_listener.daemon = True + key_listener.start() + try: + use_previous_result = False + previous_servo_id = None + while key_listener.running: + if not self.ics or not self.ics.is_open: + # Clear the previous output at the start of each loop + sys.stdout.write("\033[H\033[J") + sys.stdout.flush() + print(f"{Fore.RED}Connection is not open.{Style.RESET_ALL}") + print(f"{Fore.RED}Please check the following:") + print("1. Use Dual USB Adapter (https://kondo-robot.com/product/02116)") + print("2. Set the device to ICS mode.") + print(f"3. Connect only one servo correctly.{Style.RESET_ALL}") + + print( + f"{Fore.RED}To establish the connection, please execute the following commands in Linux:{Style.RESET_ALL}" + ) + print() + print(f"{Fore.RED} $ sudo su{Style.RESET_ALL}") + print(f"{Fore.RED} modprobe ftdi-sio{Style.RESET_ALL}") + print( + f"{Fore.RED} echo 165C 0008 > /sys/bus/usb-serial/drivers/ftdi_sio/new_id{Style.RESET_ALL}" + ) + print(f"{Fore.RED} exit{Style.RESET_ALL}") + try: + ret = self.open_connection() + except Exception: + continue + if ret is False: + time.sleep(1.0) + continue + # Print servo status + try: + servo_id = self.get_servo_id() + if servo_id != previous_servo_id: + use_previous_result = False + previous_servo_id = servo_id + if use_previous_result is False: + _, result = self.read_param() + print('======================================') + sys.stdout.write("\033[H\033[J") + sys.stdout.flush() + print("--- Servo Status ---") + for i, option in enumerate(options): + if i == self.selected_index: + print( + f">> {option}: {self.get_status(option, result, selected=True)}" + ) + else: + print(f" {option}: {self.get_status(option, result, selected=False)}") + + print("----------------------\n") + print( + "Use ↑↓ to navigate, ←→ to adjust Current Servo ID or Servo Angles" + ) + print(f"Press 'Enter' when Current Servo ID is selected to save the currently highlighted ID in {Fore.GREEN}green{Style.RESET_ALL}.") + print("Press 'z' to reset servo position") + print( + "Press 'r' to toggle rotation mode (enables continuous wheel-like rotation)" + ) + print("Press 'f' to set free mode") + print(f"Press 'd' to set default EEPROM parameters {Fore.RED}(WARNING: This action will overwrite the servo's EEPROM).{Style.RESET_ALL}\n") + print("'q' to quit.") + + key = key_listener.get_key() + use_previous_result = False + + # Perform actions based on key + if key == "z": + self.reset_servo_position() + elif key == "r": + self.toggle_rotation_mode() + elif key == "f": + self.set_free_mode() + elif key == "q": + print("Exiting...") + break + elif key == "d": + print( + f"{Fore.RED}WARNING: This will overwrite the servo's EEPROM with default values.{Style.RESET_ALL}" + ) + print("Press 'y' to proceed or any other key to cancel.") + while key_listener.running: + confirm_key = key_listener.get_key() + if confirm_key == "y": + print(f"{Fore.RED}Setting default EEPROM parameters...{Style.RESET_ALL}") + self.set_default_eeprom_param() + break + elif confirm_key is not None: + print(f"{Fore.YELLOW}Action canceled.{Style.RESET_ALL}") + break + elif key == readchar.key.UP: + self.selected_index = (self.selected_index - 1) % len( + selectable_options + ) + elif key == readchar.key.DOWN: + self.selected_index = (self.selected_index + 1) % len( + selectable_options + ) + elif ( + key == readchar.key.ENTER + and selectable_options[self.selected_index] == "Current Servo ID" + ): + self.set_servo_id(self.servo_id) + time.sleep(0.3) + if self.servo_id_to_rotation is not None: + self.set_rotation(self.servo_id_to_rotation[self.servo_id]) + time.sleep(0.3) + elif ( + key == readchar.key.LEFT + and selectable_options[self.selected_index] == "Current Servo ID" + ): + if self.servo_id_index == 0: + self.servo_id_index = len(self.servo_candidates) - 1 + else: + self.servo_id_index = max(0, self.servo_id_index - 1) + self.servo_id = self.servo_candidates[self.servo_id_index] + elif ( + key == readchar.key.RIGHT + and selectable_options[self.selected_index] == "Current Servo ID" + ): + if self.servo_id_index == len(self.servo_candidates) - 1: + self.servo_id_index = 0 + else: + self.servo_id_index = min( + len(self.servo_candidates) - 1, self.servo_id_index + 1 + ) + self.servo_id = self.servo_candidates[self.servo_id_index] + elif ( + key == readchar.key.LEFT + and selectable_options[self.selected_index] == "Angle" + ): + self.decrease_angle() + elif ( + key == readchar.key.RIGHT + and selectable_options[self.selected_index] == "Angle" + ): + self.increase_angle() + else: + use_previous_result = True + except Exception as e: + print(f'[ERROR] {e}') + use_previous_result = False + self.close_connection() + continue + # Flush the output to ensure it displays correctly + sys.stdout.flush() + + # Wait for a short period before updating again + # time.sleep(0.1) + + except KeyboardInterrupt: + print("\nDisplay stopped by user.") + key_listener.stop() + + def get_status(self, option, param=None, selected=False): + """Return the status based on the selected option.""" + if option == "Current Servo ID": + if param is not None: + current_servo_id = param["servo-id"] + else: + current_servo_id = self.get_servo_id() + + s = f'{current_servo_id}' + if selected: + str = f"{Style.RESET_ALL}[" + for i, servo_id in enumerate(self.servo_candidates): + if i == self.servo_id_index: + str += f"{Fore.GREEN}{servo_id}{Style.RESET_ALL} " + else: + str += f"{servo_id} " + str += "]" + s += f' -> Next Servo ID: {str}' + return s + elif option == "Angle": + angle = self.read_angle() + angle = int((angle - 7500) / degree_to_pulse) + return f"{angle}" + elif option == "Baud Rate": + if param is not None: + baudrate = param["baud"] + else: + baudrate = self.read_baud() + return f"{format_baud(baudrate)}bps" + elif option == "Rotation Mode": + if param is not None: + rotation = param["rotation"] + else: + rotation = self.read_rotation() + return f"{Fore.GREEN if rotation else Fore.RED}{'Enabled' if rotation else 'Disabled'}{Style.RESET_ALL}" + elif option == "Slave Mode": + if param is not None: + slave = param["slave"] + else: + slave = self.read_slave() + return f"{Fore.GREEN if slave else Fore.RED}{'Enabled' if slave else 'Disabled'}{Style.RESET_ALL}" + elif option == "Reverse Mode": + if param is not None: + reverse = param["reverse"] + else: + reverse = self.read_reverse() + return f"{Fore.GREEN if reverse else Fore.RED}{'Enabled' if reverse else 'Disabled'}{Style.RESET_ALL}" + elif option == "Serial Mode": + if param is not None: + serial = param["serial"] + else: + serial = self.read_serial() + return f"{Fore.GREEN if serial else Fore.RED}{'Enabled' if serial else 'Disabled'}{Style.RESET_ALL}" + elif option == "Free": + if param is not None: + free = param["free"] + else: + free = self.read_free() + return f"{Fore.GREEN if free else Fore.RED}{'Enabled' if free else 'Disabled'}{Style.RESET_ALL}" + + def read_angle(self, sid=None): + if sid is None: + sid = self.get_servo_id() + self.ics.write(bytes([0xA0 | (sid & 0x1F), 5])) + time.sleep(0.1) + v = self.ics.read(6) + angle = ((v[4] & 0x7F) << 7) | (v[5] & 0x7F) + return angle + + def set_angle(self, v=7500, servo_id=None): + v = int(v) + if servo_id is None: + servo_id = self.get_servo_id() + self.ics.write(bytes([0x80 | (servo_id & 0x1F), (v >> 7) & 0x7F, v & 0x7F])) + time.sleep(0.1) + v = self.ics.read(6) + angle = ((v[4] & 0x7F) << 7) | (v[5] & 0x7F) + return angle + + def read_param(self, servo_id=None): + if servo_id is None: + servo_id = self.get_servo_id() + self.ics.write(bytes([0xA0 | servo_id, 0x00])) + time.sleep(0.1) + ret = self.ics.read(68) + ics_param64 = ret[4:] + result = self.parse_param64_key_value(list(ics_param64)) + return list(ics_param64), result + + def _4bit2num(self, lst, v): + sum_val = 0 + for i in lst: + sum_val = (sum_val << 4) + (v[i - 1] & 0x0F) + return sum_val + + +if __name__ == '__main__': + servo_controller = ICSServoController() + servo_controller.open_connection() diff --git a/requirements.txt b/requirements.txt index cec765f6..3cddcac5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,5 @@ numpy pandas pyelftools pyserial +readchar tabulate diff --git a/ros/kxr_controller/config/pooh_controllers.yaml b/ros/kxr_controller/config/pooh_controllers.yaml new file mode 100644 index 00000000..363113c5 --- /dev/null +++ b/ros/kxr_controller/config/pooh_controllers.yaml @@ -0,0 +1,32 @@ +head_controller: + type: position_controllers/JointTrajectoryController + joints: + - head_neck_y + - head_neck_p + - head_neck_r + + state_publish_rate: 100 # Override default + action_monitor_rate: 100 # Override default + stop_trajectory_duration: 0 # Override default + +rarm_controller: + type: position_controllers/JointTrajectoryController + joints: + - rarm_shoulder_p + - rarm_shoulder_r + - rarm_elbow + + state_publish_rate: 100 # Override default + action_monitor_rate: 100 # Override default + stop_trajectory_duration: 0 # Override default + +larm_controller: + type: position_controllers/JointTrajectoryController + joints: + - larm_shoulder_p + - larm_shoulder_r + - larm_elbow + + state_publish_rate: 100 # Override default + action_monitor_rate: 100 # Override default + stop_trajectory_duration: 0 # Override default diff --git a/ros/kxr_controller/launch/kxr_controller.launch b/ros/kxr_controller/launch/kxr_controller.launch index 2722dc9e..cdfb9b23 100644 --- a/ros/kxr_controller/launch/kxr_controller.launch +++ b/ros/kxr_controller/launch/kxr_controller.launch @@ -18,6 +18,10 @@ + @@ -52,6 +56,7 @@ temperature_limit: $(arg temperature_limit) read_current: $(arg read_current) read_temperature: $(arg read_temperature) + use_fullbody_controller: $(arg use_fullbody_controller) diff --git a/ros/kxr_controller/launch/kxr_controller_for_wheel.launch b/ros/kxr_controller/launch/kxr_controller_for_wheel.launch index 52b99831..d103b8f4 100644 --- a/ros/kxr_controller/launch/kxr_controller_for_wheel.launch +++ b/ros/kxr_controller/launch/kxr_controller_for_wheel.launch @@ -15,6 +15,10 @@ + @@ -32,6 +36,7 @@ + diff --git a/ros/kxr_controller/launch/pooh_bringup.launch b/ros/kxr_controller/launch/pooh_bringup.launch index 6c3d05ac..e9b06b06 100644 --- a/ros/kxr_controller/launch/pooh_bringup.launch +++ b/ros/kxr_controller/launch/pooh_bringup.launch @@ -10,6 +10,12 @@ + + kxr_controller - 0.0.3 + 0.0.7 The kxr_controller package Iori Yanokura diff --git a/ros/kxr_controller/python/kxr_controller/pooh_interface.py b/ros/kxr_controller/python/kxr_controller/pooh_interface.py new file mode 100644 index 00000000..f819ff87 --- /dev/null +++ b/ros/kxr_controller/python/kxr_controller/pooh_interface.py @@ -0,0 +1,60 @@ +import control_msgs.msg +from kxr_controller.kxr_interface import KXRROSRobotInterface + + +class PoohROSRobotInterface(KXRROSRobotInterface): + + @property + def fullbody_controller(self): + cont_name = "fullbody_controller" + return { + "controller_type": cont_name, + "controller_action": cont_name + "/follow_joint_trajectory", + "controller_state": cont_name + "/state", + "action_type": control_msgs.msg.FollowJointTrajectoryAction, + "joint_names": self.joint_names, + } + + @property + def larm_controller(self): + return dict( + controller_type='larm_controller', + controller_action='larm_controller/follow_joint_trajectory', + controller_state='larm_controller/state', + action_type=control_msgs.msg.FollowJointTrajectoryAction, + joint_names=['larm_shoulder_p', + 'larm_shoulder_r', + 'larm_elbow']) + + @property + def rarm_controller(self): + return dict( + controller_type='rarm_controller', + controller_action='rarm_controller/follow_joint_trajectory', + controller_state='rarm_controller/state', + action_type=control_msgs.msg.FollowJointTrajectoryAction, + joint_names=['rarm_shoulder_p', + 'rarm_shoulder_r', + 'rarm_elbow']) + + @property + def head_controller(self): + return dict( + controller_type='head_controller', + controller_action='head_controller/follow_joint_trajectory', + controller_state='head_controller/state', + action_type=control_msgs.msg.FollowJointTrajectoryAction, + joint_names=['head_neck_y', 'head_neck_p', 'head_neck_r']) + + def default_controller(self): + """Overriding default_controller. + + Returns + ------- + List of limb controller : list + """ + return [ + self.larm_controller, + self.rarm_controller, + self.head_controller, + ] diff --git a/ros/kxr_controller/scripts/i2c_joint_control.py b/ros/kxr_controller/scripts/i2c_joint_control.py index ab601d26..cd03ca8d 100755 --- a/ros/kxr_controller/scripts/i2c_joint_control.py +++ b/ros/kxr_controller/scripts/i2c_joint_control.py @@ -31,7 +31,7 @@ def joint_state_callback(msg): def main(): rospy.init_node('joint_state_to_i2c', anonymous=True) - rospy.Subscriber("joint_states", JointState, joint_state_callback) + rospy.Subscriber("eye_brow/joint_states", JointState, joint_state_callback) rospy.loginfo("Started JointState to I2C bridge") rospy.spin() diff --git a/ros/kxr_controller/scripts/neck_command_listener.py b/ros/kxr_controller/scripts/neck_command_listener.py index a9f8d617..e4976b18 100644 --- a/ros/kxr_controller/scripts/neck_command_listener.py +++ b/ros/kxr_controller/scripts/neck_command_listener.py @@ -5,7 +5,7 @@ import time import numpy as np import rospy -from kxr_controller.kxr_interface import KXRROSRobotInterface +from kxr_controller.pooh_interface import PoohROSRobotInterface from kxr_models.download_urdf import download_urdf_mesh_files from skrobot.model import RobotModel from std_msgs.msg import String # メッセージはString型と仮定 @@ -18,36 +18,47 @@ robot_model.load_urdf_from_robot_description( namespace + '/robot_description_viz') -ri = KXRROSRobotInterface( # NOQA +ri = PoohROSRobotInterface( # NOQA robot_model, namespace=namespace, controller_timeout=60.0) # 動作の定義 def nod(send_time=1): - robot_model.head_neck_pitch.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + controller_type = 'head_controller' + robot_model.head_neck_p.joint_angle(np.deg2rad(30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) time.sleep(1) - robot_model.head_neck_pitch.joint_angle(np.deg2rad(0)) - ri.angle_vector(robot_model.angle_vector(), send_time) + robot_model.head_neck_p.joint_angle(np.deg2rad(0)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) def disagree(send_time=1): - robot_model.head_neck_yaw.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + controller_type = 'head_controller' + robot_model.head_neck_y.joint_angle(np.deg2rad(30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) time.sleep(1) - robot_model.head_neck_yaw.joint_angle(np.deg2rad(-30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + robot_model.head_neck_y.joint_angle(np.deg2rad(-30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) time.sleep(1) - robot_model.head_neck_yaw.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + robot_model.head_neck_y.joint_angle(np.deg2rad(30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) time.sleep(1) - robot_model.head_neck_yaw.joint_angle(np.deg2rad(0)) - ri.angle_vector(robot_model.angle_vector(), send_time) + robot_model.head_neck_y.joint_angle(np.deg2rad(0)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) def tilt(send_time=1): - robot_model.head_neck_roll.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + controller_type = 'head_controller' + robot_model.head_neck_r.joint_angle(np.deg2rad(30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) time.sleep(2) - robot_model.head_neck_roll.joint_angle(np.deg2rad(0)) - ri.angle_vector(robot_model.angle_vector(), send_time) + robot_model.head_neck_r.joint_angle(np.deg2rad(0)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) # コールバック関数: /neck_motionのメッセージを受け取り対応する動作を実行 def neck_motion_callback(msg): diff --git a/ros/kxr_controller/scripts/neck_command_listener_independent.py b/ros/kxr_controller/scripts/neck_command_listener_independent.py index 25ae671b..c8be3561 100644 --- a/ros/kxr_controller/scripts/neck_command_listener_independent.py +++ b/ros/kxr_controller/scripts/neck_command_listener_independent.py @@ -5,7 +5,7 @@ import time import rospy from std_msgs.msg import String -from kxr_controller.kxr_interface import KXRROSRobotInterface +from kxr_controller.pooh_interface import PoohROSRobotInterface from kxr_models.download_urdf import download_urdf_mesh_files from skrobot.model import RobotModel import numpy as np @@ -20,7 +20,7 @@ robot_model.load_urdf_from_robot_description( namespace + '/robot_description_viz') rospy.loginfo("Init Real Robot Interface") -ri = KXRROSRobotInterface(robot_model, namespace=None, controller_timeout=60.0) +ri = PoohROSRobotInterface(robot_model, namespace=None, controller_timeout=60.0) rospy.loginfo("Init Real Robot Interface Done") # サーボをONにし、init_pose @@ -33,38 +33,51 @@ # nod動作 def nod(send_time=1): - ri.angle_vector(robot_model.init_pose()) + controller_type = 'head_controller' + ri.angle_vector(robot_model.init_pose(), + controller_type=controller_type) robot_model.head_neck_p.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) ri.wait_interpolation() robot_model.head_neck_p.joint_angle(np.deg2rad(0)) - ri.angle_vector(robot_model.angle_vector(), send_time) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) ri.wait_interpolation() # disagree動作 def disagree(send_time=1): - ri.angle_vector(robot_model.init_pose()) + controller_type = 'head_controller' + ri.angle_vector(robot_model.init_pose(), + controller_type=controller_type) robot_model.head_neck_y.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) ri.wait_interpolation() robot_model.head_neck_y.joint_angle(np.deg2rad(-30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) ri.wait_interpolation() #robot_model.head_neck_y.joint_angle(np.deg2rad(30)) #ri.angle_vector(robot_model.angle_vector(), send_time) #ri.wait_interpolation() robot_model.head_neck_y.joint_angle(np.deg2rad(0)) - ri.angle_vector(robot_model.angle_vector(), send_time) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) ri.wait_interpolation() # tilt動作 def tilt(send_time=1): - ri.angle_vector(robot_model.init_pose()) + controller_type = 'head_controller' + ri.angle_vector(robot_model.init_pose(), + controller_type=controller_type) robot_model.head_neck_r.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) ri.wait_interpolation() robot_model.head_neck_r.joint_angle(np.deg2rad(0)) - ri.angle_vector(robot_model.angle_vector(), send_time) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) ri.wait_interpolation() # コールバック関数 diff --git a/ros/kxr_controller/scripts/neck_test.py b/ros/kxr_controller/scripts/neck_test.py index d0c59bac..976341da 100644 --- a/ros/kxr_controller/scripts/neck_test.py +++ b/ros/kxr_controller/scripts/neck_test.py @@ -6,7 +6,7 @@ import argparse import numpy as np import IPython -from kxr_controller.kxr_interface import KXRROSRobotInterface +from kxr_controller.pooh_interface import PoohROSRobotInterface from kxr_models.download_urdf import download_urdf_mesh_files import rospy from skrobot.model import RobotModel @@ -19,36 +19,47 @@ robot_model = RobotModel() robot_model.load_urdf_from_robot_description( namespace + '/robot_description_viz') -ri = KXRROSRobotInterface( # NOQA +ri = PoohROSRobotInterface( # NOQA robot_model, namespace=namespace, controller_timeout=60.0) #define nod, disagree, tilt and then define test def nod(send_time = 1): - robot_model.head_neck_pitch.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + controller_type = 'head_controller' + robot_model.head_neck_p.joint_angle(np.deg2rad(30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) time.sleep(1) - robot_model.head_neck_pitch.joint_angle(np.deg2rad(0)) - ri.angle_vector(robot_model.angle_vector(), send_time) + robot_model.head_neck_p.joint_angle(np.deg2rad(0)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) def disagree(send_time = 1): - robot_model.head_neck_yaw.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) - time.sleep(1) - robot_model.head_neck_yaw.joint_angle(np.deg2rad(-30)) - ri.angle_vector(robot_model.angle_vector(), send_time) - time.sleep(1) - robot_model.head_neck_yaw.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) - time.sleep(1) - robot_model.head_neck_yaw.joint_angle(np.deg2rad(0)) - ri.angle_vector(robot_model.angle_vector(), send_time) + controller_type = 'head_controller' + robot_model.head_neck_y.joint_angle(np.deg2rad(30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) + time.sleep(1) + robot_model.head_neck_y.joint_angle(np.deg2rad(-30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) + time.sleep(1) + robot_model.head_neck_y.joint_angle(np.deg2rad(30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) + time.sleep(1) + robot_model.head_neck_y.joint_angle(np.deg2rad(0)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) def tilt(send_time = 1): - robot_model.head_neck_roll.joint_angle(np.deg2rad(30)) - ri.angle_vector(robot_model.angle_vector(), send_time) + controller_type = 'head_controller' + robot_model.head_neck_r.joint_angle(np.deg2rad(30)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) time.sleep(2) - robot_model.head_neck_roll.joint_angle(np.deg2rad(0)) - ri.angle_vector(robot_model.angle_vector(), send_time) + robot_model.head_neck_r.joint_angle(np.deg2rad(0)) + ri.angle_vector(robot_model.angle_vector(), send_time, + controller_type=controller_type) def test(): diff --git a/ros/kxr_models/package.xml b/ros/kxr_models/package.xml index fb958d2a..b2962bab 100644 --- a/ros/kxr_models/package.xml +++ b/ros/kxr_models/package.xml @@ -1,6 +1,6 @@ kxr_models - 0.0.3 + 0.0.7

URDF Description package for kxr_models

This package contains configuration data, 3D models and launch files for kxr robot

diff --git a/ros/kxreus/euslisp/kxr-interface.l b/ros/kxreus/euslisp/kxr-interface.l index ebd299ab..21f58ed7 100644 --- a/ros/kxreus/euslisp/kxr-interface.l +++ b/ros/kxreus/euslisp/kxr-interface.l @@ -45,7 +45,7 @@ (defclass kxr-interface ;; :super robot-interface :super robot-move-base-interface - :slots (joint-names servo-on-off-client controller-name control-pressure)) + :slots (joint-names servo-on-off-client controller-name control-pressure use-sim-time)) (defmethod kxr-interface @@ -72,28 +72,32 @@ :cmd-vel-topic "/diff_drive_controller/cmd_vel" args) - (setq servo-on-off-client (instance ros::simple-action-client :init - (if namespace (format nil "/~A/~A/servo_on_off_real_interface" namespace controller-name) - (format nil "/~A/servo_on_off_real_interface" controller-name)) - kxr_controller::ServoOnOffAction + (let (actions) + (setq use-sim-time (and (ros::has-param "/use_sim_time") (ros::get-param "/use_sim_time"))) + (when (not use-sim-time) + (setq servo-on-off-client (instance ros::simple-action-client :init + (if namespace (format nil "/~A/~A/servo_on_off_real_interface" namespace controller-name) + (format nil "/~A/servo_on_off_real_interface" controller-name)) + kxr_controller::ServoOnOffAction + :groupname groupname)) + (setq stretch-client (instance ros::simple-action-client :init + (if namespace (format nil "/~A/~A/stretch_interface" namespace controller-name) + (format nil "/~A/stretch_interface" controller-name)) + kxr_controller::StretchAction :groupname groupname)) - (setq stretch-client (instance ros::simple-action-client :init - (if namespace (format nil "/~A/~A/stretch_interface" namespace controller-name) - (format nil "/~A/stretch_interface" controller-name)) - kxr_controller::StretchAction - :groupname groupname)) - (when control-pressure - (setq pressure-control-client (instance ros::simple-action-client :init - (if namespace (format nil "/~A/~A/pressure_control_interface" namespace controller-name) - (format nil "/~A/pressure_control_interface" controller-name)) - kxr_controller::PressureControlAction - :groupname groupname))) - (dolist (action (if control-pressure (list servo-on-off-client stretch-client pressure-control-client) - (list servo-on-off-client stretch-client))) - (unless (and joint-action-enable (send action :wait-for-server 3)) - (setq joint-action-enable nil) - (ros::ros-warn "~A is not respond, kxr-interface is disabled" action) - (return))) + (setq actions (list servo-on-off-client stretch-client)) + (when control-pressure + (setq pressure-control-client (instance ros::simple-action-client :init + (if namespace (format nil "/~A/~A/pressure_control_interface" namespace controller-name) + (format nil "/~A/pressure_control_interface" controller-name)) + kxr_controller::PressureControlAction + :groupname groupname)) + (setq actions (list servo-on-off-client stretch-client pressure-control-client)))) + (dolist (action actions) + (unless (and joint-action-enable (send action :wait-for-server 3)) + (setq joint-action-enable nil) + (ros::ros-warn "~A is not respond, kxr-interface is disabled" action) + (return)))) t) (:default-controller () @@ -108,6 +112,8 @@ (cons :joint-names joint-names)))) (:servo-on (&key (names nil)) + (when use-sim-time + (return-from :servo-on t)) (unless joint-action-enable (if viewer (send self :draw-objects)) (return-from :servo-on t)) @@ -120,6 +126,8 @@ (send servo-on-off-client :send-goal goal))) (:servo-off (&key (names nil)) + (when use-sim-time + (return-from :servo-on t)) (unless joint-action-enable (if viewer (send self :draw-objects)) (return-from :servo-off t)) @@ -132,6 +140,8 @@ (send servo-on-off-client :send-goal goal))) (:send-stretch (&key (value 127) (names nil)) + (when use-sim-time + (return-from :servo-on t)) (when (null names) (setq names joint-names)) (let* (goal result) @@ -141,12 +151,16 @@ (send stretch-client :send-goal goal))) (:read-stretch () + (when use-sim-time + (return-from :servo-on t)) (one-shot-subscribe (if namespace (format nil "/~A/~A/stretch" namespace controller-name) (format nil "/~A/stretch" controller-name)) kxr_controller::Stretch)) (:send-pressure-control (&key board-idx start-pressure stop-pressure release) + (when use-sim-time + (return-from :servo-on t)) (let* (goal result) (setq goal (instance kxr_controller::PressureControlGoal :init)) (send goal :board_idx board-idx) @@ -156,6 +170,8 @@ (send pressure-control-client :send-goal goal))) (:read-pressure (&key board-idx) + (when use-sim-time + (return-from :servo-on t)) (one-shot-subscribe (if namespace (format nil "/~A/~A/pressure/~A" namespace controller-name board-idx) (format nil "/~A/pressure/~A" controller-name board-idx)) diff --git a/ros/kxreus/package.xml b/ros/kxreus/package.xml index 22411cad..51ec376b 100644 --- a/ros/kxreus/package.xml +++ b/ros/kxreus/package.xml @@ -1,7 +1,7 @@ kxreus - 0.0.3 + 0.0.7 roseus interface for kxr robots. diff --git a/setup.py b/setup.py index b12952b3..66151bff 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages from setuptools import setup -version = "0.0.3" +version = "0.0.7" if sys.argv[-1] == "release": @@ -59,6 +59,7 @@ "console_scripts": [ "rcb4-write-firmware=rcb4.apps.write_firmware:main", "armh7-tools=rcb4.apps.armh7_tool:main", + "ics-manager=rcb4.apps.ics_manager:main", ], }, )