From 0a9f16d9b9ab29d1e2f9f5bed661ee7eb1272cc4 Mon Sep 17 00:00:00 2001 From: Martin Schuck Date: Thu, 20 Jun 2024 01:03:07 +0200 Subject: [PATCH] [WIP] Continue refactor of sim --- safe_control_gym/envs/base_aviary.py | 13 +- safe_control_gym/envs/benchmark_env.py | 304 ---------- safe_control_gym/envs/constraints.py | 682 ++++++++-------------- safe_control_gym/envs/drone.py | 7 +- safe_control_gym/envs/drone_sim.py | 189 ++++++ safe_control_gym/envs/firmware_wrapper.py | 13 +- safe_control_gym/envs/quadrotor.py | 160 ++--- 7 files changed, 504 insertions(+), 864 deletions(-) delete mode 100644 safe_control_gym/envs/benchmark_env.py create mode 100644 safe_control_gym/envs/drone_sim.py diff --git a/safe_control_gym/envs/base_aviary.py b/safe_control_gym/envs/base_aviary.py index 4bcb6a568..bd96ad2f1 100644 --- a/safe_control_gym/envs/base_aviary.py +++ b/safe_control_gym/envs/base_aviary.py @@ -19,7 +19,7 @@ import pybullet as p import pybullet_data -from safe_control_gym.envs.benchmark_env import BenchmarkEnv +from safe_control_gym.envs.drone_sim import DroneSim logger = logging.getLogger(__name__) egl = pkgutil.get_loader("eglRenderer") @@ -44,7 +44,7 @@ class Physics(str, Enum): ) -class BaseAviary(BenchmarkEnv): +class BaseAviary(DroneSim): """Base class for "drone aviary" Gym environments.""" NAME = "base_aviary" @@ -110,7 +110,7 @@ def __init__( self.MIN_PWM, self.MAX_PWM, ) = self._parse_urdf_parameters(self.URDF_PATH) - self.GROUND_PLANE_Z = 0 # -0.05 + self.GROUND_PLANE_Z = 0 # Compute constants. self.GRAVITY = self.GRAVITY_ACC * self.MASS self.HOVER_RPM = np.sqrt(self.GRAVITY / (4 * self.KF)) @@ -124,17 +124,12 @@ def __init__( * np.sqrt((15 * self.MAX_RPM**2 * self.KF * self.GND_EFF_COEFF) / self.MAX_THRUST) ) # BenchmarkEnv constructor. + super().__init__( seed=kwargs["seed"], gui=gui, - episode_len_sec=kwargs["episode_len_sec"], sim_freq=kwargs["sim_freq"], ctrl_freq=kwargs["ctrl_freq"], - init_state=kwargs["init_state"], - randomized_init=kwargs["randomized_init"], - init_state_randomization_info=kwargs["init_state_randomization_info"], - randomized_inertial_prop=kwargs["randomized_inertial_prop"], - inertial_prop_randomization_info=kwargs["inertial_prop_randomization_info"], constraints=kwargs["constraints"], disturbances=kwargs["disturbances"], reseed=kwargs["reseed"], diff --git a/safe_control_gym/envs/benchmark_env.py b/safe_control_gym/envs/benchmark_env.py deleted file mode 100644 index 48bc25c67..000000000 --- a/safe_control_gym/envs/benchmark_env.py +++ /dev/null @@ -1,304 +0,0 @@ -"""Base environment class module. - -This module also contains enumerations for cost functions, tasks, disturbances, and quadrotor types. - -""" - -from __future__ import annotations - -from dataclasses import dataclass -import numpy as np -import gymnasium -from gymnasium.utils import seeding -import copy -import math - -from safe_control_gym.envs.constraints import GENERAL_CONSTRAINTS -from safe_control_gym.envs.constraints import create_constraint_list -from safe_control_gym.envs.disturbances import create_disturbance_list - - -@dataclass -class SimSettings: - """Simulation settings dataclass.""" - - sim_freq: int = 500 - ctrl_freq: int = 500 - gui: bool = False - - def __post_init__(self): - assert self.sim_freq % self.ctrl_freq == 0, "sim_freq must be divisible by ctrl_freq." - - -class BenchmarkEnv(gymnasium.Env): - """Benchmark environment base class.""" - - def __init__( - self, - gui: bool = False, - sim_freq: int = 500, - ctrl_freq: int = 500, - episode_len_sec: int = 5, - init_state=None, - randomized_init: bool = True, - init_state_randomization_info=None, - inertial_prop=None, - randomized_inertial_prop: bool = False, - inertial_prop_randomization_info=None, - constraints=None, - disturbances=None, - seed: int | None = None, - reseed: bool = False, - ): - """Initialization method for BenchmarkEnv. - - Args: - gui: Option to show PyBullet's GUI. - sim_freq: The frequency at which PyBullet steps (a multiple of ctrl_freq). - ctrl_freq: The frequency at which the environment steps. - init_state: The initial state of the environment - randomized_init: Flag to randomize the initial state. - init_state_randomization_info: A dictionary with information used to randomize the - initial state. - inertial_prop: The ground truth inertial properties of the environment. - randomized_inertial_prop: Flag to randomize the inertial properties. - inertial_prop_randomization_info: A dictionary with information used to randomize the - inert. properties. - constraints: Dictionary to specify the constraints being used. - disturbances: Dictionary to specify disturbances being used. - seed: Seed for the random number generator. - reseed: Option to reseed the environment on each reset with the same seed. - """ - self.sim_settings = SimSettings(sim_freq, ctrl_freq, gui) - - self._init_state = init_state - self.EPISODE_LEN_SEC = episode_len_sec - - self._init_state = init_state - self._init_state_randomization = init_state_randomization_info - self._enable_init_state_randomization = randomized_init - - self._inertial = inertial_prop - self._inertial_randomization = inertial_prop_randomization_info - self._enable_inertial_randomization = randomized_inertial_prop - - # Create action and observation spaces. - self._set_action_space() - self._set_observation_space() - self.state_space = self.observation_space # TODO: Remove this, not always correct - # Store action (input) and observation spaces dimensions. - self.Q = np.eye(self.observation_space.shape[0]) - self.R = np.eye(self.action_space.shape[0]) - # Set constraint info. - self.constraints = None - if constraints is not None: - self.constraints = create_constraint_list(constraints, GENERAL_CONSTRAINTS, self) - # Set disturbance info. - self._disturbance_config = disturbances - self.disturbances = self._setup_disturbances(disturbances) - # Default seed None means pure randomness/no seeding. - self.seed(seed) - # IROS 2022 - Save random seed for re-seeding. - self._seed = seed - self._reseed_on_reset = reseed - - def _set_action_space(self): - a_low = self.KF * (self.PWM2RPM_SCALE * self.MIN_PWM + self.PWM2RPM_CONST) ** 2 - a_high = self.KF * (self.PWM2RPM_SCALE * self.MAX_PWM + self.PWM2RPM_CONST) ** 2 - self.action_space = gymnasium.spaces.Box(low=a_low, high=a_high, shape=(4,)) - - def _set_observation_space(self): - self.x_threshold = 5 - self.y_threshold = 5 - self.z_threshold = 2.5 - self.phi_threshold_radians = 85 * math.pi / 180 - self.theta_threshold_radians = 85 * math.pi / 180 - self.psi_threshold_radians = 180 * math.pi / 180 # Do not bound yaw. - - # Define obs/state bounds, labels and units. - # obs/state = {x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r}. - low = np.array( - [ - -self.x_threshold, - -np.finfo(np.float32).max, - -self.y_threshold, - -np.finfo(np.float32).max, - self.GROUND_PLANE_Z, - -np.finfo(np.float32).max, - -self.phi_threshold_radians, - -self.theta_threshold_radians, - -self.psi_threshold_radians, - -np.finfo(np.float32).max, - -np.finfo(np.float32).max, - -np.finfo(np.float32).max, - ], - np.float32, - ) - high = np.array( - [ - self.x_threshold, - np.finfo(np.float32).max, - self.y_threshold, - np.finfo(np.float32).max, - self.z_threshold, - np.finfo(np.float32).max, - self.phi_threshold_radians, - self.theta_threshold_radians, - self.psi_threshold_radians, - np.finfo(np.float32).max, - np.finfo(np.float32).max, - np.finfo(np.float32).max, - ], - np.float32, - ) - self.STATE_LABELS = [ - "x", - "x_dot", - "y", - "y_dot", - "z", - "z_dot", - "phi", - "theta", - "psi", - "p", - "q", - "r", - ] - self.STATE_UNITS = [ - "m", - "m/s", - "m", - "m/s", - "m", - "m/s", - "rad", - "rad", - "rad", - "rad/s", - "rad/s", - "rad/s", - ] - - # Define the state space for the dynamics. - self.state_space = gymnasium.spaces.Box(low=low, high=high, dtype=np.float32) - - # Define obs space exposed to the controller. - # Note how the obs space can differ from state space (i.e. augmented with the next reference states for RL) - self.observation_space = gymnasium.spaces.Box(low=low, high=high, dtype=np.float32) - - def seed(self, seed=None): - """Sets up a random number generator for a given seed. - - Remember to seed all random generators, currently in - - env - - action_space - - disturbances - - """ - self.np_random, seed = seeding.np_random(seed) - self.action_space.seed(seed) - for _, disturbs in self.disturbances.items(): - disturbs.seed(self) - return [seed] - - def _randomize_values_by_info(self, original_values, randomization_info): - """Randomizes a list of values according to desired distributions. - - Args: - original_values (dict): a dict of orginal values. - randomization_info (dict): A dictionary containing information about the distributions - used to randomize original_values. - - Returns: - dict: A dict of randomized values. - - """ - # Start from a copy of the original values. - randomized_values = copy.deepcopy(original_values) - # Copy the info dict to parse it with "pop". - rand_info_copy = copy.deepcopy(randomization_info) - # Randomized and replace values for which randomization info are given. - for key in original_values: - if key in rand_info_copy: - # Get distribution removing it from info dict. - distrib = getattr(self.np_random, rand_info_copy[key].pop("distrib")) - # Pop positional args. - d_args = rand_info_copy[key].pop("args", []) - # Keyword args are just anything left. - d_kwargs = rand_info_copy[key] - # Randomize (adding to the original values). - randomized_values[key] += distrib(*d_args, **d_kwargs) - return randomized_values - - def _setup_disturbances(self, disturbances: dict | None = None): - """Creates attributes and action spaces for the disturbances.""" - # Default: no passive disturbances. - if disturbances is None: - return - dist = {} - modes = { - "observation": {"dim": self.observation_space.shape[0]}, - "action": {"dim": self.action_space.shape[0]}, - "dynamics": {"dim": 3}, - } - for mode, spec in disturbances.items(): - assert mode in modes, "Disturbance mode not available." - dist[mode] = create_disturbance_list(spec, modes[mode], self) - return dist - - @property - def n_constraints(self): - return 0 if self.constraints is None else self.constraints.num_constraints - - def before_reset(self): - """Pre-processing before calling `.reset()`.""" - # IROS 2022 - Re-seed on reset. - if self._reseed_on_reset: - self.seed(self._seed) - - # Housekeeping variables. - self.pyb_step_counter = 0 - self.ctrl_step_counter = 0 - self.current_raw_input_action = None - self.current_preprocessed_action = None - # Reset the disturbances. - for mode in self.disturbances.keys(): - self.disturbances[mode].reset(self) - - def after_reset(self, obs, info): - """Post-processing after calling `.reset()`.""" - # Add initial constraint info (no action/input yet, so only state-based constraints) - if self.constraints is not None: - info["constraint_values"] = self.constraints.get_values(self, only_state=True) - return obs, info - - def before_step(self, action): - """Pre-processing before calling `.step()`.""" - # Save the raw input action. - self.current_raw_input_action = action - # Pre-process/clip the action - processed_action = self._preprocess_control(action) - return processed_action - - def after_step(self, obs, rew, done, info): - """Post-processing after calling `.step()`.""" - # Increment counters - self.pyb_step_counter += self.sim_settings.sim_freq // self.sim_settings.ctrl_freq - self.ctrl_step_counter += 1 - - # Terminate when (any) constraint is violated. - # here we cache the constraint values `c_value`, so we only evaluate the constraints once, - # but use it in 1) info dict; 2) check constraint violation; 3) check near constraint violation/almost active. - # it also allows stateful constraint, where evaluation/update should only be done once per time step. - c_value = None - if self.constraints is not None: - c_value = self.constraints.get_values(self) - info["constraint_values"] = c_value - info["constraint_violation"] = self.constraints.is_violated(self, c_value=c_value) - # Terminate when reaching time limit, - # but distinguish between done due to true termination or time limit reached - if self.ctrl_step_counter >= self.EPISODE_LEN_SEC * self.sim_settings.ctrl_freq: - info["TimeLimit.truncated"] = not done - done = True - return obs, rew, done, info diff --git a/safe_control_gym/envs/constraints.py b/safe_control_gym/envs/constraints.py index 170cf2760..9d5e72245 100644 --- a/safe_control_gym/envs/constraints.py +++ b/safe_control_gym/envs/constraints.py @@ -1,179 +1,160 @@ """Constraints module. Classes for constraints and lists of constraints. - """ -import casadi as cs +from __future__ import annotations + +from gymnasium import Space, spaces +import sys + +from typing import Callable from enum import Enum import numpy as np +import numpy.typing as npt class ConstrainedVariableType(str, Enum): """Allowable constraint type specifiers.""" - STATE = "state" # Constraints who are a function of the state X. - INPUT = "input" # Constraints who are a function of the input U. - INPUT_AND_STATE = ( - "input_and_state" # Constraints who are a function of the input U and state X. - ) + STATE = "state" # Constraints who are a function of the state X + INPUT = "input" # Constraints who are a function of the input U + INPUT_AND_STATE = "input_and_state" # Constraints who are a function of the input U and state X class Constraint: - """Implements a (state-wise/trajectory-wise/stateful) constraint. - - A constraint can contain multiple scalar-valued constraint functions. - Each should be represented as g(x) <= 0. - - Attributes: - constrained_variable: the variable(s) from env to be constrained. - dim (int): Total number of input dimensions to be constrained, i.e. dim of x. - num_constraints (int): total number of output dimensions or number of constraints, i.e. dim of g(x). - sym_func (Callable): the symbolic function of the constraint, can take in np.array or CasADi variable. + """A (state-wise/trajectory-wise/stateful) constraint. + A constraint can contain multiple scalar-valued constraint functions. Each should be represented + as g(x) <= 0. """ def __init__( self, - env, - constrained_variable: ConstrainedVariableType, + state_space: Space, + input_space: Space, + ctype: ConstrainedVariableType, strict: bool = False, - active_dims=None, - tolerance=None, + active_dims: list[int] | None = None, + tolerance: npt.NDArray[np.float64] | None = None, rounding: int = 8, - **kwargs, ): - """Defines params (e.g. bounds) and state. + """Define the bounds and states. Args: - env (safe_control_gym.envs.bechmark_env.BenchmarkEnv): The environment the constraint is for. - constrained_variable (ConstrainedVariableType): Specifies the input type to the constraint as a constraint - that acts on the state, input, or both. - strict (optional, bool): Whether the constraint is violated also when equal to its threshold. - active_dims (list of ints): Filters the constraint to only act only select certian dimensions. - tolerance (list or np.array): The distance from the constraint at which is_almost_active returns True. - rounding (optional, int): Decimal places used in the `get_value()` method. + state_space: System state space. + input_space: System input space. + ctype: Type of constraint (state, input, or both). + strict: Option to check for strict constraint satisfaction at the threshold (< vs <=). + active_dims: Optional list of indices to filter which dimensions are active. + tolerance: The distance from the constraint at which is_almost_active returns True. + rounding: Decimal places used in the `value()` method. """ self.rounding = rounding - self.constrained_variable = ConstrainedVariableType(constrained_variable) - if self.constrained_variable == ConstrainedVariableType.STATE: - self.dim = env.state_space.shape[0] - elif self.constrained_variable == ConstrainedVariableType.INPUT: - self.dim = env.action_space.shape[0] - elif self.constrained_variable == ConstrainedVariableType.INPUT_AND_STATE: - self.dim = env.state_space.shape[0] + env.action_space.shape[0] - else: - raise NotImplementedError( - "[ERROR] invalid constrained_variable (use STATE, INPUT or INPUT_AND_STATE)." - ) - # Save the strictness attribute + if ctype == ConstrainedVariableType.STATE: + dim = spaces.flatdim(state_space) + elif ctype == ConstrainedVariableType.INPUT: + dim = spaces.flatdim(input_space) + elif ctype == ConstrainedVariableType.INPUT_AND_STATE: + dim = spaces.flatdim(state_space) + spaces.flatdim(input_space) self.strict = strict + self.ctype = ctype # Only want to select specific dimensions, implemented via a filter matrix. + self.constraint_filter = np.eye(dim) if active_dims is not None: - if isinstance(active_dims, int): - active_dims = [active_dims] - assert isinstance( - active_dims, (list, np.ndarray) - ), "[ERROR] active_dims is not a list/array." - assert ( - len(active_dims) <= self.dim - ), "[ERROR] more active_dim than constrainable self.dim" - assert all(isinstance(n, int) for n in active_dims), "[ERROR] non-integer active_dim." - assert all( - (n < self.dim) for n in active_dims - ), "[ERROR] active_dim not stricly smaller than self.dim." - assert len(active_dims) == len(set(active_dims)), "[ERROR] duplicates in active_dim" - self.constraint_filter = np.eye(self.dim)[active_dims] - self.dim = len(active_dims) - else: - self.constraint_filter = np.eye(self.dim) - if tolerance is not None: - self.tolerance = np.array(tolerance, ndmin=1) - else: - self.tolerance = None + assert isinstance(active_dims, (list, np.ndarray)), "Active_dims is not a list/array." + assert len(active_dims) <= dim, "More active_dim than constrainable self.dim" + assert all(isinstance(n, int) for n in active_dims), "Non-integer active_dim." + assert max(active_dims) < dim, "active_dim not stricly smaller than dim." + assert len(active_dims) == len(set(active_dims)), "Duplicates in active_dim" + self.constraint_filter = self.constraint_filter[active_dims] + dim = len(active_dims) + self.dim = dim + self.n_constraints = dim + self.tolerance = None if tolerance is None else np.array(tolerance, ndmin=1) def reset(self): """Clears up the constraint state (if any).""" - pass - - def get_symbolic_model(self, env): - """Gets the symbolic form of the constraint function. - - Args: - env: The environment to constrain. - Returns: - obj: The symbolic form of the constraint. - - """ + def symbolic_model(self): + """Create the symbolic form of the constraint function.""" raise NotImplementedError - def get_value(self, env): - """Gets the constraint function value. + def value( + self, + state: npt.NDArray[np.float64] | None = None, + input: npt.NDArray[np.float64] | None = None, + ) -> npt.NDArray[np.float64]: + """Calculate the constraint function value. Args: - env: The environment to constrain. + state: The system state to evaluate the constraint at. + input: The system input to evaluate the constraint at. Returns: - ndarray: The evaulation of the constraint. - + The evaluation of the constraint. """ - env_value = self.get_env_constraint_var(env) + if state is None and input is None: + raise ValueError("Either state or input must be provided.") + if self.ctype == ConstrainedVariableType.INPUT_AND_STATE: + x = np.concatenate((state, input)) + elif self.ctype == ConstrainedVariableType.STATE: + x = state + else: + x = input return np.round_( - np.atleast_1d(np.squeeze(self.sym_func(np.array(env_value, ndmin=1)))), + np.atleast_1d(np.squeeze(self.sym_func(np.array(x, ndmin=1)))), decimals=self.rounding, ) - def is_violated(self, env, c_value=None): - """Checks if constraint is violated. + def is_violated( + self, + state: npt.NDArray[np.float64] | None = None, + input: npt.NDArray[np.float64] | None = None, + c_value: npt.NDArray[np.float64] | None = None, + ) -> bool: + """Check if the constraint is violated. Args: - env: The environment to constrain. - c_value: an already calculated constraint value (no need to recompute). + state: The system state to evaluate the constraint at. + input: The system input to evaluate the constraint at. + c_value: Optional precomputed constraint value. Returns: bool: Whether the constraint was violeted. """ - if c_value is None: - c_value = self.get_value(env) + c_value = self.value(state=state, input=input) if c_value is None else c_value if self.strict: - flag = np.any(np.greater_equal(c_value, 0.0)) - else: - flag = np.any(np.greater(c_value, 0.0)) - return bool(flag) + return np.any(np.greater_equal(c_value, 0.0)) + return np.any(np.greater(c_value, 0.0)) - def is_almost_active(self, env, c_value=None): - """Checks if constraint is nearly violated. + def is_almost_active( + self, + state: npt.NDArray[np.float64] | None = None, + input: npt.NDArray[np.float64] | None = None, + c_value: npt.NDArray[np.float64] | None = None, + ) -> bool: + """Check if the constraint is nearly violated. - This is checked by using a slack variable (from init args). - This can be used for reward shaping/constraint penalty in RL methods. + Can be used for reward shaping/constraint penalty in RL methods. + Args: + state: The system state to evaluate the constraint at. + input: The system input to evaluate the constraint at. + c_value: Optional precomputed constraint value. """ - if not hasattr(self, "tolerance") or self.tolerance is None: + if self.tolerance is None: return False - if c_value is None: - c_value = self.get_value(env) - flag = np.any(np.greater(c_value + self.tolerance, 0.0)) - return bool(flag) - - def get_env_constraint_var(self, env): - """Gets the env variable(s) subject to the constraint.""" - if self.constrained_variable == ConstrainedVariableType.STATE: - return env.state - elif self.constrained_variable == ConstrainedVariableType.INPUT: - return env.current_raw_input_action - elif self.constrained_variable == ConstrainedVariableType.INPUT_AND_STATE: - return (env.state, env.current_raw_input_action) - else: - raise NotImplementedError("Constraint input type not implemented.") + c_value = self.value(state=state, input=input) if c_value is None else c_value + return np.any(np.greater(c_value + self.tolerance, 0.0)) def check_tolerance_shape(self): - if self.tolerance is not None and len(self.tolerance) != self.num_constraints: - raise ValueError( - "[ERROR] the tolerance dimension does not match the number of constraints." - ) + if self.tolerance is None: + return + if len(self.tolerance) != self.n_constraints: + raise ValueError("tolerance dimension does not match the number of constraints.") class QuadraticContstraint(Constraint): @@ -181,42 +162,48 @@ class QuadraticContstraint(Constraint): def __init__( self, - env, + state_space, + input_space, + ctype: ConstrainedVariableType, P: np.ndarray, b: float, - constrained_variable: ConstrainedVariableType, strict: bool = False, - active_dims=None, - tolerance=None, + active_dims: list[int] | None = None, + tolerance: list[float] | None = None, ): """Initializes the class. Args: - env (safe_control_gym.envs.bechmark_env.BenchmarkEnv): The environment the constraint is for. - P (np.array): The square matrix representing the quadratic. - b (float): The scalar limit for the quadatic constraint. - constrained_variable (ConstrainedVariableType): Specifies the input type to the constraint as a constraint - that acts on the state, input, or both. - strict (optional, bool): Whether the constraint is violated also when equal to its threshold. - active_dims (list of ints): Filters the constraint to only act only select certian dimensions. - tolerance (list or np.array): The distance from the constraint at which is_almost_active returns True. + state_space: System state space. + input_space: System input space. + ctype: Type of constraint (state, input, or both). + P: The square matrix representing the quadratic. + b: The scalar limit for the quadatic constraint. + strict: Option to check for strict constraint satisfaction at the threshold (< vs <=). + active_dims: Filters the constraint to only act only select certian dimensions. + tolerance: The distance from the constraint at which is_almost_active returns True. """ super().__init__( - env, constrained_variable, strict=strict, active_dims=active_dims, tolerance=tolerance + state_space, + input_space, + ctype, + strict=strict, + active_dims=active_dims, + tolerance=tolerance, ) P = np.array(P, ndmin=1) - assert P.shape == (self.dim, self.dim), "[ERROR] P has the wrong dimension!" + assert P.shape == (self.dim, self.dim), "P has the wrong dimension!" self.P = P - assert isinstance(b, float), "[ERROR] b is not a scalar!" + assert isinstance(b, float), "b is not a scalar!" self.b = b - self.num_constraints = 1 # Always scalar. + self.n_constraints = 1 # Always scalar. self.sym_func = ( lambda x: x.T @ self.constraint_filter.T @ self.P @ self.constraint_filter @ x - self.b ) self.check_tolerance_shape() - def get_symbolic_model(self): + def symbolic_model(self): """Gets the symbolic form of the constraint function. Returns: @@ -231,10 +218,11 @@ class LinearConstraint(Constraint): def __init__( self, - env, - A: np.ndarray, - b: np.ndarray, - constrained_variable: ConstrainedVariableType, + state_space: Space, + input_space: Space, + ctype: ConstrainedVariableType, + A: npt.NDArray[np.float64], + b: npt.NDArray[np.float64], strict: bool = False, active_dims=None, tolerance=None, @@ -242,29 +230,34 @@ def __init__( """Initialize the class. Args: - env (BenchmarkEnv): The environment to constraint. - A (np.array or list): A matrix of the constraint (self.num_constraints by self.dim). - b (np.array or list): b matrix of the constraint (1D array self.num_constraints) - constrained_variable (ConstrainedVariableType): Type of constraint. - strict (optional, bool): Whether the constraint is violated also when equal to its threshold. - active_dims (list or int): List specifying which dimensions the constraint is active for. - tolerance (float): The distance at which is_almost_active(env) triggers. - + state_space: System state space. + input_space: System input space. + ctype: Type of constraint (state, input, or both). + A: A matrix of the constraint (self.n_constraints by self.dim). + b: b matrix of the constraint (1D array self.n_constraints). + strict: Option to check for strict constraint satisfaction at the threshold (< vs <=). + active_dims: Filters the constraint to only act only select certian dimensions. + tolerance: The distance from the constraint at which is_almost_active returns True. """ super().__init__( - env, constrained_variable, strict=strict, active_dims=active_dims, tolerance=tolerance + state_space, + input_space, + ctype, + strict=strict, + active_dims=active_dims, + tolerance=tolerance, ) A = np.array(A, ndmin=1) b = np.array(b, ndmin=1) - assert A.shape[1] == self.dim, "[ERROR] A has the wrong dimension!" + assert A.shape[1] == self.dim, "A has the wrong dimension!" self.A = A - assert b.shape[0] == A.shape[0], "[ERROR] Dimension 0 of b does not match A!" + assert b.shape[0] == A.shape[0], "Dimension 0 of b does not match A!" self.b = b - self.num_constraints = A.shape[0] + self.n_constraints = A.shape[0] self.sym_func = lambda x: self.A @ self.constraint_filter @ x - self.b self.check_tolerance_shape() - def get_symbolic_model(self): + def symbolic_model(self): """Gets the symbolic form of the constraint function. Returns: @@ -279,24 +272,27 @@ class BoundedConstraint(LinearConstraint): def __init__( self, - env, + state_space: Space, + input_space: Space, + ctype: ConstrainedVariableType, lower_bounds: np.ndarray, upper_bounds: np.ndarray, - constrained_variable: ConstrainedVariableType, strict: bool = False, - active_dims=None, - tolerance=None, + active_dims: list[int] | None = None, + tolerance: float | None = None, ): """Initialize the constraint. Args: - env (BenchmarkEnv): The environment to constraint. + state_space: System state space. + input_space: System input space. + ctype: Type of constraint (state, input, or both). lower_bounds (np.array or list): Lower bound of constraint. upper_bounds (np.array or list): Uppbound of constraint. constrained_variable (ConstrainedVariableType): Type of constraint. - strict (optional, bool): Whether the constraint is violated also when equal to its threshold. - active_dims (list or int): List specifying which dimensions the constraint is active for. - tolerance (float): The distance at which is_almost_active(env) triggers. + strict: Option to check for strict constraint satisfaction at the threshold (< vs <=). + active_dims: List specifying which dimensions the constraint is active for. + tolerance: The distance at which is_almost_active(env) triggers. """ self.lower_bounds = np.array(lower_bounds, ndmin=1) @@ -305,10 +301,11 @@ def __init__( A = np.vstack((-np.eye(dim), np.eye(dim))) b = np.hstack((-self.lower_bounds, self.upper_bounds)) super().__init__( - env, + state_space, + input_space, + ctype, A, b, - constrained_variable, strict=strict, active_dims=active_dims, tolerance=tolerance, @@ -319,315 +316,154 @@ def __init__( class DefaultConstraint(BoundedConstraint): """Use the environment's observation_space or action_space for default state or input bound constraints. - This class only constraint either STATE or INPUT constraint but not both - (to constrain both, use two DefaultConstraints). - The class constrain the entire variable, i.e. no `active_dims` option - (to constrain subset of the variable, use the BoundedConstraint instead). - + This class only constraint either STATE or INPUT constraint but not both. The class constrains + the entire variable, i.e. no `active_dims` option. For other options, use BoundedConstraint. """ def __init__( self, - env, - constrained_variable: ConstrainedVariableType, - lower_bounds=None, - upper_bounds=None, + state_space: Space, + input_space: Space, + ctype: ConstrainedVariableType, + lower_bounds: npt.NDArray[np.float64] | None = None, + upper_bounds: npt.NDArray[np.float64] | None = None, strict: bool = False, - tolerance=None, + tolerance: float | None = None, ): """ "Initialize the class. Args: - env (BenchmarkEnv): Environment for the constraint. - lower_bounds (list, np.array): 1D array or list of the lower bounds. Length must match - the environemt observation space dimension. If none, the env defaults are used - upper_bounds (list, np.array): 1D array or list of the lower bounds. Length must match - the environemt observation space dimension. If None, the env defaults are used. - strict (optional, bool): Whether the constraint is violated also when equal to its threshold. - tolerance (float): The distance at which is_almost_active(env) triggers. - + state_space: System state space. + input_space: System input space. + ctype: Type of constraint (state, input, or both). + lower_bounds: 1D array of the lower bounds. Length must match the space dimension. If + None, the env defaults are used. + upper_bounds: 1D array of the lower bounds. Length must match the space dimension. If + None, the env defaults are used. + strict: Option to check for strict constraint satisfaction at the threshold (< vs <=). + tolerance: The distance at which is_almost_active(env) triggers. """ - if constrained_variable == ConstrainedVariableType.STATE: - # for now we only constrain the underlying env state, and assume either the observation - # is the same as state, or observation contain additional info other than state and so - # the env has separate `state_space` and `observation_space` - if hasattr(env, "state_space"): - default_constraint_space = env.state_space - else: - default_constraint_space = env.observation_space - elif constrained_variable == ConstrainedVariableType.INPUT: - default_constraint_space = env.action_space - else: - raise NotImplementedError( - "[ERROR] DefaultConstraint can only be of type STATE or INPUT" - ) - # extract bounds from the space - if upper_bounds is None: - upper_bounds = default_constraint_space.high + if ctype == ConstrainedVariableType.STATE: + cspace = spaces.flatten_space(state_space) + elif ctype == ConstrainedVariableType.INPUT: + cspace = spaces.flatten_space(input_space) else: - upper_bounds = np.array(upper_bounds, ndmin=1) - assert len(upper_bounds) == default_constraint_space.shape[0], ValueError( - "[ERROR]: Upper bound must have length equal to space dimension." - ) - if lower_bounds is None: - lower_bounds = default_constraint_space.low - else: - lower_bounds = np.array(lower_bounds, ndmin=1) - assert len(lower_bounds) == default_constraint_space.shape[0], ValueError( - "[ERROR]: Lower bound must have length equal to space dimension." - ) + raise NotImplementedError("DefaultConstraint can only be of type STATE or INPUT") + upper_bounds = cspace.high if upper_bounds is None else np.array(upper_bounds, ndmin=1) + lower_bounds = cspace.low if lower_bounds is None else np.array(lower_bounds, ndmin=1) + assert len(upper_bounds) == cspace.shape[0], "Upper bound does not match space dimension." + assert len(lower_bounds) == cspace.shape[0], "Lower bound does not match space dimension." super().__init__( - env, + state_space, + input_space, + ctype, lower_bounds=lower_bounds.astype(np.float64), upper_bounds=upper_bounds.astype(np.float64), - constrained_variable=constrained_variable, strict=strict, active_dims=None, tolerance=tolerance, ) -class SymmetricStateConstraint(BoundedConstraint): - """Symmetric state bound constraint. - - Note: speficially intended for Cartpole and Safe Exploration (Dalal 2018). - - """ - - def __init__( - self, - env, - constrained_variable, - bound, - strict: bool = False, - active_dims=None, - tolerance=None, - **kwrags, - ): - """ """ - assert bound is not None - self.bound = np.array(bound, ndmin=1) - super().__init__( - env, - lower_bounds=-bound, - upper_bounds=bound, - constrained_variable=constrained_variable, - strict=strict, - active_dims=active_dims, - tolerance=tolerance, - **kwrags, - ) - assert ( - env.NAME == "cartpole" - ), "[ERROR] SymmetricStateConstraint is meant for CartPole environments" - assert ( - env.COST == "rl_reward" - ), "[ERROR] SymmetricStateConstraint is meant for RL environments" - self.num_constraints = self.bound.shape[0] - - def get_value(self, env): - c_value = np.abs(self.constraint_filter @ env.state) - self.bound - return c_value - - # TODO: temp addition - def check_tolerance_shape(self): - """Note we compare tolerance shape to bound shape (instead of num_constraints), since - num_constraints will be set as 2x due to subclassing BoundedConstraint, - it will be overwritten at the end of __init__ to the correct shape. - """ - if self.tolerance is not None and len(self.tolerance) != len(self.bound): - raise ValueError( - "[ERROR] the tolerance dimension does not match the number of constraints." - ) - - -def get_symbolic_constraint_models(constraint_list): - """Create list of symbolic models from list of constraints.""" - symbolic_models = [con.get_symbolic_model() for con in constraint_list] - return symbolic_models - - class ConstraintList: """Collates a (ordered) list of constraints.""" - def __init__(self, constraints): + def __init__(self, constraints: list[Constraint]): """Initialize the constraint list. Args: - constraints: The list of constraints. - + constraints: A list of constraints. """ self.constraints = constraints - self.constraint_lengths = [con.num_constraints for con in self.constraints] + constraint_lengths = [con.n_constraints for con in self.constraints] # 1st constraint is always index 0, hence ignored - self.constraint_indices = np.cumsum(self.constraint_lengths[:-1]) - self.num_constraints = sum(self.constraint_lengths) - # constraint subsets + self.constraint_indices = np.cumsum(constraint_lengths[:-1]) + self.n_constraints = sum(constraint_lengths) + # Constraint subsets self.state_constraints = [ - con - for con in self.constraints - if con.constrained_variable == ConstrainedVariableType.STATE + con for con in self.constraints if con.ctype == ConstrainedVariableType.STATE ] - self.num_state_constraints = sum([con.num_constraints for con in self.state_constraints]) self.input_constraints = [ - con - for con in self.constraints - if con.constrained_variable == ConstrainedVariableType.INPUT + con for con in self.constraints if con.ctype == ConstrainedVariableType.INPUT ] - self.num_input_constraints = sum([con.num_constraints for con in self.input_constraints]) self.input_state_constraints = [ - con - for con in self.constraints - if con.constrained_variable == ConstrainedVariableType.INPUT_AND_STATE + con for con in self.constraints if con.ctype == ConstrainedVariableType.INPUT_AND_STATE ] - self.num_input_state_constraints = sum( - [con.num_constraints for con in self.input_state_constraints] - ) - def __len__(self): - """Gets the constraint list length. - - Returns: - int: The number of constraints in the list. - - """ + def __len__(self) -> int: + """Get the constraint list length.""" return len(self.constraints) - def get_all_symbolic_models(self): + def symbolic_model( + self, state_models: bool = True, input_models: bool = True + ) -> list[Callable]: """Return all the symbolic models the constraints.""" - return get_symbolic_constraint_models(self.constraints) - - def get_state_constraint_symbolic_models(self): - """Return only the constraints that act on the state.""" - return get_symbolic_constraint_models(self.state_constraints) - - def get_input_constraint_symbolic_models(self): - """Return only the constraints that act on the input.""" - return get_symbolic_constraint_models(self.input_constraints) - - def get_input_and_state_constraint_symbolic_models(self): - """Return only the constraints that act on both state and inputs simultaneously.""" - return get_symbolic_constraint_models(self.input_state_constraints) - - def get_stacked_symbolic_model(self, env): - """Gets the symbolic form of all constraints. - - Args: - env: The environment to constrain. - - Returns: - obj: The symbolic form of the constraint. - - """ - symbolic_models = [con.get_symbolic_model() for con in self.constraints] - X = env.symbolic.x_sym - U = env.symbolic.u_sym - stack_c_sym = cs.vertcat(*[func(X, U) for func in symbolic_models]) - sym_func = cs.Function("constraints", [X, U], [stack_c_sym]) - return sym_func - - def get_values(self, env, only_state=False): - """Gets all constraint function values. - - Args: - env: The environment to constrain. - - Returns: - ndarray: An array with the evaluation of each constraint. - - """ - if only_state: - con_values = np.concatenate([con.get_value(env) for con in self.state_constraints]) - else: - con_values = np.concatenate([con.get_value(env) for con in self.constraints]) - return con_values - - def get_violations(self, env, only_state=False): - """Gets all constraint violations. - - Args: - env: The environment to constrain. - - Returns: - list: A list of booleans saying whether each constraint was violated. - - """ + assert state_models or input_models, "Select at least one of state_models or input_models." + if state_models and input_models: + return [con.symbolic_model() for con in self.constraints] + if state_models: + return [con.symbolic_model() for con in self.state_constraints] + return [con.symbolic_model() for con in self.input_constraints] + + def value( + self, + state: npt.NDArray[np.float64] | None = None, + input: npt.NDArray[np.float64] | None = None, + only_state: bool = False, + ): + """Get all constraint function values.""" + if self.n_constraints == 0: + return np.array([]) if only_state: - flags = [con.is_violated(env) for con in self.state_constraints] - else: - flags = [con.is_violated(env) for con in self.constraints] - return flags - - def is_violated(self, env, c_value=None): - """Checks if any of the constraints is violated. - - Args: - env: The environment to constrain. - c_value: an already calculated constraint value (no need to recompute). - - Returns: - bool: A boolean flag if any constraint is violeted. + return np.concatenate([con.value(state) for con in self.state_constraints]) + return np.concatenate([con.value(state, input) for con in self.constraints]) - """ + def is_violated( + self, + state: npt.NDArray[np.float64] | None = None, + input: npt.NDArray[np.float64] | None = None, + c_value=None, + ): + """Check if any of the constraints is violated.""" if c_value is not None: - c_value_splits = np.split(c_value, self.constraint_indices) - flag = any( - [ - con.is_violated(env, c_value=c_value_split) - for con, c_value_split in zip(self.constraints, c_value_splits) - ] + splits = np.split(c_value, self.constraint_indices) + return any( + con.is_violated(state, input, c_value=split) + for con, split in zip(self.constraints, splits) ) - else: - flag = any([con.is_violated(env) for con in self.constraints]) - return flag - - def is_almost_active(self, env, c_value=None): - """Checks if constraint is nearly violated. + return any(con.is_violated(state, input) for con in self.constraints) - This is checked by using a slack variable (from init args) and can be used - for reward shaping/constraint penalty in RL methods. - - """ + def is_almost_active( + self, + state: npt.NDArray[np.float64] | None = None, + input: npt.NDArray[np.float64] | None = None, + c_value=None, + ): + """Check if constraint is nearly violated.""" if c_value is not None: - c_value_splits = np.split(c_value, self.constraint_indices) - flag = any( - [ - con.is_almost_active(env, c_value=c_value_split) - for con, c_value_split in zip(self.constraints, c_value_splits) - ] + splits = np.split(c_value, self.constraint_indices) + return any( + con.is_almost_active(state, input, c_value=split) + for con, split in zip(self.constraints, splits) ) - else: - flag = any([con.is_almost_active(env) for con in self.constraints]) - return flag - - -GENERAL_CONSTRAINTS = { - "linear_constraint": LinearConstraint, - "quadratic_constraint": QuadraticContstraint, - "bounded_constraint": BoundedConstraint, - "default_constraint": DefaultConstraint, -} - + return any(con.is_almost_active(state, input) for con in self.constraints) -def create_constraint_list(constraint_specs, available_constraints, env): - """Creates a ConstraintList from yaml constraint specification. + @staticmethod + def from_specs( + state_space: Space, action_space: Space, constraint_specs: list[dict] + ) -> ConstraintList: + """Creates a ConstraintList from constraint specification. - Args: - constraint_specs (list): List of dicts defining the constraints info. - available_constraints (dict): Dict of the constraints that are available - env (BenchmarkEnv): Env for which the constraints will be applied - """ - constraint_list = [] - for constraint in constraint_specs: - assert isinstance(constraint, dict), "[ERROR]: Each constraint must be specified as a dict." - assert ( - "constraint_form" in constraint.keys() - ), "[ERROR]: Each constraint must have a key 'constraint_form'" - con_form = constraint["constraint_form"] - assert ( - con_form in available_constraints - ), "[ERROR]. constraint not in list of available constraints" - con_class = available_constraints[con_form] - cfg = {key: constraint[key] for key in constraint if key != "constraint_form"} - constraint_list.append(con_class(env, **cfg)) - return ConstraintList(constraint_list) + Args: + state_space: System state space. + input_space: System input space. + constraint_specs: List of dicts defining the constraints info. + """ + constraint_list = [] + for constraint in constraint_specs: + assert isinstance(constraint, dict), "Each constraint must be specified as a dict." + assert "type" in constraint.keys(), "Each constraint must have a 'type' key" + c_class = getattr(sys.modules[__name__], constraint["type"]) + kwargs = {key: constraint[key] for key in constraint if key != "type"} + constraint_list.append(c_class(state_space, action_space, **kwargs)) + return ConstraintList(constraint_list) diff --git a/safe_control_gym/envs/drone.py b/safe_control_gym/envs/drone.py index 539e04861..7b8c26cad 100644 --- a/safe_control_gym/envs/drone.py +++ b/safe_control_gym/envs/drone.py @@ -406,10 +406,15 @@ class DroneParams: pwm2rpm_const: float min_pwm: float max_pwm: float - J_inv: npt.NDArray[np.float64] = np.zeros((3, 3)) # Is calculated in __post_init__ + # Defaults are calculated in __post_init__ according to the other parameters + min_thrust: float = 0.0 + max_thrust: float = 0.0 + J_inv: npt.NDArray[np.float64] = np.zeros((3, 3)) def __post_init__(self): self.J_inv = np.linalg.inv(self.J) + self.min_thrust = self.kf * (self.pwm2rpm_scale * self.min_pwm + self.pwm2rpm_const) ** 2 + self.max_thrust = self.kf * (self.pwm2rpm_scale * self.max_pwm + self.pwm2rpm_const) ** 2 @staticmethod def from_urdf(path: Path) -> DroneParams: diff --git a/safe_control_gym/envs/drone_sim.py b/safe_control_gym/envs/drone_sim.py new file mode 100644 index 000000000..6b8fffd53 --- /dev/null +++ b/safe_control_gym/envs/drone_sim.py @@ -0,0 +1,189 @@ +"""Base environment class module. + +This module also contains enumerations for cost functions, tasks, disturbances, and quadrotor types. + +""" + +from __future__ import annotations + +from dataclasses import dataclass +import numpy as np +import gymnasium +from gymnasium import spaces +from gymnasium.utils import seeding +import copy +import math + +from safe_control_gym.envs.constraints import ConstraintList +from safe_control_gym.envs.disturbances import create_disturbance_list +from safe_control_gym.envs.drone import Drone + + +@dataclass +class SimSettings: + """Simulation settings dataclass.""" + + sim_freq: int = 500 + ctrl_freq: int = 500 + gui: bool = False + + def __post_init__(self): + assert self.sim_freq % self.ctrl_freq == 0, "sim_freq must be divisible by ctrl_freq." + + +class DroneSim(gymnasium.Env): + """Drone simulation based on gym-pybullet-drones.""" + + def __init__( + self, + sim_freq: int = 500, + ctrl_freq: int = 500, + constraints=None, + disturbances=None, + gui: bool = False, + seed: int | None = None, + reseed: bool = False, + ): + """Initialization method for BenchmarkEnv. + + Args: + gui: Option to show PyBullet's GUI. + sim_freq: The frequency at which PyBullet steps (a multiple of ctrl_freq). + ctrl_freq: The frequency at which the environment steps. + constraints: Dictionary to specify the constraints being used. + disturbances: Dictionary to specify disturbances being used. + seed: Seed for the random number generator. + reseed: Option to reseed the environment on each reset with the same seed. + """ + self.sim_settings = SimSettings(sim_freq, ctrl_freq, gui) + self.drone = Drone(controller="mellinger") + + # Create action, observation and state spaces. + min_thrust, max_thrust = self.drone.params.min_thrust, self.drone.params.max_thrust + self.action_space = spaces.Box(low=min_thrust, high=max_thrust, shape=(4,)) + + # pos: x, y, z in meters + # rpy: roll, pitch, yaw in radians + # vel: vx, vy, vz in m/s + # ang_vel: p, q, r in rad/s + rpy_max = np.array([85 / 180 * np.pi, 85 / 180 * np.pi, np.pi], np.float32) # Yaw unbounded + max_flt = np.full(3, np.finfo(np.float32).max, np.float32) + self.observation_space = spaces.Dict( + { + "pos": spaces.Box(low=np.array([-5, -5, 0]), high=np.array([5, 5, 2.5])), + "rpy": spaces.Box(low=-rpy_max, high=rpy_max), + "vel": spaces.Box(low=-max_flt, high=max_flt), + "ang_vel": spaces.Box(low=-max_flt, high=max_flt), + } + ) + self.state_space = self.observation_space + self.constraints = ConstraintList.from_specs( + self.state_space, self.action_space, constraints + ) + # Set disturbance info. + self._disturbance_config = disturbances + self.disturbances = self._setup_disturbances(disturbances) + # Default seed None means pure randomness/no seeding. + self.seed(seed) + # IROS 2022 - Save random seed for re-seeding. + self._seed = seed + self._reseed_on_reset = reseed + + def seed(self, seed=None): + """Set up a random number generator for a given seed. + + Remember to seed all random generators, currently in + - env + - action_space + - disturbances + + """ + self.np_random, seed = seeding.np_random(seed) + self.action_space.seed(seed) + for d in self.disturbances.values(): + d.seed(self) + return [seed] + + def _randomize_values_by_info(self, original_values, randomization_info): + """Randomizes a list of values according to desired distributions. + + Args: + original_values (dict): a dict of orginal values. + randomization_info (dict): A dictionary containing information about the distributions + used to randomize original_values. + + Returns: + dict: A dict of randomized values. + + """ + # Start from a copy of the original values. + randomized_values = copy.deepcopy(original_values) + # Copy the info dict to parse it with "pop". + rand_info_copy = copy.deepcopy(randomization_info) + # Randomized and replace values for which randomization info are given. + for key in original_values: + if key in rand_info_copy: + # Get distribution removing it from info dict. + distrib = getattr(self.np_random, rand_info_copy[key].pop("distrib")) + # Pop positional args. + d_args = rand_info_copy[key].pop("args", []) + # Keyword args are just anything left. + d_kwargs = rand_info_copy[key] + # Randomize (adding to the original values). + randomized_values[key] += distrib(*d_args, **d_kwargs) + return randomized_values + + def _setup_disturbances(self, disturbances: dict | None = None): + """Creates attributes and action spaces for the disturbances.""" + # Default: no passive disturbances. + if disturbances is None: + return + dist = {} + modes = { + "observation": {"dim": spaces.flatdim(self.observation_space)}, + "action": {"dim": spaces.flatdim(self.action_space)}, + "dynamics": {"dim": 3}, + } + for mode, spec in disturbances.items(): + assert mode in modes, "Disturbance mode not available." + dist[mode] = create_disturbance_list(spec, modes[mode], self) + return dist + + @property + def n_constraints(self): + return 0 if self.constraints is None else self.constraints.n_constraints + + def before_reset(self): + """Pre-processing before calling `.reset()`.""" + if self._reseed_on_reset: # IROS 2022 - Re-seed on reset. + self.seed(self._seed) + + # Housekeeping variables. + self.pyb_step_counter = 0 + self.ctrl_step_counter = 0 + self.current_raw_input_action = None + self.current_preprocessed_action = None + # Reset the disturbances. + for mode in self.disturbances.keys(): + self.disturbances[mode].reset(self) + + def after_step(self, obs, rew, done, info): + """Post-processing after calling `.step()`.""" + # Increment counters + self.pyb_step_counter += self.sim_settings.sim_freq // self.sim_settings.ctrl_freq + self.ctrl_step_counter += 1 + + # Terminate when (any) constraint is violated. + # here we cache the constraint values `c_value`, so we only evaluate the constraints once, + # but use it in 1) info dict; 2) check constraint violation; 3) check near constraint violation/almost active. + # it also allows stateful constraint, where evaluation/update should only be done once per time step. + c_value = None + if self.constraints is not None: + c_value = self.constraints.value(self.state, self.current_raw_input_action) + info["constraint_values"] = c_value + info["constraint_violation"] = self.constraints.is_violated( + self.state, self.current_raw_input_action, c_value=c_value + ) + # Terminate when reaching time limit, + # but distinguish between done due to true termination or time limit reached + return obs, rew, done, info diff --git a/safe_control_gym/envs/firmware_wrapper.py b/safe_control_gym/envs/firmware_wrapper.py index 6803459c3..1e147876b 100644 --- a/safe_control_gym/envs/firmware_wrapper.py +++ b/safe_control_gym/envs/firmware_wrapper.py @@ -51,7 +51,11 @@ def __init__(self, env_func: Callable[[], Quadrotor], firmware_freq: int, ctrl_f def reset(self): obs, info = self.env.reset() - self.drone.reset(obs[[0, 2, 4]], obs[[6, 7, 8]], obs[[1, 3, 5]]) + pos, vel, rpy = obs["pos"], obs["vel"], obs["rpy"] + self.drone.reset(pos, rpy, vel) + obs = np.concatenate( + [np.array([pos[0], vel[0], pos[1], vel[1], pos[2], vel[2]]), rpy, obs["ang_vel"]] + ) if self.env.n_drones > 1: raise NotImplementedError("Firmware wrapper does not support multiple drones.") return obs, info @@ -75,9 +79,10 @@ def step(self, sim_time: float, action: np.ndarray): """ while self.drone.tick / self.drone.firmware_freq < sim_time + 1 / self.step_freq: obs, reward, done, info = self.env.step(action) - pos = obs[[0, 2, 4]] - vel = obs[[1, 3, 5]] - rpy = obs[[6, 7, 8]] + pos, vel, rpy = obs["pos"], obs["vel"], obs["rpy"] + obs = np.concatenate( + [np.array([pos[0], vel[0], pos[1], vel[1], pos[2], vel[2]]), rpy, obs["ang_vel"]] + ) action = self.drone.step_controller(pos, rpy, vel)[::-1] return obs, reward, done, info, action diff --git a/safe_control_gym/envs/quadrotor.py b/safe_control_gym/envs/quadrotor.py index 0d4289c3a..ad2e795f0 100644 --- a/safe_control_gym/envs/quadrotor.py +++ b/safe_control_gym/envs/quadrotor.py @@ -12,11 +12,11 @@ import numpy as np import pybullet as p -from safe_control_gym.envs.constraints import GENERAL_CONSTRAINTS from safe_control_gym.math_and_models.symbolic_systems import SymbolicModel from safe_control_gym.envs.base_aviary import BaseAviary from safe_control_gym.envs.quadrotor_utils import QuadType, cmd2pwm, pwm2rpm from safe_control_gym.math_and_models.transformations import csRotXYZ +from gymnasium import spaces logger = logging.getLogger(__name__) @@ -29,77 +29,13 @@ class Quadrotor(BaseAviary): """ - NAME = "quadrotor" - AVAILABLE_CONSTRAINTS = deepcopy(GENERAL_CONSTRAINTS) - - DISTURBANCE_MODES = { # Set at runtime by QUAD_TYPE - "observation": {"dim": -1}, - "action": {"dim": -1}, - "dynamics": {"dim": -1}, - } - - INERTIAL_PROP_RAND_INFO = { - "M": { # Nominal: 0.027 - "distrib": "uniform", - "low": 0.022, - "high": 0.032, - }, - "Ixx": { # Nominal: 1.4e-5 - "distrib": "uniform", - "low": 1.3e-5, - "high": 1.5e-5, - }, - "Iyy": { # Nominal: 1.4e-5 - "distrib": "uniform", - "low": 1.3e-5, - "high": 1.5e-5, - }, - "Izz": { # Nominal: 2.17e-5 - "distrib": "uniform", - "low": 2.07e-5, - "high": 2.27e-5, - }, - } GATE_Z_LOW = 0.525 GATE_Z_HIGH = 1.0 OBSTACLE_Z = 1.05 - INIT_STATE_RAND_INFO = { - "init_x": {"distrib": "uniform", "low": -0.5, "high": 0.5}, - "init_x_dot": {"distrib": "uniform", "low": -0.01, "high": 0.01}, - "init_y": {"distrib": "uniform", "low": -0.5, "high": 0.5}, - "init_y_dot": {"distrib": "uniform", "low": -0.01, "high": 0.01}, - "init_z": {"distrib": "uniform", "low": 0.1, "high": 1.5}, - "init_z_dot": {"distrib": "uniform", "low": -0.01, "high": 0.01}, - "init_phi": {"distrib": "uniform", "low": -0.3, "high": 0.3}, - "init_theta": {"distrib": "uniform", "low": -0.3, "high": 0.3}, - "init_psi": {"distrib": "uniform", "low": -0.3, "high": 0.3}, - "init_p": {"distrib": "uniform", "low": -0.01, "high": 0.01}, - "init_theta_dot": { # Only used in 2D quad. - "distrib": "uniform", - "low": -0.01, - "high": 0.01, - }, - "init_q": {"distrib": "uniform", "low": -0.01, "high": 0.01}, - "init_r": {"distrib": "uniform", "low": -0.01, "high": 0.01}, - } - - TASK_INFO = { - "stabilization_goal": [0, 1], - "stabilization_goal_tolerance": 0.05, - "trajectory_type": "circle", - "num_cycles": 1, - "trajectory_plane": "zx", - "trajectory_position_offset": [0.5, 0], - "trajectory_scale": -0.5, - "proj_point": [0, 0, 0.5], - "proj_normal": [0, 1, 1], - } - def __init__( self, init_state=None, - inertial_prop=None, # custom args quad_type: QuadType = QuadType.TWO_D, norm_act_scale=0.1, @@ -108,14 +44,12 @@ def __init__( rew_act_weight=0.0001, rew_exponential=True, done_on_out_of_bound=True, - info_mse_metric_state_weight=None, **kwargs, ): """Initialize a quadrotor environment. Args: init_state (ndarray, optional): The initial state of the environment, (z, z_dot) or (x, x_dot, z, z_dot theta, theta_dot). - inertial_prop (ndarray, optional): The inertial properties of the environment (mass, Iyy). quad_type (QuadType, optional): The choice of motion type (1D along z or 2D in the x-z plane). norm_act_scale (float): scaling the [-1,1] action space around hover thrust when `normalized_action_space` is True. obs_goal_horizon (int): how many future goal states to append to obervation. @@ -123,9 +57,16 @@ def __init__( rew_act_weight (list/ndarray): quadratic weights for action in rl reward. rew_exponential (bool): if to exponentiate negative quadratic cost to positive, bounded [0,1] reward. done_on_out_of_bound (bool): if to termiante when state is out of bound. - info_mse_metric_state_weight (list/ndarray): quadratic weights for state in mse calculation for info dict. """ + self._init_state = init_state + self._init_state_randomization = kwargs["init_state_randomization_info"] + self._enable_init_state_randomization = kwargs["randomized_init"] + + self._inertial = None + self._inertial_randomization = kwargs["inertial_prop_randomization_info"] + self._enable_inertial_randomization = kwargs["randomized_inertial_prop"] + # Select the 1D (moving along z) or 2D (moving in the xz plane) quadrotor. self.QUAD_TYPE = QuadType(quad_type) self.norm_act_scale = norm_act_scale @@ -135,58 +76,35 @@ def __init__( self.rew_exponential = rew_exponential self.done_on_out_of_bound = done_on_out_of_bound - self.info_mse_metric_state_weight = np.array( - [1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0], ndmin=1, dtype=float - ) - # BaseAviary constructor, called after defining the custom args, # since some BenchmarkEnv init setup can be task(custom args)-dependent. - super().__init__(init_state=init_state, inertial_prop=inertial_prop, **kwargs) + super().__init__(init_state=init_state, **kwargs) # Store initial state info. - self.INIT_STATE_LABELS = { - QuadType.THREE_D: [ - "init_x", - "init_x_dot", - "init_y", - "init_y_dot", - "init_z", - "init_z_dot", - "init_phi", - "init_theta", - "init_psi", - "init_p", - "init_q", - "init_r", - ], - } + self.INIT_STATE_LABELS = [ + "init_x", + "init_x_dot", + "init_y", + "init_y_dot", + "init_z", + "init_z_dot", + "init_phi", + "init_theta", + "init_psi", + "init_p", + "init_q", + "init_r", + ] + assert isinstance(init_state, dict), "Expected init_state as dictionary." - for init_name in self.INIT_STATE_LABELS[self.QUAD_TYPE]: + for init_name in self.INIT_STATE_LABELS: self.__dict__[init_name.upper()] = init_state.get(init_name, 0.0) - # Remove randomization info of initial state components inconsistent with quad type. - for init_name in list(self.INIT_STATE_RAND_INFO.keys()): - if init_name not in self.INIT_STATE_LABELS[self.QUAD_TYPE]: - self.INIT_STATE_RAND_INFO.pop(init_name, None) - - # Override inertial properties of passed as arguments. - if inertial_prop is None: - pass - elif self.QUAD_TYPE == QuadType.THREE_D and np.array(inertial_prop).shape == (4,): - self.MASS, self.J[0, 0], self.J[1, 1], self.J[2, 2] = inertial_prop - elif isinstance(inertial_prop, dict): - self.MASS = inertial_prop.get("M", self.MASS) - self.J[0, 0] = inertial_prop.get("Ixx", self.J[0, 0]) - self.J[1, 1] = inertial_prop.get("Iyy", self.J[1, 1]) - self.J[2, 2] = inertial_prop.get("Izz", self.J[2, 2]) - else: - raise ValueError("Inertial_prop incorrect format.") - # Set prior/symbolic info. self._setup_symbolic() # Equilibrium point at hover for linearization. - self.X_EQ = np.zeros(self.state_space.shape[0]) + self.X_EQ = np.zeros(spaces.flatdim(self.state_space)) # IROS 2022 - Load maze. self.OBSTACLES = [] @@ -318,7 +236,7 @@ def reset(self): # Choose randomized or deterministic inertial properties. prop_values = { - "M": self.MASS, + "M": self.drone.params.mass, "Ixx": self.J[0, 0], "Iyy": self.J[1, 1], "Izz": self.J[2, 2], @@ -343,8 +261,7 @@ def reset(self): # Randomize initial state. init_values = { - init_name: self.__dict__[init_name.upper()] - for init_name in self.INIT_STATE_LABELS[self.QUAD_TYPE] + init_name: self.__dict__[init_name.upper()] for init_name in self.INIT_STATE_LABELS } if self._enable_init_state_randomization: init_values = self._randomize_values_by_info( @@ -353,10 +270,7 @@ def reset(self): INIT_XYZ = [init_values.get("init_" + k, 0.0) for k in ["x", "y", "z"]] INIT_VEL = [init_values.get("init_" + k + "_dot", 0.0) for k in ["x", "y", "z"]] INIT_RPY = [init_values.get("init_" + k, 0.0) for k in ["phi", "theta", "psi"]] - if self.QUAD_TYPE == QuadType.TWO_D: - INIT_ANG_VEL = [0, init_values.get("init_theta_dot", 0.0), 0] - else: - INIT_ANG_VEL = [init_values.get("init_" + k, 0.0) for k in ["p", "q", "r"]] + INIT_ANG_VEL = [init_values.get("init_" + k, 0.0) for k in ["p", "q", "r"]] p.resetBasePositionAndOrientation( self.DRONE_IDS[0], INIT_XYZ, @@ -370,7 +284,6 @@ def reset(self): # Update BaseAviary internal variables before calling self._get_observation(). self._update_and_store_kinematic_information() obs, info = self._get_observation(), self._get_reset_info() - obs, info = super().after_reset(obs, info) # Return either an observation and dictionary or just the observation. return obs, info @@ -392,7 +305,8 @@ def step(self, action): """ # Get the preprocessed rpm for each motor - rpm = super().before_step(action) + self.current_raw_input_action = action + rpm = self._preprocess_control(action) # Pre-process/clip the action # Determine disturbance force. disturb_force = None @@ -435,7 +349,6 @@ def render(self, mode="human"): flags=p.ER_SEGMENTATION_MASK_OBJECT_AND_LINKINDEX, physicsClientId=self.PYB_CLIENT, ) - # Image.fromarray(np.reshape(rgb, (h, w, 4)), 'RGBA').show() return np.reshape(rgb, (h, w, 4)) def _setup_symbolic(self): @@ -445,7 +358,7 @@ def _setup_symbolic(self): SymbolicModel: CasADi symbolic model of the environment. """ - m, g, l = self.MASS, self.GRAVITY_ACC, self.L + m, g, l = self.drone.params.mass, self.GRAVITY_ACC, self.L Iyy = self.J[1, 1] dt = 1 / self.sim_settings.ctrl_freq # Define states. @@ -577,6 +490,7 @@ def _get_observation(self): obs = deepcopy(self.state) if "observation" in self.disturbances: obs = self.disturbances["observation"].apply(obs, self) + return {"pos": pos, "rpy": rpy, "vel": vel, "ang_vel": ang_v_body_frame} return obs def _get_reward(self): @@ -729,20 +643,19 @@ def _get_reset_info(self): info = {} info["symbolic_model"] = self.symbolic info["nominal_physical_parameters"] = { - "quadrotor_mass": self.MASS, + "quadrotor_mass": self.drone.params.mass, "quadrotor_ixx_inertia": self.J[0, 0], "quadrotor_iyy_inertia": self.J[1, 1], "quadrotor_izz_inertia": self.J[2, 2], } if self.constraints is not None: - info["symbolic_constraints"] = self.constraints.get_all_symbolic_models() + info["symbolic_constraints"] = self.constraints.symbolic_model() else: info["symbolic_constraints"] = {} # IROS 2022 - Reset info. info["ctrl_timestep"] = 1 / self.sim_settings.ctrl_freq info["ctrl_freq"] = self.sim_settings.ctrl_freq - # info["episode_len_sec"] = self.EPISODE_LEN_SEC info["quadrotor_kf"] = self.KF info["quadrotor_km"] = self.KM info["gate_dimensions"] = { @@ -772,6 +685,7 @@ def _get_reset_info(self): info["disturbances"] = self._disturbance_config info["pyb_client"] = self.PYB_CLIENT info["urdf_dir"] = self.URDF_DIR + info["constraint_values"] = self.constraints.value(self.state, only_state=True) info.update(self._get_info()) return info