diff --git a/src/comodo/mujocoSimulator/callbacks.py b/src/comodo/mujocoSimulator/callbacks.py new file mode 100644 index 0000000..9491f2d --- /dev/null +++ b/src/comodo/mujocoSimulator/callbacks.py @@ -0,0 +1,142 @@ +from abc import ABC, abstractmethod +from typing import List +from comodo.mujocoSimulator.mjcontactinfo import MjContactInfo +import types +import logging +import mujoco +import copy + +class Callback(ABC): + def __init__(self, *args, **kwargs) -> None: + self.args = args + self.kwargs = kwargs + self.simulator = None + + def set_simulator(self, simulator): + self.simulator = simulator + + @abstractmethod + def on_simulation_start(self) -> None: + pass + + @abstractmethod + def on_simulation_step(self, t: float, data: mujoco.MjData, opts: dict = None) -> None: + pass + + @abstractmethod + def on_simulation_end(self) -> None: + pass + + +class EarlyStoppingCallback(Callback): + def __init__(self, condition, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.score = 0 + self.condition = condition + + def on_simulation_start(self) -> None: + pass + + def on_simulation_step(self, t: float, iter: int, data: mujoco.MjData, opts: dict = None) -> None: + if self.condition(t, iter, data, opts, *self.args, **self.kwargs): + if self.simulator is not None: + self.simulator.should_stop = True + + def on_simulation_end(self) -> None: + pass + + +class ScoreCallback(Callback): + def __init__(self, score_function, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.score = 0 + self.history = [] + self.score_function = score_function + + def on_simulation_start(self) -> None: + self.score = 0 + self.history = [] + + def on_simulation_step(self, t: float, iter: int, data: mujoco.MjData, opts: dict = None) -> None: + score = self.score_function(t, iter, data, opts, *self.args, **self.kwargs) + self.score += score + self.history.append(score) + + def on_simulation_end(self) -> None: + pass + + +class TrackerCallback(Callback): + def __init__(self, tracked_variables: list, print_values: bool | list = False, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.tracked_variables = tracked_variables + + if isinstance(print_values, list): + self.print_values = print_values + elif isinstance(print_values, bool): + self.print_values = [print_values for _ in range(len(tracked_variables))] + else: + raise ValueError(f"print_values should be a boolean or a list of booleans for masking, not {type(print_values)}") + + self.t = [] + self.vals = {} + + def on_simulation_start(self) -> None: + pass + + def on_simulation_step(self, t: float, iter: int, data: mujoco.MjData, opts: dict = None) -> None: + self.t.append(t) + for var in self.tracked_variables: + if isinstance(var, str): + try: + v = eval(f"data.{var}") + if not var in self.vals: + self.vals[var] = [] + self.vals[var].append(copy.deepcopy(v)) + if self.print_values[self.tracked_variables.index(var)]: + print(f"{var}: {v}") + except: + print(f"Error: {self.tracked_variables} not found in data") + elif isinstance(var, types.FunctionType): + v = var(t, iter, data, opts, *self.args, **self.kwargs) + if not var.__name__ in self.vals: + self.vals[var.__name__] = [] + if not isinstance(v, (int, float)): + return + self.vals[var.__name__].append(v) + if self.print_values[self.tracked_variables.index(var)]: + print(f"{var.__name__}: {v}") + + def on_simulation_end(self) -> None: + pass + + def get_tracked_values(self): + return self.t, self.vals + + +class ContactCallback(Callback): + def __init__(self, tracked_bodies: List[str] = [], logger: logging.Logger = None, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.tracked_bodies = tracked_bodies + self.logger = logger + self.last_contact = None + + def on_simulation_start(self) -> None: + pass + + def on_simulation_step(self, t: float, iter: int, data: mujoco.MjData, opts: dict = None) -> None: + if opts.get("contact", None) is not None: + contact_info = opts["contact"] + assert isinstance(contact_info, MjContactInfo), "Contact info is not an instance of MjContactInfo" + if contact_info.is_none(): + return + self.last_contact = contact_info + if self.logger is not None: + self.logger.debug(f"Contact detected at t={t}: {contact_info}") + else: + pass + #print(f"Contact detected at t={t}: {contact_info}") + + + def on_simulation_end(self) -> None: + pass diff --git a/src/comodo/mujocoSimulator/mjcontactinfo.py b/src/comodo/mujocoSimulator/mjcontactinfo.py new file mode 100644 index 0000000..d554017 --- /dev/null +++ b/src/comodo/mujocoSimulator/mjcontactinfo.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass +from typing import Sequence, List + +class MjContactInfo: + """Wrapper class for mjContact struct of MuJoCo. + Accepts the struct instance individually or by unpacking the struct. + """ + def __init__(self, t: float, iter: int, *args): + self.t = t + self.iter = iter + if len(args) == 1: + self.mj_struct = args[0] + self.dist: float = self.mj_struct.dist[0] if self.mj_struct.dist.size > 0 else None + self.pos: List[float] = self.mj_struct.pos.flatten().tolist() if self.mj_struct.pos.size > 0 else None + self.frame: List[float] = self.mj_struct.frame.flatten().tolist() if self.mj_struct.frame.size > 0 else None + self.dim: int = self.mj_struct.dim[0] if self.mj_struct.dim.size > 0 else None + self.geom: List[int] = self.mj_struct.geom.flatten().tolist() if self.mj_struct.geom.size > 0 else None + self.flex: List[int] = self.mj_struct.flex.flatten().tolist() if self.mj_struct.flex.size > 0 else None + self.elem: List[int] = self.mj_struct.elem.flatten().tolist() if self.mj_struct.elem.size > 0 else None + self.vert: List[int] = self.mj_struct.vert.flatten().tolist() if self.mj_struct.vert.size > 0 else None + self.mu: float = float(self.mj_struct.mu[0]) if self.mj_struct.mu.size > 0 else None + self.H: List[float] = self.mj_struct.H.flatten().tolist() if self.mj_struct.H.size > 0 else None + + self._is_none = self.pos is None + else: + raise NotImplementedError("Unpacking the struct is not implemented yet.") + + def __str__(self): + return f"ContactInfo(t={self.t}, iter={self.iter}, dist={self.dist}, pos={self.pos}, frame={self.frame}, dim={self.dim}, geom={self.geom}, flex={self.flex}, elem={self.elem}, vert={self.vert}, mu={self.mu}, H={self.H})" + + def __repr__(self): + return f"ContactInfo(t={self.t}, iter={self.iter}, dist={self.dist}, pos={self.pos}, frame={self.frame}, dim={self.dim}, geom={self.geom}, flex={self.flex}, elem={self.elem}, vert={self.vert}, mu={self.mu}, H={self.H})" + + def is_none(self) -> bool: + return self._is_none + + def get_time(self) -> float: + return self.t + \ No newline at end of file diff --git a/src/comodo/mujocoSimulator/mujocoSimulator.py b/src/comodo/mujocoSimulator/mujocoSimulator.py index 8814108..ed123f0 100644 --- a/src/comodo/mujocoSimulator/mujocoSimulator.py +++ b/src/comodo/mujocoSimulator/mujocoSimulator.py @@ -1,25 +1,88 @@ from comodo.abstractClasses.simulator import Simulator +from comodo.robotModel.robotModel import RobotModel +from comodo.mujocoSimulator.callbacks import Callback +from comodo.mujocoSimulator.mjcontactinfo import MjContactInfo +from comodo.mujocoSimulator.visualiser import MujocoVideoRecorder, RenderOptions +from typing import Dict, List +from PIL import Image +import cv2 +import subprocess +import shutil import mujoco import math import numpy as np +import os import mujoco_viewer -import copy +import logging +import pathlib +import tempfile import casadi as cs class MujocoSimulator(Simulator): - def __init__(self) -> None: + def __init__(self, logger: logging.Logger = None, log_dir: str = "", video_path: str | os.PathLike = "", render_options: RenderOptions | None = None) -> None: + self.robot_model = None + self.model = None self.desired_pos = None self.postion_control = False + self.should_stop = False + self.t = 0 + self.iter = 0 + self.contacts = [] + + # Set up video recording + video_path = str(video_path) + self.save_video = video_path != "" + self.video_path = video_path + self.render_options = render_options if render_options is not None else RenderOptions() + + self._setup_logger(logger, log_dir) self.compute_misalignment_gravity_fun() super().__init__() - def load_model(self, robot_model, s, xyz_rpy, kv_motors=None, Im=None): - self.robot_model = robot_model + def _setup_logger(self, logger: logging.Logger, log_dir) -> None: + if logger is None: + self.logger = logging.getLogger(__name__) + self.logger.setLevel(logging.INFO) + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + FMT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + fmt = logging.Formatter(fmt=FMT) + ch.setFormatter(fmt=fmt) + self.logger.addHandler(ch) + else: + self.logger = logger + + def load_model(self, robot_model: RobotModel, s, xyz_rpy, kv_motors=None, Im=None, floor_opts: Dict = {}) -> None: + """ + Loads the robot model into the MuJoCo simulator. - mujoco_xml = robot_model.get_mujoco_model() - self.model = mujoco.MjModel.from_xml_string(mujoco_xml) + Args: + robot_model (RobotModel): The robot model to be loaded. + s (array-like): The joint state vector. + xyz_rpy (array-like): The base pose in terms of position (xyz) and orientation (rpy). + kv_motors (array-like?): Motor velocity constants. Defaults to zeros if not provided. + Im (array-like?): Motor inertia values. Defaults to zeros if not provided. + Returns: + None + """ + + self.robot_model = robot_model + mujoco_xml = robot_model.get_mujoco_model(floor_opts=floor_opts, save_mjc_xml=False) + try: + self.model = mujoco.MjModel.from_xml_string(mujoco_xml) + except Exception as e: + with open("failed_mujoco.xml", "w") as f: + f.write(mujoco_xml) + self.logger.error(f"Failed to load the model: {e}. Dumped the model to failed_mujoco.xml") + raise self.data = mujoco.MjData(self.model) + if self.save_video: + self.recorder = MujocoVideoRecorder( + self.model, self.data, self.render_options + ) + print(f"Created recorder: {self.recorder}") + self.create_mapping_vector_from_mujoco() self.create_mapping_vector_to_mujoco() mujoco.mj_forward(self.model, self.data) @@ -27,53 +90,139 @@ def load_model(self, robot_model, s, xyz_rpy, kv_motors=None, Im=None): self.set_base_pose_in_mujoco(xyz_rpy=xyz_rpy) mujoco.mj_forward(self.model, self.data) self.visualize_robot_flag = False + self.should_stop = False self.Im = Im if not (Im is None) else np.zeros(self.robot_model.NDoF) self.kv_motors = ( kv_motors if not (kv_motors is None) else np.zeros(self.robot_model.NDoF) ) - self.H_left_foot = copy.deepcopy(self.robot_model.H_left_foot) - self.H_right_foot = copy.deepcopy(self.robot_model.H_right_foot) - self.H_left_foot_num = None - self.H_right_foot_num = None + # self.H_left_foot = copy.deepcopy(self.robot_model.H_left_foot) + # self.H_right_foot = copy.deepcopy(self.robot_model.H_right_foot) + # self.H_left_foot_num = None + # self.H_right_foot_num = None self.mass = self.robot_model.get_total_mass() + def get_model_frame_pose2(self, frame_name: str) -> np.ndarray: + frame_id = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_BODY, frame_name) + return self.data.xpos[frame_id] + + def get_model_frame_pose(self, frame_name: str) -> np.ndarray: + base_pose = self.get_base() + joint_values = self.get_state()[0] + frame_pose = self.robot_model.forward_kinematics(frame_name, base_pose, joint_values) + return frame_pose + + def get_contact_status(self) -> tuple: + """ + Determines the contact status of the left and right feet. + + Returns: + tuple: A tuple containing two boolean values: + - left_foot_contact (bool): True if the left foot is in contact, False otherwise. + - right_foot_contact (bool): True if the right foot is in contact, False otherwise. + """ - def get_contact_status(self): left_wrench, rigth_wrench = self.get_feet_wrench() - left_foot_contact = left_wrench[2] > 0.1*self.mass - right_foot_contact = rigth_wrench[2] > 0.1*self.mass + left_foot_contact = left_wrench[2] > 0.1 * self.mass + right_foot_contact = rigth_wrench[2] > 0.1 * self.mass return left_foot_contact, right_foot_contact - def set_visualize_robot_flag(self, visualize_robot): - self.visualize_robot_flag = visualize_robot + def set_visualize_robot_flag(self, visualize_robot) -> None: + """ + Sets the flag to visualize the robot and initializes the viewer if the flag is set to True. + + Args: + visualize_robot (bool): A flag indicating whether to visualize the robot. + """ + + self.visualize_robot_flag = visualize_robot and not self.save_video if self.visualize_robot_flag: + self.logger.info("Initializing the Mujoco viewer") self.viewer = mujoco_viewer.MujocoViewer(self.model, self.data) - def set_base_pose_in_mujoco(self, xyz_rpy): + def set_base_pose_in_mujoco(self, xyz_rpy) -> None: + """ + Set the base pose in the MuJoCo simulator. + + Args: + xyz_rpy (array-like): A 6-element array where the first three elements represent the + position (x, y, z) and the last three elements represent the + orientation in roll, pitch, and yaw (rpy) angles. + Returns: + None + Notes: + - This function converts the roll, pitch, and yaw angles to a quaternion and sets the + base position and orientation in the MuJoCo simulator's data structure. + """ + base_xyz_quat = np.zeros(7) base_xyz_quat[:3] = xyz_rpy[:3] base_xyz_quat[3:] = self.RPY_to_quat(xyz_rpy[3], xyz_rpy[4], xyz_rpy[5]) base_xyz_quat[2] = base_xyz_quat[2] self.data.qpos[:7] = base_xyz_quat - def set_joint_vector_in_mujoco(self, pos): + def set_joint_vector_in_mujoco(self, pos) -> None: + """ + Sets the joint positions in the MuJoCo simulation. + This method converts the given joint positions from the robot's coordinate + system to MuJoCo's coordinate system and updates the joint positions in the + MuJoCo simulation accordingly. + + Args: + pos (list or array-like): A list or array of joint positions in the robot's + coordinate system. + Returns: + None + """ + pos_muj = self.convert_vector_to_mujoco(pos) indexes_joint = self.model.jnt_qposadr[1:] for i in range(self.robot_model.NDoF): self.data.qpos[indexes_joint[i]] = pos_muj[i] - def set_input(self, input): + def set_input(self, input) -> None: + """ + Sets the input for the MuJoCo simulator. + This method converts the provided input vector to a format compatible + with MuJoCo and assigns it to the simulator's control data. + + Args: + input (array-like): The input vector to be set for the simulator. + Returns: + None + """ + input_muj = self.convert_vector_to_mujoco(input) self.data.ctrl = input_muj np.copyto(self.data.ctrl, input_muj) - def set_position_input(self, pos): + def set_position_input(self, pos) -> None: + """ + Sets the desired position input for the simulator. + This method converts the given position vector to the MuJoCo format and + sets it as the desired position. It also enables position control. + + Args: + pos (list or np.ndarray): The position vector to be set as the desired position. + """ + pos_muj = self.convert_vector_to_mujoco(pos) self.desired_pos = pos_muj self.postion_control = True - def create_mapping_vector_to_mujoco(self): + def create_mapping_vector_to_mujoco(self) -> None: + """ + Creates a mapping vector from the robot model's joint names to the Mujoco joint order. + This function initializes the `to_mujoco` attribute as an empty list and populates it with + indices that map each Mujoco joint to its corresponding index in the robot model's joint + name list. If a Mujoco joint is not found in the joint name list, a ValueError is raised. + + Returns: + None + Raises: + ValueError: If a Mujoco joint is not found in the robot model's joint name list. + """ + # This function creates the to_mujoco map self.to_mujoco = [] for mujoco_joint in self.robot_model.mujoco_joint_order: @@ -81,9 +230,24 @@ def create_mapping_vector_to_mujoco(self): index = self.robot_model.joint_name_list.index(mujoco_joint) self.to_mujoco.append(index) except ValueError: - raise ValueError(f"Mujoco joint '{mujoco_joint}' not found in joint list.") - - def create_mapping_vector_from_mujoco(self): + raise ValueError( + f"Mujoco joint '{mujoco_joint}' not found in joint list." + ) + + def create_mapping_vector_from_mujoco(self) -> None: + """ + Creates a mapping vector from the MuJoCo joint order to the robot model's joint order. + This function initializes the `from_mujoco` attribute as an empty list and then populates it + with indices that map each joint in the robot model's joint name list to its corresponding + index in the MuJoCo joint order list. + + Returns: + None + Raises: + ValueError: If a joint name in the robot model's joint name list is not found in the + MuJoCo joint order list. + """ + # This function creates the to_mujoco map self.from_mujoco = [] for joint in self.robot_model.joint_name_list: @@ -91,26 +255,67 @@ def create_mapping_vector_from_mujoco(self): index = self.robot_model.mujoco_joint_order.index(joint) self.from_mujoco.append(index) except ValueError: - raise ValueError(f"Joint name list joint '{joint}' not found in mujoco list.") - - def convert_vector_to_mujoco (self, array_in): - out_muj = np.asarray([array_in[self.to_mujoco[item]] for item in range(self.robot_model.NDoF)]) + raise ValueError( + f"Joint name list joint '{joint}' not found in mujoco list." + ) + + def convert_vector_to_mujoco(self, array_in) -> np.ndarray: + """ + Converts a given vector to the MuJoCo format. + This function takes an input array and reorders its elements according to the + mapping defined in `self.to_mujoco` for the degrees of freedom (DoF) of the robot model. + + Args: + array_in (array-like): The input array to be converted. + Returns: + np.ndarray: The converted array in MuJoCo format. + """ + + out_muj = np.asarray( + [array_in[self.to_mujoco[item]] for item in range(self.robot_model.NDoF)] + ) return out_muj - + def convert_from_mujoco(self, array_muj): - out_classic = np.asarray([array_muj[self.from_mujoco[item]] for item in range(self.robot_model.NDoF)]) + """ + Converts an array from MuJoCo format to a classic format. + + Args: + array_muj (np.ndarray): The input array in MuJoCo format. + Returns: + np.ndarray: The converted array in classic format. + """ + + out_classic = np.asarray( + [array_muj[self.from_mujoco[item]] for item in range(self.robot_model.NDoF)] + ) return out_classic - def step(self, n_step=1, visualize=True): + def step(self, n_step=1, visualize=True) -> None: + """ + Advances the simulation by a specified number of steps. + + Args: + n_step (int?): The number of simulation steps to advance. Default is 1. + visualize (bool?): If True, renders the simulation after stepping. Default is True. + Notes: + - If position control is enabled, the control input is computed using + proportional-derivative (PD) control based on the desired position. + - The control input is applied to the simulation, and the simulation + is advanced by the specified number of steps. + - If visualization is enabled, the simulation is rendered after stepping. + """ + if self.postion_control: for _ in range(n_step): s, s_dot, tau = self.get_state(use_mujoco_convention=True) - kp_muj = self.convert_vector_to_mujoco(self.robot_model.kp_position_control) - kd_muj = self.convert_vector_to_mujoco(self.robot_model.kd_position_control) - ctrl = ( - kp_muj * (self.desired_pos - s) - - kd_muj * s_dot + kp_muj = self.convert_vector_to_mujoco( + self.robot_model.kp_position_control + ) + kd_muj = self.convert_vector_to_mujoco( + self.robot_model.kd_position_control ) + ctrl = kp_muj * (self.desired_pos - s) - kd_muj * s_dot self.data.ctrl = ctrl np.copyto(self.data.ctrl, ctrl) mujoco.mj_step(self.model, self.data) @@ -120,11 +325,23 @@ def step(self, n_step=1, visualize=True): mujoco.mj_step(self.model, self.data, n_step) mujoco.mj_step1(self.model, self.data) mujoco.mj_forward(self.model, self.data) - + if self.visualize_robot_flag: self.viewer.render() - - def step_with_motors(self, n_step, torque): + if self.save_video: + self.recorder.record_frame() + + def step_with_motors(self, n_step, torque) -> None: + """ + Advances the simulation by a specified number of steps while applying motor torques. + + Args: + n_step (int): The number of simulation steps to advance. + torque (array-like): An array of torques to be applied to the motors. The length of the array should match the number of degrees of freedom (NDoF) of the robot. + Returns: + None + """ + indexes_joint_acceleration = self.model.jnt_dofadr[1:] s_dot_dot = self.data.qacc[indexes_joint_acceleration[0] :] for _ in range(n_step): @@ -145,14 +362,37 @@ def step_with_motors(self, n_step, torque): if self.visualize_robot_flag: self.viewer.render() - def compute_misalignment_gravity_fun(self): + def compute_misalignment_gravity_fun(self) -> None: + """ + Computes the misalignment gravity function and assigns it to the instance variable `error_mis`. + This function creates a symbolic 4x4 matrix `H` and a symbolic variable `theta`. It calculates + `theta` as the dot product of the vector [0, 0, 1] and the first three elements of the third + column of `H`, minus 1. It then defines a CasADi function `error` that takes `H` as input and + returns `theta` as output. This function is assigned to the instance variable `error_mis`. + + Returns: + None + """ + H = cs.SX.sym("H", 4, 4) theta = cs.SX.sym("theta") theta = cs.dot([0, 0, 1], H[:3, 2]) - 1 error = cs.Function("error", [H], [theta]) self.error_mis = error - def check_feet_status(self, s, H_b): + def check_feet_status(self, s: np.ndarray, H_b: np.ndarray) -> tuple: + """ + Checks the status of the robot's feet to determine if they are in contact with the ground and aligned properly. + + Args: + s (np.ndarray): The state vector of the robot. Shape must be (NDoF,). + H_b (np.ndarray): The homogeneous transformation matrix representing the base pose of the robot. Shape must be (4, 4). + Returns: + tuple: + bool: True if both feet are in contact with the ground and properly aligned, False otherwise. + float: The total misalignment error of both feet. + """ + left_foot_pose = self.robot_model.H_left_foot(H_b, s) rigth_foot_pose = self.robot_model.H_right_foot(H_b, s) left_foot_z = left_foot_pose[2, 3] @@ -172,10 +412,22 @@ def check_feet_status(self, s, H_b): return True, misalignment_error - def get_feet_wrench(self): + def get_feet_wrench(self) -> tuple: + """ + Computes the wrenches (forces and torques) applied to the left and right feet of the robot. + This method calculates the resulting wrenches on the robot's feet based on the current state + and contact forces. It iterates through all contacts, determines if the contact is with the + left or right foot, and accumulates the wrench for each foot. + + Returns: + tuple: A tuple containing two numpy arrays: + - left_foot_wrench (numpy.ndarray): A 6-element array representing the wrench on the left foot. + - right_foot_wrench (numpy.ndarray): A 6-element array representing the wrench on the right foot. + """ + left_foot_wrench = np.zeros(6) rigth_foot_wrench = np.zeros(6) - s,s_dot, tau = self.get_state() + s, s_dot, tau = self.get_state() H_b = self.get_base() self.H_left_foot_num = np.array(self.H_left_foot(H_b, s)) self.H_right_foot_num = np.array(self.H_right_foot(H_b, s)) @@ -204,8 +456,18 @@ def get_feet_wrench(self): wrench_LF = self.compute_resulting_wrench(LF_H_contact, c_array) left_foot_wrench[:] += wrench_LF.reshape(6) return (left_foot_wrench, rigth_foot_wrench) - - def compute_resulting_wrench(self, b_H_a, force_torque_a): + + def compute_resulting_wrench(self, b_H_a, force_torque_a) -> np.ndarray: + """ + Compute the resulting wrench (force and torque) in frame b given the wrench in frame a. + + Args: + b_H_a (np.ndarray): A 4x4 homogeneous transformation matrix representing the pose of frame a relative to frame b. + force_torque_a (np.ndarray): A 6-element array representing the force and torque in frame a. + Returns: + np.ndarray: A 6x1 array representing the force and torque in frame b. + """ + p = b_H_a[:3, 3] R = b_H_a[:3, :3] adjoint_matrix = np.zeros([6, 6]) @@ -216,8 +478,19 @@ def compute_resulting_wrench(self, b_H_a, force_torque_a): return force_torque_b # note that for mujoco the ordering is w,x,y,z - def get_base(self): + def get_base(self) -> np.ndarray: + """ + Computes the transformation matrix representing the base position and orientation + of the model in the simulation. + + Returns: + np.ndarray: A 4x4 transformation matrix where the upper-left 3x3 submatrix + represents the rotation matrix derived from the quaternion, and + the upper-right 3x1 submatrix represents the translation vector + (position) of the base. + """ indexes_joint = self.model.jnt_qposadr[1:] + # Extract quaternion components w, x, y, z = self.data.qpos[3 : indexes_joint[0]] @@ -253,37 +526,72 @@ def get_base(self): # Return transformation matrix return trans_mat - def get_base_velocity(self): + def get_base_velocity(self) -> np.ndarray: + """ + Retrieve the base velocity of the model. + This method extracts the base velocity from the simulation data. It uses the joint degrees of freedom + addresses to determine the relevant indices and returns the velocity of the base. + + Returns: + numpy.ndarray: The base velocity of the model. + """ + indexes_joint_velocities = self.model.jnt_dofadr[1:] return self.data.qvel[: indexes_joint_velocities[0]] - def get_state(self, use_mujoco_convention = False): + def get_state(self, use_mujoco_convention=False) -> tuple: + """ + Returns the state of the robot either in mujoco_convention or classic one. + If the model has no joints, an empty state is returned either way. + + Args: + use_mujoco_convention (bool): If True, the state is returned in mujoco_convention. If False, it is returned in classic convention. + Returns: + s_out (np.array): joint positions + s_dot_out (np.array): joint velocities + tau_out (np.array): joint torques + """ indexes_joint = self.model.jnt_qposadr[1:] indexes_joint_velocities = self.model.jnt_dofadr[1:] + if len(indexes_joint) == 0: + return np.array([]), np.array([]), np.array([]) s = self.data.qpos[indexes_joint[0] :] s_dot = self.data.qvel[indexes_joint_velocities[0] :] tau = self.data.ctrl - if(use_mujoco_convention): - return s,s_dot,tau + if use_mujoco_convention: + return s, s_dot, tau s_out = self.convert_from_mujoco(s) s_dot_out = self.convert_from_mujoco(s_dot) tau_out = self.convert_from_mujoco(tau) return s_out, s_dot_out, tau_out - def close(self): - if self.visualize_robot_flag: - self.viewer.close() + def get_simulation_time(self) -> float: + """ + Retrieve the current simulation time. - def visualize_robot(self): - self.viewer.render() + Returns: + float: The current time of the simulation. + """ - def get_simulation_time(self): return self.data.time - def get_simulation_frequency(self): + def get_simulation_frequency(self) -> float: return self.model.opt.timestep - def RPY_to_quat(self, roll, pitch, yaw): + def RPY_to_quat(self, roll, pitch, yaw) -> list: + """ + Convert roll, pitch, and yaw angles to a quaternion. + The quaternion is returned as a list of four elements [qw, qx, qy, qz]. + + Args: + roll (float): The roll angle in radians. + pitch (float): The pitch angle in radians. + yaw (float): The yaw angle in radians. + Returns: + list: A list containing the quaternion [qw, qx, qy, qz]. + """ + + cr = math.cos(roll / 2) cp = math.cos(pitch / 2) cy = math.cos(yaw / 2) @@ -298,6 +606,57 @@ def RPY_to_quat(self, roll, pitch, yaw): return [qw, qx, qy, qz] - def close_visualization(self): + def run(self, tf: float, callbacks: List[Callback] = [], visualise: bool = False) -> None: + """ + Run the simulation. + This method runs the simulation until the `should_stop` flag is set to True. + """ + + self.reset() + for callback in callbacks: + callback.on_simulation_start() + callback.set_simulator(self) + self.set_visualize_robot_flag(visualise) + while self.t < tf: + self.step() + + contact = MjContactInfo(self.t, self.iter, self.data.contact) + if not contact.is_none(): + self.contacts.append(contact) + + self.t = self.get_simulation_time() + for callback in callbacks: + d = { + "contact" : contact, + } + callback.on_simulation_step(self.t, self.iter, self.data, d) + self.iter += 1 + if self.should_stop: + break + for callback in callbacks: + callback.on_simulation_end() + + def close(self) -> None: + """ + Closes the simulator viewer if the visualization flag is set. + This method checks if the `visualize_robot_flag` is True. If it is, it closes the viewer associated with the simulator. + """ + if self.visualize_robot_flag: + self.logger.info("Closing the Mujoco viewer") self.viewer.close() + if self.save_video: + self.logger.info(f"Writing video to {self.video_path}") + #self.recorder.write_video(self.video_path) + + def reset(self) -> None: + """ + Resets the simulator to the initial state. + This method resets the simulator to the initial state by setting the control input to zero and + calling the `step` method with zero steps. + """ + + self.set_input(np.zeros(self.robot_model.NDoF)) + self.step(0) + self.t = 0 + self.iter = 0 \ No newline at end of file diff --git a/src/comodo/mujocoSimulator/visualiser.py b/src/comodo/mujocoSimulator/visualiser.py new file mode 100644 index 0000000..4d89280 --- /dev/null +++ b/src/comodo/mujocoSimulator/visualiser.py @@ -0,0 +1,48 @@ +import cv2 +import mujoco as mj +import mujoco_viewer as mjv +import numpy as np +import pathlib +from dataclasses import dataclass + +@dataclass +class RenderOptions: + width: int = 800 + height: int = 600 + fps: int = 30 + +class MujocoVideoRecorder: + def __init__( + self, + model: mj.MjModel, + data: mj.MjData, + options: RenderOptions, + ): + self.model: self.MjModel = model + self.data: mj.MjData = data + self.width: int = options.width + self.height: int = options.height + self.fps: int = options.fps + + self.viewer = mjv.MujocoViewer(self.model, self.data, 'offscreen') + self.frames = [] + + def reset(self): + self.frames = [] + + def record_frame(self): + img = self.viewer.read_pixels() + self.frames.append(img) + + def write_video(self, video_path: str | pathlib.Path): + print("0") + video_path = str(video_path) + writer = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*"mp4v"), self.fps, (self.width, self.height)) + + print("1") + for frame in self.frames: + frame = cv2.resize(frame,(self.width,self.height)) + writer.write(frame) + + print("2") + writer.release() \ No newline at end of file diff --git a/src/comodo/robotModel/robotModel.py b/src/comodo/robotModel/robotModel.py index 77cc373..123e366 100644 --- a/src/comodo/robotModel/robotModel.py +++ b/src/comodo/robotModel/robotModel.py @@ -1,8 +1,12 @@ from adam.casadi.computations import KinDynComputations import numpy as np +import io +import pathlib +from typing import Union from urchin import URDF from urchin import Joint from urchin import Link +from typing import Dict import mujoco import tempfile import xml.etree.ElementTree as ET @@ -16,35 +20,83 @@ class RobotModel(KinDynComputations): def __init__( self, - urdfstring: str, + urdf_path: Union[str, pathlib.Path, pathlib.PurePath, pathlib.PurePosixPath], robot_name: str, joint_name_list: list, - base_link: str = "root_link", - left_foot: str = "l_sole", - right_foot: str = "r_sole", - torso: str = "chest", - right_foot_rear_link_name: str = "r_foot_rear", - right_foot_front_link_name: str = "r_foot_front", - left_foot_rear_link_name: str = "l_foot_rear", - left_foot_front_link_name: str = "l_foot_front", + base_link: str = "root", + left_foot: str = None, + right_foot: str = None, + torso: str = None, + right_foot_rear_link_name: str = None, + right_foot_front_link_name: str = None, + left_foot_rear_link_name: str = None, + left_foot_front_link_name: str = None, kp_pos_control: np.float32 = np.array( - [35 * 70.0, 35 * 70.0, 35 * 40.0, 35 * 100.0, 35 * 100.0, 35 * 100.0,35 * 70.0, 35 * 70.0, 35 * 40.0, 35 * 100.0, 35 * 100.0, 35 * 100.0,20 * 5.745, 20 * 5.745, 20 * 5.745, 20 * 1.745,20 * 5.745, 20 * 5.745, 20 * 5.745, 20 * 1.745] + [ + 35 * 70.0, + 35 * 70.0, + 35 * 40.0, + 35 * 100.0, + 35 * 100.0, + 35 * 100.0, + 35 * 70.0, + 35 * 70.0, + 35 * 40.0, + 35 * 100.0, + 35 * 100.0, + 35 * 100.0, + 20 * 5.745, + 20 * 5.745, + 20 * 5.745, + 20 * 1.745, + 20 * 5.745, + 20 * 5.745, + 20 * 5.745, + 20 * 1.745, + ] + ), + kd_pos_control: np.float32 = np.array( + [ + 15 * 0.15, + 15 * 0.15, + 15 * 0.35, + 15 * 0.15, + 15 * 0.15, + 15 * 0.15, + 15 * 0.15, + 15 * 0.15, + 15 * 0.35, + 15 * 0.15, + 15 * 0.15, + 15 * 0.15, + 4 * 5.745, + 4 * 5.745, + 4 * 5.745, + 4 * 1.745, + 4 * 5.745, + 4 * 5.745, + 4 * 5.745, + 4 * 1.745, + ] ), - kd_pos_control: np.float32= np.array([15 * 0.15, 15 * 0.15, 15 * 0.35, 15 * 0.15, 15 * 0.15, 15 * 0.15,15 * 0.15, 15 * 0.15, 15 * 0.35, 15 * 0.15, 15 * 0.15, 15 * 0.15,4 * 5.745, 4 * 5.745, 4 * 5.745, 4 * 1.745,4 * 5.745, 4 * 5.745, 4 * 5.745, 4 * 1.745]) ) -> None: + valid_path_types = (str, pathlib.Path, pathlib.PurePath, pathlib.PurePosixPath) + if not isinstance(urdf_path, valid_path_types): + raise TypeError(f"urdf_path must be a string or a pathlib object, but got {type(urdf_path)}") + self.collision_keyword = "_collision" self.visual_keyword = "_visual" - self.urdf_string = urdfstring + self.urdf_path = str(urdf_path) self.robot_name = robot_name self.joint_name_list = joint_name_list self.base_link = base_link self.left_foot_frame = left_foot self.right_foot_frame = right_foot - self.torso_link = torso - self.right_foot_rear_ct = right_foot_rear_link_name + self.collision_keyword - self.right_foot_front_ct = right_foot_front_link_name + self.collision_keyword - self.left_foot_rear_ct = left_foot_rear_link_name + self.collision_keyword - self.left_foot_front_ct = left_foot_front_link_name + self.collision_keyword + # self.torso_link = torso + # self.right_foot_rear_ct = right_foot_rear_link_name + self.collision_keyword + # self.right_foot_front_ct = right_foot_front_link_name + self.collision_keyword + # self.left_foot_rear_ct = left_foot_rear_link_name + self.collision_keyword + # self.left_foot_front_ct = left_foot_front_link_name + self.collision_keyword self.remote_control_board_list = [ "/" + self.robot_name + "/torso", @@ -55,15 +107,22 @@ def __init__( ] self.kp_position_control = kp_pos_control - self.kd_position_control = kd_pos_control + self.kd_position_control = kd_pos_control self.ki_position_control = 10 * self.kd_position_control self.gravity = iDynTree.Vector3() self.gravity.zero() self.gravity.setVal(2, -9.81) self.H_b = iDynTree.Transform() - super().__init__(urdfstring, self.joint_name_list, self.base_link) - self.H_left_foot = self.forward_kinematics_fun(self.left_foot_frame) - self.H_right_foot = self.forward_kinematics_fun(self.right_foot_frame) + super().__init__(urdf_path, self.joint_name_list, self.base_link) + # self.H_left_foot = self.forward_kinematics_fun(self.left_foot_frame) + # self.H_right_foot = self.forward_kinematics_fun(self.right_foot_frame) + #print("....", self.forward_kinematics_fun("right_foot"), " -- ", type(self.forward_kinematics_fun("right_foot"))) + + def a(self, link_name: str): + try: + return self.forward_kinematics_fun(link_name) + except: + return None def override_control_boar_list(self, remote_control_board_list: list): self.remote_control_board_list = remote_control_board_list @@ -91,7 +150,7 @@ def set_limbs_indexes( def get_idyntree_kyndyn(self): model_loader = iDynTree.ModelLoader() model_loader.loadReducedModelFromString( - copy.deepcopy(self.urdf_string), self.joint_name_list + copy.deepcopy(self.urdf_path), self.joint_name_list ) kindyn = iDynTree.KinDynComputations() kindyn.loadRobotModel(model_loader.model()) @@ -128,11 +187,11 @@ def compute_desired_position_walking(self): for index, joint_name in enumerate(self.joint_name_list): if "knee" in joint_name: self.solver.subject_to(self.s[index] == desired_knee) - if "shoulder_roll" in joint_name: + if "shoulder_roll" in joint_name: self.solver.subject_to(self.s[index] == shoulder_roll) - if "elbow" in joint_name: - self.solver.subject_to(self.s[index] == elbow) - + if "elbow" in joint_name: + self.solver.subject_to(self.s[index] == elbow) + self.solver.subject_to(H_left_foot[2, 3] == 0.0) self.solver.subject_to(H_right_foot[2, 3] == 0.0) self.solver.subject_to(quat_left_foot == reference_rotation) @@ -183,15 +242,15 @@ def compute_base_pose_left_foot_in_contact(self, s): w_H_init = np.linalg.inv(w_H_lefFoot_num) @ w_H_torso_num return w_H_init - def compute_com_init(self): + def compute_com_init(self): com = self.CoM_position_fun() - return np.array(com(self.w_H_b_init,self.s_init)) + return np.array(com(self.w_H_b_init, self.s_init)) - def set_initial_position(self, s_init, w_H_b_init, xyz_rpy_init): + def set_initial_position(self, s_init, w_H_b_init, xyz_rpy_init): self.s_init = s_init self.w_H_b_init = w_H_b_init self.xyz_rpy_init = xyz_rpy_init - + def rotation_matrix_to_quaternion(self, R): # Ensure the matrix is a valid rotation matrix (orthogonal with determinant 1) trace = cs.trace(R) @@ -210,11 +269,11 @@ def rotation_matrix_to_quaternion(self, R): return cs.vertcat(w, x, y, z) - def get_mujoco_urdf_string(self): + def get_mujoco_urdf_string(self) -> str: ## We first start by ET - tempFileOut = tempfile.NamedTemporaryFile(mode="w+") - tempFileOut.write(copy.deepcopy(self.urdf_string)) - root = ET.fromstring(self.urdf_string) + parser = ET.XMLParser(encoding="utf-8") + tree = ET.parse(self.urdf_path, parser=parser) + root = tree.getroot() self.mujoco_joint_order = [] # Declaring as fixed the not controlled joints for joint in root.findall(".//joint"): @@ -276,12 +335,35 @@ def get_mujoco_urdf_string(self): mujoco_el.append(compiler_el) robot_el.append(mujoco_el) # Convert the XML tree to a string - robot_urdf_string_original = ET.tostring(root) + robot_urdf_string_original = ET.tostring(root, encoding="unicode") return robot_urdf_string_original - def get_mujoco_model(self): + def get_mujoco_model(self, floor_opts: Dict, save_mjc_xml: bool = False) -> mujoco.MjModel: + valid_floor_opts = ["inclination_deg", "sliding_friction", "torsional_friction", "rolling_friction"] + for key in floor_opts.keys(): + if key not in valid_floor_opts: + raise ValueError(f"Invalid key {key} in floor_opts. Valid keys are {valid_floor_opts}") + + floor_inclination_deg = floor_opts.get("inclination_deg", [0, 0, 0]) + sliding_friction = floor_opts.get("sliding_friction", 1) + torsional_friction = floor_opts.get("torsional_friction", 0.005) + rolling_friction = floor_opts.get("rolling_friction", 0.0001) + + # Type & value checking + try: + floor_inclination = np.array(floor_inclination_deg) + floor_inclination = np.deg2rad(floor_inclination) + except: + raise ValueError(f"floor's inclination_deg must be a sequence of 3 elements, but got {floor_inclination_deg} of type {type(floor_inclination_deg)}") + + for friction_coeff in ("sliding_friction", "torsional_friction", "rolling_friction"): + if not isinstance(eval(friction_coeff), (int, float)): + raise ValueError(f"{friction_coeff} must be a number (int, float), but got {type(eval(friction_coeff))}") + if not eval(f"0 <= {friction_coeff} <= 1"): + raise ValueError(f"{friction_coeff} must be in the range [0, 1], but got {eval(friction_coeff)}") + + # Get the URDF string urdf_string = self.get_mujoco_urdf_string() - mujoco_model = mujoco.MjModel.from_xml_string(urdf_string) path_temp_xml = tempfile.NamedTemporaryFile(mode="w+") mujoco.mj_saveLastXML(path_temp_xml.name, mujoco_model) @@ -376,12 +458,22 @@ def get_mujoco_model(self): break floor = ET.Element("geom") floor.set("name", "floor") - floor.set("size", "0 0 .05") + floor.set("size", "0 0 .04") floor.set("type", "plane") floor.set("material", "grid") floor.set("condim", "3") + floor.set("euler", "{} {} {}".format(*floor_inclination)) + floor.set("friction", "{} {} {}".format(sliding_friction, torsional_friction, rolling_friction)) + + #if world_elem.find(".//geom[@name='floor']") is not None: + # world_elem.remove(world_elem.find(".//geom[@name='floor']")) world_elem.append(floor) new_xml = ET.tostring(tree.getroot(), encoding="unicode") + + if save_mjc_xml: + with open("./mujoco_model.xml", "w") as f: + f.write(new_xml) + return new_xml def get_base_pose_from_contacts(self, s, contact_frames_pose: dict):