diff --git a/safe_control_gym/envs/__init__.py b/safe_control_gym/envs/__init__.py index 3a88f9419..7cf284c00 100644 --- a/safe_control_gym/envs/__init__.py +++ b/safe_control_gym/envs/__init__.py @@ -1,15 +1,14 @@ """Register environments.""" -from safe_control_gym.utils.registration import register +from gymnasium import register register( id="quadrotor", entry_point="safe_control_gym.envs.quadrotor:Quadrotor", - config_entry_point="safe_control_gym.envs:quadrotor.yaml", ) register( id="firmware", entry_point="safe_control_gym.envs.firmware_wrapper:FirmwareWrapper", - config_entry_point="safe_control_gym.controllers.firmware:firmware.yaml", + max_episode_steps=900, # 30 seconds * 30 Hz ) diff --git a/safe_control_gym/envs/base_aviary.py b/safe_control_gym/envs/base_aviary.py index 65f817c0b..4bcb6a568 100644 --- a/safe_control_gym/envs/base_aviary.py +++ b/safe_control_gym/envs/base_aviary.py @@ -5,15 +5,15 @@ The module also contains enumerations for drone models, PyBullet physics updates image types captured by PyBullet's camera rendering. - """ import os import time +from pathlib import Path +import logging import pkgutil import xml.etree.ElementTree as etxml -from datetime import datetime from enum import Enum import numpy as np import pybullet as p @@ -21,6 +21,7 @@ from safe_control_gym.envs.benchmark_env import BenchmarkEnv +logger = logging.getLogger(__name__) egl = pkgutil.get_loader("eglRenderer") @@ -43,15 +44,6 @@ class Physics(str, Enum): ) -class ImageType(int, Enum): - """Camera capture image type enumeration class.""" - - RGB = 0 # Red, green, blue (and alpha). - DEP = 1 # Depth. - SEG = 2 # Segmentation by object id. - BW = 3 # Black and white. - - class BaseAviary(BenchmarkEnv): """Base class for "drone aviary" Gym environments.""" @@ -61,7 +53,7 @@ class BaseAviary(BenchmarkEnv): def __init__( self, drone_model: DroneModel = DroneModel.CF2X, - num_drones: int = 1, + n_drones: int = 1, physics: Physics = Physics.PYB, record=False, gui=False, @@ -74,7 +66,7 @@ def __init__( Args: drone_model (DroneModel, optional): The desired drone type (detailed in an .urdf file in folder `assets`). - num_drones (int, optional): The desired number of drones in the aviary. + n_drones: The desired number of drones in the simulation. physics (Physics, optional): The desired implementation of PyBullet physics/custom dynamics. record (bool, optional): Whether to save a video of the simulation in folder @@ -83,6 +75,7 @@ def __init__( verbose (bool, optional): If to suppress environment print statetments. """ + self._recording_id = None # Constants. self.GRAVITY_ACC = 9.8 self.RAD2DEG = 180 / np.pi @@ -90,7 +83,7 @@ def __init__( # Parameters. self.DRONE_MODEL = DroneModel(drone_model) self.URDF_PATH = os.path.join(self.URDF_DIR, self.DRONE_MODEL.value + ".urdf") - self.NUM_DRONES = num_drones + self.n_drones = n_drones self.PHYSICS = Physics(physics) self.RECORD = record # Load the drone properties from the .urdf file. @@ -118,37 +111,6 @@ def __init__( self.MAX_PWM, ) = self._parse_urdf_parameters(self.URDF_PATH) self.GROUND_PLANE_Z = 0 # -0.05 - if verbose: - print( - "[INFO] BaseAviary.__init__() loaded parameters from the drone's .urdf: \ - \n[INFO] m {:f}, L {:f},\n[INFO] ixx {:f}, iyy {:f}, izz {:f}, \ - \n[INFO] kf {:f}, km {:f},\n[INFO] t2w {:f}, max_speed_kmh {:f}, \ - \n[INFO] gnd_eff_coeff {:f}, prop_radius {:f}, \ - \n[INFO] drag_xy_coeff {:f}, drag_z_coeff {:f}, \ - \n[INFO] dw_coeff_1 {:f}, dw_coeff_2 {:f}, dw_coeff_3 {:f} \ - \n[INFO] pwm2rpm_scale {:f}, pwm2rpm_const {:f}, min_pwm {:f}, max_pwm {:f}".format( - self.MASS, - self.L, - self.J[0, 0], - self.J[1, 1], - self.J[2, 2], - self.KF, - self.KM, - self.THRUST2WEIGHT_RATIO, - self.MAX_SPEED_KMH, - self.GND_EFF_COEFF, - self.PROP_RADIUS, - self.DRAG_COEFF[0], - self.DRAG_COEFF[2], - self.DW_COEFF_1, - self.DW_COEFF_2, - self.DW_COEFF_3, - self.PWM2RPM_SCALE, - self.PWM2RPM_CONST, - self.MIN_PWM, - self.MAX_PWM, - ) - ) # Compute constants. self.GRAVITY = self.GRAVITY_ACC * self.MASS self.HOVER_RPM = np.sqrt(self.GRAVITY / (4 * self.KF)) @@ -162,7 +124,21 @@ def __init__( * np.sqrt((15 * self.MAX_RPM**2 * self.KF * self.GND_EFF_COEFF) / self.MAX_THRUST) ) # BenchmarkEnv constructor. - super().__init__(gui=gui, verbose=verbose, **kwargs) + super().__init__( + seed=kwargs["seed"], + gui=gui, + episode_len_sec=kwargs["episode_len_sec"], + sim_freq=kwargs["sim_freq"], + ctrl_freq=kwargs["ctrl_freq"], + init_state=kwargs["init_state"], + randomized_init=kwargs["randomized_init"], + init_state_randomization_info=kwargs["init_state_randomization_info"], + randomized_inertial_prop=kwargs["randomized_inertial_prop"], + inertial_prop_randomization_info=kwargs["inertial_prop_randomization_info"], + constraints=kwargs["constraints"], + disturbances=kwargs["disturbances"], + reseed=kwargs["reseed"], + ) # Connect to PyBullet. self.PYB_CLIENT = -1 if gui: @@ -191,7 +167,7 @@ def __init__( self.RENDER_WIDTH = int(640) self.RENDER_HEIGHT = int(480) self.FRAME_PER_SEC = 24 - self.CAPTURE_FREQ = int(self.PYB_FREQ / self.FRAME_PER_SEC) + self.CAPTURE_FREQ = int(self.sim_settings.sim_freq / self.FRAME_PER_SEC) self.CAM_VIEW = p.computeViewMatrixFromYawPitchRoll( distance=3, yaw=-30, @@ -209,20 +185,20 @@ def __init__( self.INIT_XYZS = ( np.vstack( [ - np.array([x * 4 * self.L for x in range(self.NUM_DRONES)]), - np.array([y * 4 * self.L for y in range(self.NUM_DRONES)]), - np.ones(self.NUM_DRONES) * (self.COLLISION_H / 2 - self.COLLISION_Z_OFFSET), + np.array([x * 4 * self.L for x in range(self.n_drones)]), + np.array([y * 4 * self.L for y in range(self.n_drones)]), + np.ones(self.n_drones) * (self.COLLISION_H / 2 - self.COLLISION_Z_OFFSET), ] ) .transpose() - .reshape(self.NUM_DRONES, 3) + .reshape(self.n_drones, 3) ) - self.INIT_RPYS = np.zeros((self.NUM_DRONES, 3)) + self.INIT_RPYS = np.zeros((self.n_drones, 3)) def close(self): """Terminates the environment.""" - if self.RECORD and self.GUI: - p.stopStateLogging(self.VIDEO_ID, physicsClientId=self.PYB_CLIENT) + if self._recording_id is not None: + p.stopStateLogging(self._recording_id, physicsClientId=self.PYB_CLIENT) if self.PYB_CLIENT >= 0: p.disconnect(physicsClientId=self.PYB_CLIENT) self.PYB_CLIENT = -1 @@ -237,27 +213,27 @@ def _reset_simulation(self): # Initialize/reset counters and zero-valued variables. self.RESET_TIME = time.time() self.first_render_call = True - self.X_AX = -1 * np.ones(self.NUM_DRONES) - self.Y_AX = -1 * np.ones(self.NUM_DRONES) - self.Z_AX = -1 * np.ones(self.NUM_DRONES) - self.GUI_INPUT_TEXT = -1 * np.ones(self.NUM_DRONES) + self.X_AX = -1 * np.ones(self.n_drones) + self.Y_AX = -1 * np.ones(self.n_drones) + self.Z_AX = -1 * np.ones(self.n_drones) + self.GUI_INPUT_TEXT = -1 * np.ones(self.n_drones) self.USE_GUI_RPM = False self.last_input_switch = 0 - self.last_clipped_action = np.zeros((self.NUM_DRONES, 4)) + self.last_clipped_action = np.zeros((self.n_drones, 4)) self.gui_input = np.zeros(4) # Initialize the drones kinematic information. - self.pos = np.zeros((self.NUM_DRONES, 3)) - self.quat = np.zeros((self.NUM_DRONES, 4)) - self.rpy = np.zeros((self.NUM_DRONES, 3)) - self.vel = np.zeros((self.NUM_DRONES, 3)) - self.ang_v = np.zeros((self.NUM_DRONES, 3)) + self.pos = np.zeros((self.n_drones, 3)) + self.quat = np.zeros((self.n_drones, 4)) + self.rpy = np.zeros((self.n_drones, 3)) + self.vel = np.zeros((self.n_drones, 3)) + self.ang_v = np.zeros((self.n_drones, 3)) if self.PHYSICS == Physics.DYN: - self.rpy_rates = np.zeros((self.NUM_DRONES, 3)) + self.rpy_rates = np.zeros((self.n_drones, 3)) # Set PyBullet's parameters. p.resetSimulation(physicsClientId=self.PYB_CLIENT) p.setGravity(0, 0, -self.GRAVITY_ACC, physicsClientId=self.PYB_CLIENT) p.setRealTimeSimulation(0, physicsClientId=self.PYB_CLIENT) - p.setTimeStep(self.PYB_TIMESTEP, physicsClientId=self.PYB_CLIENT) + p.setTimeStep(1 / self.sim_settings.sim_freq, physicsClientId=self.PYB_CLIENT) p.setAdditionalSearchPath(pybullet_data.getDataPath(), physicsClientId=self.PYB_CLIENT) # Load ground plane, drone and obstacles models. self.PLANE_ID = p.loadURDF( @@ -272,22 +248,14 @@ def _reset_simulation(self): flags=p.URDF_USE_INERTIA_FROM_FILE, # Use URDF inertia tensor. physicsClientId=self.PYB_CLIENT, ) - for i in range(self.NUM_DRONES) + for i in range(self.n_drones) ] ) - # for i in range(self.NUM_DRONES): - # p.changeDynamics(self.DRONE_IDS[i], -1, linearDamping=0, angularDamping=0) # Remove default damping. - for i in range(self.NUM_DRONES): + for i in range(self.n_drones): p.changeDynamics(self.DRONE_IDS[i], -1, linearDamping=0, angularDamping=0) # Update and store the drones kinematic information. self._update_and_store_kinematic_information() - # Start video recording. - self._start_video_recording() - # # Show frame of references of drones, will severly slow down the GUI. - # for i in range(self.NUM_DRONES): - # if gui: - # self._show_drone_local_axes(i) def _advance_simulation(self, clipped_action, disturbance_force=None): """Advances the environment by one simulation step. @@ -299,12 +267,12 @@ def _advance_simulation(self, clipped_action, disturbance_force=None): disturbance_force (ndarray, optional): Disturbance force, applied to all drones. """ - clipped_action = np.reshape(clipped_action, (self.NUM_DRONES, 4)) + clipped_action = np.reshape(clipped_action, (self.n_drones, 4)) # Repeat for as many as the aggregate physics steps. - for _ in range(self.PYB_STEPS_PER_CTRL): + for _ in range(self.sim_settings.sim_freq // self.sim_settings.ctrl_freq): # Update and store the drones kinematic info for certain # Between aggregate steps for certain types of update. - if self.PYB_STEPS_PER_CTRL > 1 and self.PHYSICS in [ + if self.sim_settings.sim_freq // self.sim_settings.ctrl_freq > 1 and self.PHYSICS in [ Physics.DYN, Physics.PYB_GND, Physics.PYB_DRAG, @@ -313,7 +281,7 @@ def _advance_simulation(self, clipped_action, disturbance_force=None): ]: self._update_and_store_kinematic_information() # Step the simulation using the desired physics update. - for i in range(self.NUM_DRONES): + for i in range(self.n_drones): if self.PHYSICS == Physics.PYB: self._physics(clipped_action[i, :], i) elif self.PHYSICS == Physics.DYN: @@ -359,39 +327,6 @@ def render(self, mode="human", close=False): close (bool, optional): Unused. """ - if self.first_render_call and not self.GUI: - print( - "[WARNING] BaseAviary.render() is implemented as text-only, re-initialize the environment using Aviary(gui=True) to use PyBullet's graphical interface" - ) - self.first_render_call = False - if self.VERBOSE: - print( - "\n[INFO] BaseAviary.render() ——— it {:04d}".format(self.pyb_step_counter), - "——— wall-clock time {:.1f}s,".format(time.time() - self.RESET_TIME), - "simulation time {:.1f}s@{:d}Hz ({:.2f}x)".format( - self.pyb_step_counter * self.TIMESTEP, - self.SIM_FREQ, - (self.pyb_step_counter * self.TIMESTEP) / (time.time() - self.RESET_TIME), - ), - ) - for i in range(self.NUM_DRONES): - print( - "[INFO] BaseAviary.render() ——— drone {:d}".format(i), - "——— x {:+06.2f}, y {:+06.2f}, z {:+06.2f}".format( - self.pos[i, 0], self.pos[i, 1], self.pos[i, 2] - ), - "——— velocity {:+06.2f}, {:+06.2f}, {:+06.2f}".format( - self.vel[i, 0], self.vel[i, 1], self.vel[i, 2] - ), - "——— roll {:+06.2f}, pitch {:+06.2f}, yaw {:+06.2f}".format( - self.rpy[i, 0] * self.RAD2DEG, - self.rpy[i, 1] * self.RAD2DEG, - self.rpy[i, 2] * self.RAD2DEG, - ), - "——— angular velocity {:+06.4f}, {:+06.4f}, {:+06.4f} ——— ".format( - self.ang_v[i, 0], self.ang_v[i, 1], self.ang_v[i, 2] - ), - ) def _update_and_store_kinematic_information(self): """Updates and stores the drones kinematic information. @@ -400,7 +335,7 @@ def _update_and_store_kinematic_information(self): and improve performance (at the expense of memory). """ - for i in range(self.NUM_DRONES): + for i in range(self.n_drones): self.pos[i], self.quat[i] = p.getBasePositionAndOrientation( self.DRONE_IDS[i], physicsClientId=self.PYB_CLIENT ) @@ -409,29 +344,21 @@ def _update_and_store_kinematic_information(self): self.DRONE_IDS[i], physicsClientId=self.PYB_CLIENT ) - def _start_video_recording(self): - """Starts the recording of a video output. + def record(self, path: Path): + """Start the recording of a video output. The format of the video output is .mp4, if GUI is True, or .png, otherwise. The video is saved under folder `files/videos`. """ - if self.RECORD and self.GUI: - self.VIDEO_ID = p.startStateLogging( - loggingType=p.STATE_LOGGING_VIDEO_MP4, - fileName=os.path.join( - self.output_dir, - "videos/video-{}.mp4".format(datetime.now().strftime("%m.%d.%Y_%H.%M.%S")), - ), - physicsClientId=self.PYB_CLIENT, - ) - if self.RECORD and not self.GUI: - self.FRAME_NUM = 0 - self.IMG_PATH = os.path.join( - self.output_dir, - "quadrotor_videos/video-{}/".format(datetime.now().strftime("%m.%d.%Y_%H.%M.%S")), - ) - os.makedirs(os.path.dirname(self.IMG_PATH), exist_ok=True) + if not self.sim_settings.gui: + logger.warning("Cannot record video without GUI.") + return + self._recording_id = p.startStateLogging( + loggingType=p.STATE_LOGGING_VIDEO_MP4, + fileName=str(path.absolute()), + physicsClientId=self.PYB_CLIENT, + ) def _get_drone_state_vector(self, nth_drone): """Returns the state vector of the n-th drone. @@ -499,19 +426,15 @@ def _ground_effect(self, rpm, nth_drone): """ # Kin. info of all links (propellers and center of mass) - link_states = np.array( - p.getLinkStates( - self.DRONE_IDS[nth_drone], - linkIndices=[0, 1, 2, 3, 4], - computeLinkVelocity=1, - computeForwardKinematics=1, - physicsClientId=self.PYB_CLIENT, - ) + s = p.getLinkStates( + self.DRONE_IDS[nth_drone], + linkIndices=[0, 1, 2, 3], + computeLinkVelocity=1, + computeForwardKinematics=1, + physicsClientId=self.PYB_CLIENT, ) + prop_heights = np.array([s[0][0][2], s[1][0][2], s[2][0][2], s[3][0][2]]) # Simple, per-propeller ground effects. - prop_heights = np.array( - [link_states[0, 0][2], link_states[1, 0][2], link_states[2, 0][2], link_states[3, 0][2]] - ) prop_heights = np.clip(prop_heights, self.GND_EFF_H_CLIP, np.inf) gnd_effects = ( np.array(rpm**2) @@ -568,7 +491,7 @@ def _downwash(self, nth_drone): nth_drone (int): The ordinal number/position of the desired drone in list self.DRONE_IDS. """ - for i in range(self.NUM_DRONES): + for i in range(self.n_drones): delta_z = self.pos[i, 2] - self.pos[nth_drone, 2] delta_xy = np.linalg.norm( np.array(self.pos[i, 0:2]) - np.array(self.pos[nth_drone, 0:2]) @@ -622,10 +545,10 @@ def _dynamics(self, rpm, nth_drone): rpy_rates_deriv = np.dot(self.J_INV, torques) no_pybullet_dyn_accs = force_world_frame / self.MASS # Update state. - vel = vel + self.TIMESTEP * no_pybullet_dyn_accs - rpy_rates = rpy_rates + self.TIMESTEP * rpy_rates_deriv - pos = pos + self.TIMESTEP * vel - rpy = rpy + self.TIMESTEP * rpy_rates + vel = vel + no_pybullet_dyn_accs / self.sim_settings.sim_freq + rpy_rates = rpy_rates + rpy_rates_deriv / self.sim_settings.sim_freq + pos = pos + vel / self.sim_settings.sim_freq + rpy = rpy + rpy_rates / self.sim_settings.sim_freq # Set PyBullet's state. p.resetBasePositionAndOrientation( self.DRONE_IDS[nth_drone], @@ -643,49 +566,11 @@ def _dynamics(self, rpm, nth_drone): # Store the roll, pitch, yaw rates for the next step # self.rpy_rates[nth_drone, :] = rpy_rates - def _show_drone_local_axes(self, nth_drone): - """Draws the local frame of the n-th drone in PyBullet's GUI. - - Args: - nth_drone (int): The ordinal number/position of the desired drone in list self.DRONE_IDS. - - """ - if self.GUI: - AXIS_LENGTH = 2 * self.L - self.X_AX[nth_drone] = p.addUserDebugLine( - lineFromXYZ=[0, 0, 0], - lineToXYZ=[AXIS_LENGTH, 0, 0], - lineColorRGB=[1, 0, 0], - parentObjectUniqueId=self.DRONE_IDS[nth_drone], - parentLinkIndex=-1, - replaceItemUniqueId=int(self.X_AX[nth_drone]), - physicsClientId=self.PYB_CLIENT, - ) - self.Y_AX[nth_drone] = p.addUserDebugLine( - lineFromXYZ=[0, 0, 0], - lineToXYZ=[0, AXIS_LENGTH, 0], - lineColorRGB=[0, 1, 0], - parentObjectUniqueId=self.DRONE_IDS[nth_drone], - parentLinkIndex=-1, - replaceItemUniqueId=int(self.Y_AX[nth_drone]), - physicsClientId=self.PYB_CLIENT, - ) - self.Z_AX[nth_drone] = p.addUserDebugLine( - lineFromXYZ=[0, 0, 0], - lineToXYZ=[0, 0, AXIS_LENGTH], - lineColorRGB=[0, 0, 1], - parentObjectUniqueId=self.DRONE_IDS[nth_drone], - parentLinkIndex=-1, - replaceItemUniqueId=int(self.Z_AX[nth_drone]), - physicsClientId=self.PYB_CLIENT, - ) - def _parse_urdf_parameters(self, file_name): """Loads parameters from an URDF file. This method is nothing more than a custom XML parser for the .urdf files in folder `assets/`. - """ URDF_TREE = etxml.parse(file_name).getroot() M = float(URDF_TREE[1][0][1].attrib["value"]) diff --git a/safe_control_gym/envs/benchmark_env.py b/safe_control_gym/envs/benchmark_env.py index 07d27c084..48bc25c67 100644 --- a/safe_control_gym/envs/benchmark_env.py +++ b/safe_control_gym/envs/benchmark_env.py @@ -4,213 +4,188 @@ """ -import os -from enum import Enum +from __future__ import annotations + +from dataclasses import dataclass import numpy as np -import gym -from gym.utils import seeding +import gymnasium +from gymnasium.utils import seeding import copy +import math +from safe_control_gym.envs.constraints import GENERAL_CONSTRAINTS from safe_control_gym.envs.constraints import create_constraint_list from safe_control_gym.envs.disturbances import create_disturbance_list -class Cost(str, Enum): - """Reward/cost functions enumeration class.""" - - RL_REWARD = "rl_reward" # Default RL reward function. - QUADRATIC = "quadratic" # Quadratic cost. - COMPETITION = "competition" # IROS 2022 competition sparse reward. - - -class Task(str, Enum): - """Environment tasks enumeration class.""" - - STABILIZATION = "stabilization" # Stabilization task. - TRAJ_TRACKING = "traj_tracking" # Trajectory tracking task. +@dataclass +class SimSettings: + """Simulation settings dataclass.""" + sim_freq: int = 500 + ctrl_freq: int = 500 + gui: bool = False -class BenchmarkEnv(gym.Env): - """Benchmark environment base class. + def __post_init__(self): + assert self.sim_freq % self.ctrl_freq == 0, "sim_freq must be divisible by ctrl_freq." - Attributes: - id (int): unique identifier of the current env instance (among other instances). - """ - - _count = 0 # Class variable, count env instance in current process. - NAME = "base" # Environment name. - URDF_PATH = None # Path to urdf file that defines base parameters of the robot. - AVAILABLE_CONSTRAINTS = None # Dict of constraint names & classes. - DISTURBANCE_MODES = ( - None # Dict of disturbance mode names & shared args, e.g. dim of the affected variable. - ) - INERTIAL_PROP_RAND_INFO = None # Dict of parameters & distributions for domain randomization. - INIT_STATE_RAND_INFO = ( - None # Dict of state name & distribution info to randomize at episode reset - ) - TASK_INFO = None # Dict of task related info, e.g. goal state or trajectory args. +class BenchmarkEnv(gymnasium.Env): + """Benchmark environment base class.""" def __init__( self, - output_dir=None, - seed=None, - info_in_reset: bool = False, gui: bool = False, - verbose: bool = False, - normalized_rl_action_space: bool = False, - # Task. - task: Task = Task.STABILIZATION, - task_info=None, - cost: Cost = Cost.RL_REWARD, - pyb_freq: int = 50, - ctrl_freq: int = 50, + sim_freq: int = 500, + ctrl_freq: int = 500, episode_len_sec: int = 5, - # Initialization. init_state=None, randomized_init: bool = True, init_state_randomization_info=None, - # Domain randomization. - prior_prop=None, inertial_prop=None, randomized_inertial_prop: bool = False, inertial_prop_randomization_info=None, - # Constraint. constraints=None, - done_on_violation: bool = False, - use_constraint_penalty=False, - constraint_penalty=-1, - # Disturbance. disturbances=None, - adversary_disturbance=None, - adversary_disturbance_offset=0.0, - adversary_disturbance_scale=0.01, - **kwargs, + seed: int | None = None, + reseed: bool = False, ): """Initialization method for BenchmarkEnv. Args: - output_dir (str, optional): path to directory to save any env outputs. - seed (int, optional): Seed for the random number generator. - info_in_reset (bool, optional): Whether .reset() returns a dictionary with the - environment's symbolic model. - gui (bool, optional): Whether to show PyBullet's GUI. - verbose (bool, optional): If to suppress environment print statetments. - normalized_rl_action_space (bool, optional): Whether to normalize the action space. - task: (Task, optional): The environment's task (stabilization or traj. tracking). - task_info (dict, optional): A dictionary with the information used to generate the - task X and U references. - cost (Cost, optional): Cost function choice used to compute the reward in .step(). - pyb_freq (int, optional): The frequency at which PyBullet steps (a multiple of ctrl_freq). - ctrl_freq (int, optional): The frequency at which the environment steps. - episode_len_sec (int, optional): Maximum episode duration in seconds. - init_state (ndarray/dict, optional): The initial state of the environment - randomized_init (bool, optional): Whether to randomize the initial state. - init_state_randomization_info (dict, optional): A dictionary with information used to - randomize the initial state. - prior_prop (dict, optional): The prior inertial properties of the environment. - inertial_prop (dict, optional): The ground truth inertial properties of the environment. - randomized_inertial_prop (bool, optional): Whether to randomize the inertial properties. - inertial_prop_randomization_info (dict, optional): A dictionary with information used - to randomize the inert. properties. - constraints (Dict, optional): Dictionary to specify the constraints being used. - done_on_violation (bool, optional): Whether to return done==True on a constraint violation. - use_constraint_penalty (bool, optional): if to use shaped reward to penalize potential - constraint violation. - constraint_penalty (float, optional): constraint penalty cost for reward shaping. - disturbances (dict, optional): Dictionary to specify disturbances being used. - adversary_disturbance (str, optional): if to use adversary/external disturbance. - adversary_disturbance_offset (float, optional): parameterizes the offset of the adversary disturbance. - adversary_disturbance_scale (float, optional): parameterizes magnitude of adversary disturbance. - - Attributes: - id (int): unique identifier of the current env instance (among other instances). - + gui: Option to show PyBullet's GUI. + sim_freq: The frequency at which PyBullet steps (a multiple of ctrl_freq). + ctrl_freq: The frequency at which the environment steps. + init_state: The initial state of the environment + randomized_init: Flag to randomize the initial state. + init_state_randomization_info: A dictionary with information used to randomize the + initial state. + inertial_prop: The ground truth inertial properties of the environment. + randomized_inertial_prop: Flag to randomize the inertial properties. + inertial_prop_randomization_info: A dictionary with information used to randomize the + inert. properties. + constraints: Dictionary to specify the constraints being used. + disturbances: Dictionary to specify disturbances being used. + seed: Seed for the random number generator. + reseed: Option to reseed the environment on each reset with the same seed. """ - # Assign unique ID based on env instance count. - self.id = self.__class__._count - self.__class__._count += 1 - # Directory to save any env output. - if output_dir is None: - output_dir = os.getcwd() - self.output_dir = output_dir - self.GUI = gui - self.VERBOSE = verbose - # Task. - print(task) - self.TASK = Task(task) - if task_info is not None: - self.TASK_INFO = task_info - # Set timing constants. - self.CTRL_FREQ = ctrl_freq - self.PYB_FREQ = pyb_freq - if self.PYB_FREQ % self.CTRL_FREQ != 0: - raise ValueError( - "[ERROR] in BenchmarkEnv.__init__(), pyb_freq is not divisible by env_freq." - ) - self.PYB_STEPS_PER_CTRL = int(self.PYB_FREQ / self.CTRL_FREQ) - self.CTRL_TIMESTEP = 1.0 / self.CTRL_FREQ - self.PYB_TIMESTEP = 1.0 / self.PYB_FREQ - # Maximum episode length in seconds. + self.sim_settings = SimSettings(sim_freq, ctrl_freq, gui) + + self._init_state = init_state self.EPISODE_LEN_SEC = episode_len_sec - self.CTRL_STEPS = self.EPISODE_LEN_SEC * self.CTRL_FREQ - # Initialization of state. - self.INIT_STATE = init_state - self.RANDOMIZED_INIT = randomized_init - if init_state_randomization_info is not None: - self.INIT_STATE_RAND_INFO = init_state_randomization_info - # Domain randomization on parameters. - self.PRIOR_PROP = prior_prop - self.INERTIAL_PROP = inertial_prop - self.RANDOMIZED_INERTIAL_PROP = randomized_inertial_prop - if inertial_prop_randomization_info is not None: - self.INERTIAL_PROP_RAND_INFO = inertial_prop_randomization_info - # Set up action and observation space. - self.NORMALIZED_RL_ACTION_SPACE = normalized_rl_action_space - # Define cost-related quantities. - self.COST = Cost(cost) + + self._init_state = init_state + self._init_state_randomization = init_state_randomization_info + self._enable_init_state_randomization = randomized_init + + self._inertial = inertial_prop + self._inertial_randomization = inertial_prop_randomization_info + self._enable_inertial_randomization = randomized_inertial_prop + # Create action and observation spaces. self._set_action_space() self._set_observation_space() + self.state_space = self.observation_space # TODO: Remove this, not always correct # Store action (input) and observation spaces dimensions. - # if observation is not the same as state, env should also have a `state_space` - # and `state_dim` is queried from it. - self.action_dim = self.action_space.shape[0] - self.obs_dim = self.observation_space.shape[0] - if hasattr(self, "state_space"): - self.state_dim = self.state_space.shape[0] - else: - self.state_dim = self.obs_dim - # Default Q and R matrices for quadratic cost. - if self.COST == Cost.QUADRATIC or self.COST == Cost.COMPETITION: - self.Q = np.eye(self.observation_space.shape[0]) - self.R = np.eye(self.action_space.shape[0]) + self.Q = np.eye(self.observation_space.shape[0]) + self.R = np.eye(self.action_space.shape[0]) # Set constraint info. - self.CONSTRAINTS = constraints - self.DONE_ON_VIOLATION = done_on_violation - self.use_constraint_penalty = use_constraint_penalty - self.constraint_penalty = constraint_penalty - self._setup_constraints() + self.constraints = None + if constraints is not None: + self.constraints = create_constraint_list(constraints, GENERAL_CONSTRAINTS, self) # Set disturbance info. - self.DISTURBANCES = disturbances - self.adversary_disturbance = adversary_disturbance - self.adversary_disturbance_offset = adversary_disturbance_offset - self.adversary_disturbance_scale = adversary_disturbance_scale - self._setup_disturbances() + self._disturbance_config = disturbances + self.disturbances = self._setup_disturbances(disturbances) # Default seed None means pure randomness/no seeding. self.seed(seed) - self.initial_reset = False - self.INFO_IN_RESET = info_in_reset - # IROS 2022 - Save random seed for re-seeding. - self.RND_SEED = seed - if "reseed_on_reset" in kwargs: - self.RESEED_ON_RESET = kwargs["reseed_on_reset"] - else: - self.RESEED_ON_RESET = False - - # IROS 2022 - Constrain violation flag for reward. - self.cnstr_violation = False + self._seed = seed + self._reseed_on_reset = reseed + + def _set_action_space(self): + a_low = self.KF * (self.PWM2RPM_SCALE * self.MIN_PWM + self.PWM2RPM_CONST) ** 2 + a_high = self.KF * (self.PWM2RPM_SCALE * self.MAX_PWM + self.PWM2RPM_CONST) ** 2 + self.action_space = gymnasium.spaces.Box(low=a_low, high=a_high, shape=(4,)) + + def _set_observation_space(self): + self.x_threshold = 5 + self.y_threshold = 5 + self.z_threshold = 2.5 + self.phi_threshold_radians = 85 * math.pi / 180 + self.theta_threshold_radians = 85 * math.pi / 180 + self.psi_threshold_radians = 180 * math.pi / 180 # Do not bound yaw. + + # Define obs/state bounds, labels and units. + # obs/state = {x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r}. + low = np.array( + [ + -self.x_threshold, + -np.finfo(np.float32).max, + -self.y_threshold, + -np.finfo(np.float32).max, + self.GROUND_PLANE_Z, + -np.finfo(np.float32).max, + -self.phi_threshold_radians, + -self.theta_threshold_radians, + -self.psi_threshold_radians, + -np.finfo(np.float32).max, + -np.finfo(np.float32).max, + -np.finfo(np.float32).max, + ], + np.float32, + ) + high = np.array( + [ + self.x_threshold, + np.finfo(np.float32).max, + self.y_threshold, + np.finfo(np.float32).max, + self.z_threshold, + np.finfo(np.float32).max, + self.phi_threshold_radians, + self.theta_threshold_radians, + self.psi_threshold_radians, + np.finfo(np.float32).max, + np.finfo(np.float32).max, + np.finfo(np.float32).max, + ], + np.float32, + ) + self.STATE_LABELS = [ + "x", + "x_dot", + "y", + "y_dot", + "z", + "z_dot", + "phi", + "theta", + "psi", + "p", + "q", + "r", + ] + self.STATE_UNITS = [ + "m", + "m/s", + "m", + "m/s", + "m", + "m/s", + "rad", + "rad", + "rad", + "rad/s", + "rad/s", + "rad/s", + ] + + # Define the state space for the dynamics. + self.state_space = gymnasium.spaces.Box(low=low, high=high, dtype=np.float32) + + # Define obs space exposed to the controller. + # Note how the obs space can differ from state space (i.e. augmented with the next reference states for RL) + self.observation_space = gymnasium.spaces.Box(low=low, high=high, dtype=np.float32) def seed(self, seed=None): """Sets up a random number generator for a given seed. @@ -227,13 +202,6 @@ def seed(self, seed=None): disturbs.seed(self) return [seed] - def _check_initial_reset(self): - """Makes sure that .reset() is called at least once before .step().""" - if not self.initial_reset: - raise RuntimeError( - "[ERROR] You must call env.reset() at least once before using env.step()." - ) - def _randomize_values_by_info(self, original_values, randomization_info): """Randomizes a list of values according to desired distributions. @@ -263,39 +231,33 @@ def _randomize_values_by_info(self, original_values, randomization_info): randomized_values[key] += distrib(*d_args, **d_kwargs) return randomized_values - def _setup_disturbances(self): + def _setup_disturbances(self, disturbances: dict | None = None): """Creates attributes and action spaces for the disturbances.""" # Default: no passive disturbances. - self.disturbances = {} - if self.DISTURBANCES is not None: - print(self.DISTURBANCES) - for mode, disturb_specs in self.DISTURBANCES.items(): - assert ( - mode in self.DISTURBANCE_MODES - ), "[ERROR] in BenchmarkEnv._setup_disturbances(), disturbance mode not available." - mode_shared_args = self.DISTURBANCE_MODES[mode] - self.disturbances[mode] = create_disturbance_list( - disturb_specs, mode_shared_args, self - ) - - def _setup_constraints(self): - """Creates a list of constraints as an attribute.""" - self.constraints = None - self.num_constraints = 0 - if self.CONSTRAINTS is not None: - self.constraints = create_constraint_list( - self.CONSTRAINTS, self.AVAILABLE_CONSTRAINTS, self - ) - self.num_constraints = self.constraints.num_constraints + if disturbances is None: + return + dist = {} + modes = { + "observation": {"dim": self.observation_space.shape[0]}, + "action": {"dim": self.action_space.shape[0]}, + "dynamics": {"dim": 3}, + } + for mode, spec in disturbances.items(): + assert mode in modes, "Disturbance mode not available." + dist[mode] = create_disturbance_list(spec, modes[mode], self) + return dist + + @property + def n_constraints(self): + return 0 if self.constraints is None else self.constraints.num_constraints def before_reset(self): """Pre-processing before calling `.reset()`.""" # IROS 2022 - Re-seed on reset. - if self.RESEED_ON_RESET: - self.seed(self.RND_SEED) + if self._reseed_on_reset: + self.seed(self._seed) # Housekeeping variables. - self.initial_reset = True self.pyb_step_counter = 0 self.ctrl_step_counter = 0 self.current_raw_input_action = None @@ -303,8 +265,6 @@ def before_reset(self): # Reset the disturbances. for mode in self.disturbances.keys(): self.disturbances[mode].reset(self) - if self.adversary_disturbance is not None: - self.adv_action = None def after_reset(self, obs, info): """Post-processing after calling `.reset()`.""" @@ -315,8 +275,6 @@ def after_reset(self, obs, info): def before_step(self, action): """Pre-processing before calling `.step()`.""" - # Sanity check (reset at least once). - self._check_initial_reset() # Save the raw input action. self.current_raw_input_action = action # Pre-process/clip the action @@ -326,7 +284,7 @@ def before_step(self, action): def after_step(self, obs, rew, done, info): """Post-processing after calling `.step()`.""" # Increment counters - self.pyb_step_counter += self.PYB_STEPS_PER_CTRL + self.pyb_step_counter += self.sim_settings.sim_freq // self.sim_settings.ctrl_freq self.ctrl_step_counter += 1 # Terminate when (any) constraint is violated. @@ -337,31 +295,10 @@ def after_step(self, obs, rew, done, info): if self.constraints is not None: c_value = self.constraints.get_values(self) info["constraint_values"] = c_value - if self.constraints.is_violated(self, c_value=c_value): - # IROS 2022 - Constrain violation flag for reward. - self.cnstr_violation = True - - info["constraint_violation"] = 1 - if self.DONE_ON_VIOLATION: - done = True - else: - # IROS 2022 - Constrain violation flag for reward. - self.cnstr_violation = False - - info["constraint_violation"] = 0 - - # Apply penalized reward when close to constraint violation - if self.COST == Cost.RL_REWARD: - if ( - self.constraints is not None - and self.use_constraint_penalty - and self.constraints.is_almost_active(self, c_value=c_value) - ): - rew += self.constraint_penalty - + info["constraint_violation"] = self.constraints.is_violated(self, c_value=c_value) # Terminate when reaching time limit, # but distinguish between done due to true termination or time limit reached - if self.ctrl_step_counter >= self.CTRL_STEPS: + if self.ctrl_step_counter >= self.EPISODE_LEN_SEC * self.sim_settings.ctrl_freq: info["TimeLimit.truncated"] = not done done = True return obs, rew, done, info diff --git a/safe_control_gym/envs/constants.py b/safe_control_gym/envs/constants.py index 38961041a..94cb6213a 100644 --- a/safe_control_gym/envs/constants.py +++ b/safe_control_gym/envs/constants.py @@ -12,8 +12,6 @@ class SimConstants: ... class DroneConstants: firmware_freq: int = 500 # Firmware frequency in Hz supply_voltage: float = 3.0 # Power supply voltage - min_pwm: int = 20000 # Minimum PWM signal - max_pwm: int = 65535 # Maximum PWM signal thrust_curve_a: float = -0.0006239 # Thrust curve parameters for brushed motors thrust_curve_b: float = 0.088 # Thrust curve parameters for brushed motors tumble_threshold: float = -0.5 # Vertical acceleration threshold for tumbling detection @@ -21,6 +19,3 @@ class DroneConstants: # TODO: acc and gyro were swapped in original implementation. Possible bug? acc_lpf_cutoff: int = 80 # Low-pass filter cutoff freq gyro_lpf_cutoff: int = 30 # Low-pass filter cutoff freq - KF: float = 3.16e-10 # Motor force factor - pwm2rpm_scale: float = 0.2685 # mapping factor from PWM to RPM - pwm2rpm_const: float = 4070.3 # mapping constant from PWM to RPM diff --git a/safe_control_gym/envs/constraints.py b/safe_control_gym/envs/constraints.py index 5781f5c3a..170cf2760 100644 --- a/safe_control_gym/envs/constraints.py +++ b/safe_control_gym/envs/constraints.py @@ -58,11 +58,11 @@ def __init__( self.rounding = rounding self.constrained_variable = ConstrainedVariableType(constrained_variable) if self.constrained_variable == ConstrainedVariableType.STATE: - self.dim = env.state_dim + self.dim = env.state_space.shape[0] elif self.constrained_variable == ConstrainedVariableType.INPUT: - self.dim = env.action_dim + self.dim = env.action_space.shape[0] elif self.constrained_variable == ConstrainedVariableType.INPUT_AND_STATE: - self.dim = env.state_dim + env.action_dim + self.dim = env.state_space.shape[0] + env.action_space.shape[0] else: raise NotImplementedError( "[ERROR] invalid constrained_variable (use STATE, INPUT or INPUT_AND_STATE)." diff --git a/safe_control_gym/envs/disturbances.py b/safe_control_gym/envs/disturbances.py index 27be3f170..79f0ffb17 100644 --- a/safe_control_gym/envs/disturbances.py +++ b/safe_control_gym/envs/disturbances.py @@ -1,94 +1,58 @@ -"""Disturbances. +"""Disturbances.""" -""" -from enum import Enum import numpy as np class Disturbance: - """Base class for disturbance or noise applied to inputs or dyanmics. + """Base class for disturbance or noise applied to inputs or dyanmics.""" - """ - - def __init__(self, - env, - dim, - mask=None, - **kwargs - ): + def __init__(self, env, dim, mask=None, **kwargs): self.dim = dim self.mask = mask if mask is not None: self.mask = np.asarray(mask) assert self.dim == len(self.mask) - def reset(self, - env - ): + def reset(self, env): pass - def apply(self, - target, - env - ): - """Default is identity. - - """ + def apply(self, target, env): + """Default is identity.""" return target - + def seed(self, env): - """Reset seed from env. - - """ + """Reset seed from env.""" self.np_random = env.np_random - class DisturbanceList: - """Combine list of disturbances as one. - - """ + """Combine list of disturbances as one.""" - def __init__(self, - disturbances - ): - """Initialization of the list of disturbances. - - """ + def __init__(self, disturbances): + """Initialization of the list of disturbances.""" self.disturbances = disturbances - def reset(self, - env - ): - """Sequentially reset disturbances. - - """ + def reset(self, env): + """Sequentially reset disturbances.""" for disturb in self.disturbances: disturb.reset(env) - def apply(self, - target, - env - ): - """Sequentially apply disturbances. - - """ + def apply(self, target, env): + """Sequentially apply disturbances.""" disturbed = target for disturb in self.disturbances: disturbed = disturb.apply(disturbed, env) return disturbed - + def seed(self, env): - """Reset seed from env. - - """ + """Reset seed from env.""" for disturb in self.disturbances: disturb.seed(env) class ImpulseDisturbance(Disturbance): """Impulse applied during a short time interval. - + Examples: * single step, square (duration=1, decay_rate=1): ______|-|_______ * multiple step, square (duration>1, decay_rate=1): ______|-----|_____ @@ -96,16 +60,9 @@ class ImpulseDisturbance(Disturbance): """ - def __init__(self, - env, - dim, - mask=None, - magnitude=1, - step_offset=None, - duration=1, - decay_rate=1, - **kwargs - ): + def __init__( + self, env, dim, mask=None, magnitude=1, step_offset=None, duration=1, decay_rate=1, **kwargs + ): super().__init__(env, dim, mask) self.magnitude = magnitude self.step_offset = step_offset @@ -116,19 +73,14 @@ def __init__(self, self.duration = duration self.decay_rate = decay_rate - def reset(self, - env - ): + def reset(self, env): if self.step_offset is None: self.current_step_offset = self.np_random.randint(self.max_step) else: self.current_step_offset = self.step_offset self.current_peak_step = int(self.current_step_offset + self.duration / 2) - def apply(self, - target, - env - ): + def apply(self, target, env): noise = 0 if env.ctrl_step_counter >= self.current_step_offset: peak_offset = np.abs(env.ctrl_step_counter - self.current_peak_step) @@ -145,36 +97,24 @@ def apply(self, class StepDisturbance(Disturbance): """Constant disturbance at all time steps (but after offset). - + Applied after offset step (randomized or given): _______|--------- """ - def __init__(self, - env, - dim, - mask=None, - magnitude=1, - step_offset=None, - **kwargs - ): + def __init__(self, env, dim, mask=None, magnitude=1, step_offset=None, **kwargs): super().__init__(env, dim, mask) self.magnitude = magnitude self.step_offset = step_offset self.max_step = int(env.EPISODE_LEN_SEC / env.CTRL_TIMESTEP) - def reset(self, - env - ): + def reset(self, env): if self.step_offset is None: self.current_step_offset = self.np_random.randint(self.max_step) else: self.current_step_offset = self.step_offset - def apply(self, - target, - env - ): + def apply(self, target, env): noise = 0 if env.ctrl_step_counter >= self.current_step_offset: noise = self.magnitude @@ -183,6 +123,7 @@ def apply(self, disturbed = target + noise return disturbed + class UniformNoise(Disturbance): """i.i.d uniform noise ~ U(low, high) per time step.""" @@ -195,14 +136,18 @@ def __init__(self, env, dim, mask=None, low=0.0, high=1.0, **kwargs): elif isinstance(low, list): self.low = np.asarray(low) else: - raise ValueError("[ERROR] UniformNoise.__init__(): low must be specified as a float or list.") + raise ValueError( + "[ERROR] UniformNoise.__init__(): low must be specified as a float or list." + ) if isinstance(high, float): self.high = np.asarray([high] * self.dim) elif isinstance(low, list): self.high = np.asarray(high) else: - raise ValueError("[ERROR] UniformNoise.__init__(): high must be specified as a float or list.") + raise ValueError( + "[ERROR] UniformNoise.__init__(): high must be specified as a float or list." + ) def apply(self, target, env): noise = self.np_random.uniform(self.low, self.high, size=self.dim) @@ -213,17 +158,9 @@ def apply(self, target, env): class WhiteNoise(Disturbance): - """I.i.d Gaussian noise per time step. + """I.i.d Gaussian noise per time step.""" - """ - - def __init__(self, - env, - dim, - mask=None, - std=1.0, - **kwargs - ): + def __init__(self, env, dim, mask=None, std=1.0, **kwargs): super().__init__(env, dim, mask) # I.i.d gaussian variance. if isinstance(std, float): @@ -231,15 +168,14 @@ def __init__(self, elif isinstance(std, list): self.std = np.asarray(std) else: - raise ValueError("[ERROR] WhiteNoise.__init__(): std must be specified as a float or list.") + raise ValueError( + "[ERROR] WhiteNoise.__init__(): std must be specified as a float or list." + ) assert self.dim == len(self.std), "std shape should be the same as dim." - def apply(self, - target, - env - ): + def apply(self, target, env): noise = self.np_random.normal(0, self.std, size=self.dim) - # # TODO: hack for debug + # # TODO: hack for debug # noise = np.clip(noise, -self.std, self.std) if self.mask is not None: @@ -249,36 +185,22 @@ def apply(self, class BrownianNoise(Disturbance): - """Simple random walk noise. - - """ + """Simple random walk noise.""" def __init__(self): super().__init__() class PeriodicNoise(Disturbance): - """Sinuisodal noise. - - """ + """Sinuisodal noise.""" - def __init__(self, - env, - dim, - mask=None, - scale=1.0, - frequency=1.0, - **kwargs - ): + def __init__(self, env, dim, mask=None, scale=1.0, frequency=1.0, **kwargs): super().__init__(env, dim) # Sine function parameters. self.scale = scale self.frequency = frequency - def apply(self, - target, - env - ): + def apply(self, target, env): phase = self.np_random.uniform(low=-np.pi, high=np.pi, size=self.dim) t = env.pyb_step_counter * env.PYB_TIMESTEP noise = self.scale * np.sin(2 * np.pi * self.frequency * t + phase) @@ -290,26 +212,22 @@ def apply(self, class StateDependentDisturbance(Disturbance): """Time varying and state varying, e.g. friction. - + Here to provide an explicit form, can also enable friction in simulator directly. """ - def __init__(self, - env, - dim, - mask=None, - **kwargs - ): + def __init__(self, env, dim, mask=None, **kwargs): super().__init__() -DISTURBANCE_TYPES = {"impulse": ImpulseDisturbance, - "step": StepDisturbance, - "uniform": UniformNoise, - "white_noise": WhiteNoise, - "periodic": PeriodicNoise, - } +DISTURBANCE_TYPES = { + "impulse": ImpulseDisturbance, + "step": StepDisturbance, + "uniform": UniformNoise, + "white_noise": WhiteNoise, + "periodic": PeriodicNoise, +} def create_disturbance_list(disturbance_specs, shared_args, env): @@ -323,13 +241,15 @@ def create_disturbance_list(disturbance_specs, shared_args, env): disturb_list = [] # Each disturbance for the mode. for disturb in disturbance_specs: - assert "disturbance_func" in disturb.keys(), "[ERROR]: Every distrubance must specify a disturbance_func." + assert ( + "disturbance_func" in disturb.keys() + ), "[ERROR]: Every distrubance must specify a disturbance_func." disturb_func = disturb["disturbance_func"] - assert disturb_func in DISTURBANCE_TYPES, "[ERROR] in BenchmarkEnv._setup_disturbances(), disturbance type not available." + assert ( + disturb_func in DISTURBANCE_TYPES + ), "[ERROR] in BenchmarkEnv._setup_disturbances(), disturbance type not available." disturb_cls = DISTURBANCE_TYPES[disturb_func] cfg = {key: disturb[key] for key in disturb if key != "disturbance_func"} disturb = disturb_cls(env, **shared_args, **cfg) disturb_list.append(disturb) return DisturbanceList(disturb_list) - - diff --git a/safe_control_gym/envs/drone.py b/safe_control_gym/envs/drone.py index dc3c9f234..539e04861 100644 --- a/safe_control_gym/envs/drone.py +++ b/safe_control_gym/envs/drone.py @@ -4,6 +4,10 @@ import logging from typing import Literal from types import ModuleType +from dataclasses import dataclass +from pathlib import Path + +from xml.etree import ElementTree import numpy as np from scipy.spatial.transform import Rotation as R @@ -18,6 +22,7 @@ class Drone: def __init__(self, controller: Literal["pid", "mellinger"]): self.firmware = self._load_firmware() self.firmware_freq = Constants.firmware_freq + self.params = DroneParams.from_urdf(Path(__file__).resolve().parent / "assets/cf2x.urdf") # Initialize firmware states self._state = self.firmware.state_t() self._control = self.firmware.control_t() @@ -123,7 +128,7 @@ def _update_state( setattr(self._state.acc, name, value) def _pwms_to_action(self, pwms: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: - return Constants.KF * (Constants.pwm2rpm_scale * pwms + Constants.pwm2rpm_const) ** 2 + return self.params.kf * (self.params.pwm2rpm_scale * pwms + self.params.pwm2rpm_const) ** 2 def full_state_cmd( self, @@ -321,11 +326,10 @@ def _update_pwms(self, control): y = control.yaw thrust = control.thrust thrust = [thrust - r + p + y, thrust - r - p - y, thrust + r - p + y, thrust + r + p - y] - thrust = np.clip(thrust, 0, Constants.max_pwm) # Limit thrust to motor range - self._pwms = np.clip(self._thrust_to_pwm(thrust), Constants.min_pwm, Constants.max_pwm) + thrust = np.clip(thrust, 0, self.params.max_pwm) # Limit thrust to motor range + self._pwms = np.clip(self._thrust_to_pwm(thrust), self.params.min_pwm, self.params.max_pwm) - @staticmethod - def _thrust_to_pwm(thrust: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: + def _thrust_to_pwm(self, thrust: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """Convert thrust to PWM signal. Assumes brushed motors. @@ -336,10 +340,10 @@ def _thrust_to_pwm(thrust: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: Returns: PWM signal. """ - thrust = thrust / Constants.max_pwm * 60 + thrust = thrust / self.params.max_pwm * 60 volts = Constants.thrust_curve_a * thrust**2 + Constants.thrust_curve_b * thrust percentage = np.minimum(1, volts / Constants.supply_voltage) - return percentage * Constants.max_pwm + return percentage * self.params.max_pwm def _update_sensor_data( self, timestamp: float, acc: npt.NDArray[np.float64], gyro: npt.NDArray[np.float64] @@ -378,3 +382,61 @@ def _load_firmware(self) -> ModuleType: return mod # endregion + + +@dataclass +class DroneParams: + mass: float + arm_len: float + thrust2weight_ratio: float + J: npt.NDArray[np.float64] + kf: float + km: float + collision_h: float + collision_r: float + collision_z_offset: float + max_speed_kmh: float + gnd_eff_coeff: float + prop_radius: float + drag_coeff: npt.NDArray[np.float64] + dw_coeff_1: float + dw_coeff_2: float + dw_coeff_3: float + pwm2rpm_scale: float + pwm2rpm_const: float + min_pwm: float + max_pwm: float + J_inv: npt.NDArray[np.float64] = np.zeros((3, 3)) # Is calculated in __post_init__ + + def __post_init__(self): + self.J_inv = np.linalg.inv(self.J) + + @staticmethod + def from_urdf(path: Path) -> DroneParams: + """Load the drone parameters from the URDF file in `assets/` with a custom XML parser.""" + urdf = ElementTree.parse(path).getroot() + params = DroneParams( + mass=float(urdf[1][0][1].attrib["value"]), + arm_len=float(urdf[0].attrib["arm"]), + thrust2weight_ratio=float(urdf[0].attrib["thrust2weight"]), + J=np.diag([float(urdf[1][0][2].attrib[c]) for c in ("ixx", "iyy", "izz")]), + kf=float(urdf[0].attrib["kf"]), + km=float(urdf[0].attrib["km"]), + collision_h=float(urdf[1][2][1][0].attrib["length"]), + collision_r=float(urdf[1][2][1][0].attrib["radius"]), + collision_z_offset=[float(s) for s in urdf[1][2][0].attrib["xyz"].split(" ")][2], + max_speed_kmh=float(urdf[0].attrib["max_speed_kmh"]), + gnd_eff_coeff=float(urdf[0].attrib["gnd_eff_coeff"]), + prop_radius=float(urdf[0].attrib["prop_radius"]), + drag_coeff=np.array( + [float(urdf[0].attrib["drag_coeff_" + c]) for c in ("xy", "xy", "z")] + ), + dw_coeff_1=float(urdf[0].attrib["dw_coeff_1"]), + dw_coeff_2=float(urdf[0].attrib["dw_coeff_2"]), + dw_coeff_3=float(urdf[0].attrib["dw_coeff_3"]), + pwm2rpm_scale=float(urdf[0].attrib["pwm2rpm_scale"]), + pwm2rpm_const=float(urdf[0].attrib["pwm2rpm_const"]), + min_pwm=float(urdf[0].attrib["pwm_min"]), + max_pwm=float(urdf[0].attrib["pwm_max"]), + ) + return params diff --git a/safe_control_gym/envs/firmware_wrapper.py b/safe_control_gym/envs/firmware_wrapper.py index 02218a2ef..6803459c3 100644 --- a/safe_control_gym/envs/firmware_wrapper.py +++ b/safe_control_gym/envs/firmware_wrapper.py @@ -5,6 +5,7 @@ import math from typing import Callable +import gymnasium from safe_control_gym.envs.quadrotor import Quadrotor from safe_control_gym.envs.drone import Drone @@ -18,30 +19,10 @@ logger = logging.getLogger(__name__) -class FirmwareWrapper: +class FirmwareWrapper(gymnasium.Env): CONTROLLER = "mellinger" # specifies controller type - # Configurations to match firmware. Not recommended to change - GYRO_LPF_CUTOFF_FREQ = 80 - ACCEL_LPF_CUTOFF_FREQ = 30 - - # Motor and power supply settings - BRUSHED = True - - RAD_TO_DEG = 180 / math.pi - - def __init__( - self, - env_func: Callable[[], Quadrotor], - firmware_freq: int, - ctrl_freq: int, - PWM2RPM_SCALE: float = 0.2685, - PWM2RPM_CONST: float = 4070.3, - KF: float = 3.16e-10, - MIN_PWM: int = 20000, - MAX_PWM: int = 65535, - verbose: bool = False, - ): + def __init__(self, env_func: Callable[[], Quadrotor], firmware_freq: int, ctrl_freq: int): """Initializes a FirmwareWrapper object. This allows users to simulate the on board controllers of CrazyFlie (CF), including access @@ -52,13 +33,6 @@ def __init__( env_func (function): initilizer for safe-control-gym environment firmware_freq (int): frequency to run the firmware loop at (typically 500) ctrl_freq (int): frequency that .step will be called (typically < 60) - PWM2RPM_SCALE (float): mapping factor from PWM to RPM - PWM2RPM_CONST (float): mapping constant from PWM to RPM - KF = (float): motor force factor - MIN_PWM (int): minimum PWM command - MAX_PWM (int): maximum pwm command - verbose (bool): displays additional information - **kwargs: to be passed to BaseController Attributes: env: safe-control environment @@ -66,6 +40,9 @@ def __init__( Todo: * Add support for state estimation """ + super().__init__() + self.action_space = gymnasium.spaces.Box(low=-1, high=1, shape=(4,)) + self.observation_space = gymnasium.spaces.Box(low=-1, high=1, shape=(10,)) self.drone = Drone(self.CONTROLLER) self.firmware_freq = firmware_freq self.ctrl_freq = ctrl_freq @@ -73,17 +50,10 @@ def __init__( self.env = env_func() def reset(self): - """Resets the firmware_wrapper object. - - Todo: - * Add support for state estimation - """ obs, info = self.env.reset() self.drone.reset(obs[[0, 2, 4]], obs[[6, 7, 8]], obs[[1, 3, 5]]) - if self.env.NUM_DRONES > 1: - raise NotImplementedError( - "Firmware controller wrapper does not support multiple drones." - ) + if self.env.n_drones > 1: + raise NotImplementedError("Firmware wrapper does not support multiple drones.") return obs, info def close(self): diff --git a/safe_control_gym/envs/quadrotor.py b/safe_control_gym/envs/quadrotor.py index 5e7dd87bd..0d4289c3a 100644 --- a/safe_control_gym/envs/quadrotor.py +++ b/safe_control_gym/envs/quadrotor.py @@ -7,14 +7,11 @@ import os import logging -import math from copy import deepcopy import casadi as cs -from gym import spaces import numpy as np import pybullet as p -from safe_control_gym.envs.benchmark_env import Cost, Task from safe_control_gym.envs.constraints import GENERAL_CONSTRAINTS from safe_control_gym.math_and_models.symbolic_systems import SymbolicModel from safe_control_gym.envs.base_aviary import BaseAviary @@ -148,15 +145,6 @@ def __init__( # Store initial state info. self.INIT_STATE_LABELS = { - QuadType.ONE_D: ["init_x", "init_x_dot"], - QuadType.TWO_D: [ - "init_x", - "init_x_dot", - "init_z", - "init_z_dot", - "init_theta", - "init_theta_dot", - ], QuadType.THREE_D: [ "init_x", "init_x_dot", @@ -197,29 +185,8 @@ def __init__( # Set prior/symbolic info. self._setup_symbolic() - # Create X_GOAL and U_GOAL references for the assigned task. - self.U_GOAL = np.ones(self.action_dim) * self.MASS * self.GRAVITY_ACC / self.action_dim - - self.X_GOAL = np.hstack( - [ - self.TASK_INFO["stabilization_goal"][0], - 0.0, - self.TASK_INFO["stabilization_goal"][1], - 0.0, - self.TASK_INFO["stabilization_goal"][2], - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - ] - ) # x = {x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r}. - # Equilibrium point at hover for linearization. - self.X_EQ = np.zeros(self.state_dim) - self.U_EQ = self.U_GOAL + self.X_EQ = np.zeros(self.state_space.shape[0]) # IROS 2022 - Load maze. self.OBSTACLES = [] @@ -241,9 +208,6 @@ def __init__( self.GATES_AND_OBS_RAND_INFO = kwargs["gates_and_obstacles_randomization_info"] else: self.RANDOMIZED_GATES_AND_OBS = False - # - self.DONE_ON_COLLISION = kwargs.get("done_on_collision", False) - self.DONE_ON_COMPLETION = kwargs.get("done_on_completion", False) def reset(self): """(Re-)initializes the environment to start an episode. @@ -291,7 +255,6 @@ def reset(self): str(TMP_ID), textPosition=[0, 0, 0.5], textColorRGB=[1, 0, 0], - lifeTime=self.EPISODE_LEN_SEC, textSize=1.5, parentObjectUniqueId=TMP_ID, parentLinkIndex=-1, @@ -337,7 +300,6 @@ def reset(self): str(TMP_ID), textPosition=[0, 0, 0.5], textColorRGB=[1, 0, 0], - lifeTime=self.EPISODE_LEN_SEC, textSize=1.5, parentObjectUniqueId=TMP_ID, parentLinkIndex=-1, @@ -361,8 +323,8 @@ def reset(self): "Iyy": self.J[1, 1], "Izz": self.J[2, 2], } - if self.RANDOMIZED_INERTIAL_PROP: - prop_values = self._randomize_values_by_info(prop_values, self.INERTIAL_PROP_RAND_INFO) + if self._enable_inertial_randomization: + prop_values = self._randomize_values_by_info(prop_values, self._inertial_randomization) if any(phy_quantity < 0 for phy_quantity in prop_values.values()): raise ValueError( "[ERROR] in Quadrotor.reset(), negative randomized inertial properties." @@ -384,8 +346,10 @@ def reset(self): init_name: self.__dict__[init_name.upper()] for init_name in self.INIT_STATE_LABELS[self.QUAD_TYPE] } - if self.RANDOMIZED_INIT: - init_values = self._randomize_values_by_info(init_values, self.INIT_STATE_RAND_INFO) + if self._enable_init_state_randomization: + init_values = self._randomize_values_by_info( + init_values, self._init_state_randomization + ) INIT_XYZ = [init_values.get("init_" + k, 0.0) for k in ["x", "y", "z"]] INIT_VEL = [init_values.get("init_" + k + "_dot", 0.0) for k in ["x", "y", "z"]] INIT_RPY = [init_values.get("init_" + k, 0.0) for k in ["phi", "theta", "psi"]] @@ -409,10 +373,7 @@ def reset(self): obs, info = super().after_reset(obs, info) # Return either an observation and dictionary or just the observation. - if self.INFO_IN_RESET: - return obs, info - else: - return obs + return obs, info def step(self, action): """Advances the environment by one control step. @@ -436,15 +397,9 @@ def step(self, action): # Determine disturbance force. disturb_force = None passive_disturb = "dynamics" in self.disturbances - adv_disturb = self.adversary_disturbance == "dynamics" - if passive_disturb or adv_disturb: - disturb_force = np.zeros(self.DISTURBANCE_MODES["dynamics"]["dim"]) if passive_disturb: + disturb_force = np.zeros(3) disturb_force = self.disturbances["dynamics"].apply(disturb_force, self) - if adv_disturb and self.adv_action is not None: - disturb_force = disturb_force + self.adv_action - # Clear the adversary action, wait for the next one. - self.adv_action = None # Construct full (3D) disturbance force. if disturb_force is not None: disturb_force = np.asarray(disturb_force).flatten() @@ -492,7 +447,7 @@ def _setup_symbolic(self): """ m, g, l = self.MASS, self.GRAVITY_ACC, self.L Iyy = self.J[1, 1] - dt = self.CTRL_TIMESTEP + dt = 1 / self.sim_settings.ctrl_freq # Define states. z = cs.MX.sym("z") z_dot = cs.MX.sym("z_dot") @@ -579,122 +534,6 @@ def _setup_symbolic(self): # Setup symbolic model. self.symbolic = SymbolicModel(dynamics=dynamics, cost=cost, dt=dt) - def _set_action_space(self): - """Returns the action space of the environment. - - Returns: - gym.spaces: The quadrotor environment's action space, of size 1 or 2 depending on - QUAD_TYPE. - - """ - # Define action/input dimension, labels, and units. - action_dim = 4 - self.ACTION_LABELS = ["T1", "T2", "T3", "T4"] - self.ACTION_UNITS = ["N", "N", "N", "N"] - # Direct thrust control. - n_motors = 4 / action_dim - a_low = self.KF * n_motors * (self.PWM2RPM_SCALE * self.MIN_PWM + self.PWM2RPM_CONST) ** 2 - a_high = self.KF * n_motors * (self.PWM2RPM_SCALE * self.MAX_PWM + self.PWM2RPM_CONST) ** 2 - self.action_space = spaces.Box( - low=np.full(action_dim, a_low, np.float32), - high=np.full(action_dim, a_high, np.float32), - dtype=np.float32, - ) - - def _set_observation_space(self): - """Returns the observation space of the environment. - - Returns: - gym.spaces: The bounded observation (state) space, of size 2, 6, or 12 depending on - QUAD_TYPE. - - """ - self.x_threshold = 5 - self.y_threshold = 5 - self.z_threshold = 2.5 - self.phi_threshold_radians = 85 * math.pi / 180 - self.theta_threshold_radians = 85 * math.pi / 180 - self.psi_threshold_radians = 180 * math.pi / 180 # Do not bound yaw. - - # Define obs/state bounds, labels and units. - # obs/state = {x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r}. - low = np.array( - [ - -self.x_threshold, - -np.finfo(np.float32).max, - -self.y_threshold, - -np.finfo(np.float32).max, - self.GROUND_PLANE_Z, - -np.finfo(np.float32).max, - -self.phi_threshold_radians, - -self.theta_threshold_radians, - -self.psi_threshold_radians, - -np.finfo(np.float32).max, - -np.finfo(np.float32).max, - -np.finfo(np.float32).max, - ], - np.float32, - ) - high = np.array( - [ - self.x_threshold, - np.finfo(np.float32).max, - self.y_threshold, - np.finfo(np.float32).max, - self.z_threshold, - np.finfo(np.float32).max, - self.phi_threshold_radians, - self.theta_threshold_radians, - self.psi_threshold_radians, - np.finfo(np.float32).max, - np.finfo(np.float32).max, - np.finfo(np.float32).max, - ], - np.float32, - ) - self.STATE_LABELS = [ - "x", - "x_dot", - "y", - "y_dot", - "z", - "z_dot", - "phi", - "theta", - "psi", - "p", - "q", - "r", - ] - self.STATE_UNITS = [ - "m", - "m/s", - "m", - "m/s", - "m", - "m/s", - "rad", - "rad", - "rad", - "rad/s", - "rad/s", - "rad/s", - ] - - # Define the state space for the dynamics. - self.state_space = spaces.Box(low=low, high=high, dtype=np.float32) - - # Define obs space exposed to the controller. - # Note how the obs space can differ from state space (i.e. augmented with the next reference states for RL) - self.observation_space = spaces.Box(low=low, high=high, dtype=np.float32) - - def _setup_disturbances(self): - # Custom disturbance info. - self.DISTURBANCE_MODES["observation"]["dim"] = self.obs_dim - self.DISTURBANCE_MODES["action"]["dim"] = self.action_dim - self.DISTURBANCE_MODES["dynamics"]["dim"] = int(self.QUAD_TYPE) - super()._setup_disturbances() - def _preprocess_control(self, action): """Converts the action passed to .step() into motors' RPMs (ndarray of shape (4,)). @@ -706,14 +545,10 @@ def _preprocess_control(self, action): """ thrust = np.clip(action, self.action_space.low, self.action_space.high) - if not np.array_equal(thrust, np.array(action)) and self.VERBOSE: - logger.warning("Action was clipped in Quadrotor._preprocess_control().") self.current_preprocessed_action = thrust # Apply disturbances. if "action" in self.disturbances: thrust = self.disturbances["action"].apply(thrust, self) - if self.adversary_disturbance == "action": - thrust = thrust + self.adv_action # convert to quad motor rpm commands pwm = cmd2pwm( thrust, self.PWM2RPM_SCALE, self.PWM2RPM_CONST, self.KF, self.MIN_PWM, self.MAX_PWM @@ -752,21 +587,17 @@ def _get_reward(self): """ # IROS 2022 - Competition sparse reward signal. - if self.COST == Cost.COMPETITION: - reward = 0 - # Reward for stepping through the (correct) next gate. - if self.stepped_through_gate: - reward += 100 - # Reward for reaching goal position (after navigating the gates in the correct order). - if self.at_goal_pos: - reward += 100 - # Penalize by collision. - if self.currently_collided: - reward -= 1000 - # Penalize by constraint violation. - if self.cnstr_violation: - reward -= 100 - return reward + reward = 0 + # Reward for stepping through the (correct) next gate. + if self.stepped_through_gate: + reward += 100 + # Reward for reaching goal position (after navigating the gates in the correct order). + if self.at_goal_pos: + reward += 100 + # Penalize by collision. + if self.currently_collided: + reward -= 1000 + return reward def _get_done(self): """Computes the conditions for termination of an episode. @@ -775,9 +606,9 @@ def _get_done(self): bool: Whether an episode is over. """ - # Done if state is out-of-bounds. if self.done_on_out_of_bound: + assert False mask = np.array([1, 0, 1, 0, 1, 0, 1, 1, 1, 0, 0, 0]) # Element-wise or to check out-of-bound conditions. out_of_bound = np.logical_or( @@ -788,12 +619,7 @@ def _get_done(self): # Early terminate if needed. if out_of_bound: return True - - # IROS 2022 - Terminate episode on collision. - if self.DONE_ON_COLLISION and self.currently_collided: - return True - # IROS 2022 - Terminate episode on task completion. - if self.DONE_ON_COMPLETION and self.task_completed: + if self.currently_collided: return True return False @@ -806,30 +632,12 @@ def _get_info(self): """ info = {} - if self.TASK == Task.STABILIZATION and self.COST == Cost.QUADRATIC: - info["goal_reached"] = self.goal_reached # Add boolean flag for the goal being reached. - # Add MSE. - state = deepcopy(self.state) - if self.TASK == Task.STABILIZATION: - state_error = state - self.X_GOAL - elif self.TASK == Task.TRAJ_TRACKING: - # TODO: should use angle wrapping - # state[4] = normalize_angle(state[4]) - wp_idx = min(self.ctrl_step_counter, self.X_GOAL.shape[0] - 1) - state_error = state - self.X_GOAL[wp_idx] - # Filter only relevant dimensions. - state_error = state_error * self.info_mse_metric_state_weight - info["mse"] = np.sum(state_error**2) - - # Note: constraint_values and constraint_violations populated in benchmark_env. - # IROS 2022 - Per-step info. # Collisions for GATE_OBS_ID in self.GATES_IDS + self.OBSTACLES_IDS + [self.PLANE_ID]: ret = p.getContactPoints( bodyA=GATE_OBS_ID, bodyB=self.DRONE_IDS[0], - # linkIndexA=-1, linkIndexB=-1, physicsClientId=self.PYB_CLIENT, ) if ret: @@ -842,7 +650,7 @@ def _get_info(self): # # Gates progress (note: allow 0.5 seconds for initial drop if objects are not on the gound). if ( - self.pyb_step_counter > 0.5 * self.PYB_FREQ + self.pyb_step_counter > 0.5 * self.sim_settings.sim_freq and self.NUM_GATES > 0 and self.current_gate < self.NUM_GATES ): @@ -909,30 +717,6 @@ def _get_info(self): # Final goal position reached info["at_goal_position"] = False info["task_completed"] = False - if self.current_gate == self.NUM_GATES: - if self.QUAD_TYPE == QuadType.THREE_D: - quad_xyz = np.array([self.state[0], self.state[2], self.state[4]]) - goal_xyz = np.array([self.X_GOAL[0], self.X_GOAL[2], self.X_GOAL[4]]) - if ( - np.linalg.norm(quad_xyz - goal_xyz) - < self.TASK_INFO["stabilization_goal_tolerance"] - ): - self.at_goal_pos = True - self.steps_at_goal_pos += 1 - else: - self.at_goal_pos = False - self.steps_at_goal_pos = 0 - if ( - self.steps_at_goal_pos > self.CTRL_FREQ * 2 - ): # Remain near goal position for 2''. - self.task_completed = True - info["at_goal_position"] = self.at_goal_pos - info["task_completed"] = self.task_completed - else: - logger.warning( - '"at_goal_position" and "task_completed" are only intended for used with the 3D quadrotor.' - ) - return info def _get_reset_info(self): @@ -950,17 +734,15 @@ def _get_reset_info(self): "quadrotor_iyy_inertia": self.J[1, 1], "quadrotor_izz_inertia": self.J[2, 2], } - info["x_reference"] = self.X_GOAL - info["u_reference"] = self.U_GOAL if self.constraints is not None: info["symbolic_constraints"] = self.constraints.get_all_symbolic_models() else: info["symbolic_constraints"] = {} # IROS 2022 - Reset info. - info["ctrl_timestep"] = self.CTRL_TIMESTEP - info["ctrl_freq"] = self.CTRL_FREQ - info["episode_len_sec"] = self.EPISODE_LEN_SEC + info["ctrl_timestep"] = 1 / self.sim_settings.ctrl_freq + info["ctrl_freq"] = self.sim_settings.ctrl_freq + # info["episode_len_sec"] = self.EPISODE_LEN_SEC info["quadrotor_kf"] = self.KF info["quadrotor_km"] = self.KM info["gate_dimensions"] = { @@ -975,19 +757,19 @@ def _get_reset_info(self): info["nominal_gates_pos_and_type"] = self.GATES info["nominal_obstacles_pos"] = self.OBSTACLES - if self.RANDOMIZED_INIT: - info["initial_state_randomization"] = self.INIT_STATE_RAND_INFO + if self._enable_init_state_randomization: + info["initial_state_randomization"] = self._init_state_randomization else: info["initial_state_randomization"] = {} - if self.RANDOMIZED_INERTIAL_PROP: - info["inertial_prop_randomization"] = self.INERTIAL_PROP_RAND_INFO + if self._enable_inertial_randomization: + info["inertial_prop_randomization"] = self._inertial_randomization else: info["inertial_prop_randomization"] = {} if self.RANDOMIZED_GATES_AND_OBS: info["gates_and_obs_randomization"] = self.GATES_AND_OBS_RAND_INFO else: info["gates_and_obs_randomization"] = {} - info["disturbances"] = self.DISTURBANCES + info["disturbances"] = self._disturbance_config info["pyb_client"] = self.PYB_CLIENT info["urdf_dir"] = self.URDF_DIR