diff --git a/f1tenth_gym/envs/dynamic_models/__init__.py b/f1tenth_gym/envs/dynamic_models/__init__.py index 9837f013..bc011941 100644 --- a/f1tenth_gym/envs/dynamic_models/__init__.py +++ b/f1tenth_gym/envs/dynamic_models/__init__.py @@ -11,7 +11,7 @@ from .single_track import vehicle_dynamics_st, get_standardized_state_st from .multi_body import init_mb, vehicle_dynamics_mb, get_standardized_state_mb from .utils import pid_steer, pid_accl - +from typing import Optional class DynamicModel(Enum): KS = 1 # Kinematic Single Track @@ -32,7 +32,7 @@ def from_string(model: str): else: raise ValueError(f"Unknown model type {model}") - def get_initial_state(self, pose=None, params: dict | None = None): + def get_initial_state(self, pose=None, params: Optional[dict] = None): # Assert that if self is MB, params is not None if self == DynamicModel.MB and params is None: raise ValueError("MultiBody model requires parameters to be provided.") diff --git a/f1tenth_gym/envs/dynamic_models/single_track.py b/f1tenth_gym/envs/dynamic_models/single_track.py index 522a57be..7a1e3516 100644 --- a/f1tenth_gym/envs/dynamic_models/single_track.py +++ b/f1tenth_gym/envs/dynamic_models/single_track.py @@ -83,7 +83,7 @@ def vehicle_dynamics_st(x: np.ndarray, u_init: np.ndarray, params: dict): ACCL = u[1] # switch to kinematic model for small velocities - if V < 0.1: + if V < 0.5: # wheelbase lwb = params["lf"] + params["lr"] BETA_HAT = np.arctan(np.tan(DELTA) * params["lr"] / lwb) diff --git a/f1tenth_gym/envs/f110_env.py b/f1tenth_gym/envs/f110_env.py index 9dd7174b..fe78a918 100644 --- a/f1tenth_gym/envs/f110_env.py +++ b/f1tenth_gym/envs/f110_env.py @@ -324,6 +324,30 @@ def fullscale_vehicle_params(cls) -> dict: } return params + @classmethod + def f1fifth_vehicle_params(cls) -> dict: + params = { + "mu": 1.1, + "C_Sf": 1.3507, + "C_Sr": 1.3507, + "lf": 0.2725, + "lr": 0.2585, + "h": 0.1825, + "m": 15.32, + "I": 0.64332, + "s_min": -0.4189, + "s_max": 0.4189, + "sv_min": -3.2, + "sv_max": 3.2, + "v_switch": 7.319, + "a_max": 9.51, + "v_min": -5.0, + "v_max": 20.0, + "width": 0.55, + "length": 0.8, + } + return params + @classmethod def f1tenth_vehicle_params(cls) -> dict: params = { diff --git a/f1tenth_gym/envs/rendering/__init__.py b/f1tenth_gym/envs/rendering/__init__.py index 0e52410f..2325a937 100644 --- a/f1tenth_gym/envs/rendering/__init__.py +++ b/f1tenth_gym/envs/rendering/__init__.py @@ -35,7 +35,8 @@ def make_renderer( if render_spec.render_type == "pygame": from .rendering_pygame import PygameEnvRenderer as EnvRenderer elif render_spec.render_type == "pyqt6": - os.environ["QT_QPA_PLATFORM"] = "offscreen" + if render_mode in ["rgb_array", "rgb_array_list"]: + os.environ["QT_QPA_PLATFORM"] = "offscreen" from .rendering_pyqt import PyQtEnvRenderer as EnvRenderer else: raise ValueError(f"Unknown render type: {render_spec.render_type}") diff --git a/f1tenth_gym/envs/reset/__init__.py b/f1tenth_gym/envs/reset/__init__.py index bae55b2f..c1a80191 100644 --- a/f1tenth_gym/envs/reset/__init__.py +++ b/f1tenth_gym/envs/reset/__init__.py @@ -1,5 +1,6 @@ from __future__ import annotations from .masked_reset import GridResetFn, AllTrackResetFn +from .map_reset import AllMapResetFn from .reset_fn import ResetFn from ..track import Track @@ -10,6 +11,12 @@ def make_reset_fn(type: str | None, track: Track, num_agents: int, **kwargs) -> try: refline_token, reset_token, shuffle_token = type.split("_") + if refline_token == "map": + reset_fn = {"random": AllMapResetFn}[reset_token] + shuffle = {"static": False, "random": True}[shuffle_token] + return reset_fn(track=track, num_agents=num_agents, shuffle=shuffle, **kwargs) + + # "cl" or "rl" refline = {"cl": track.centerline, "rl": track.raceline}[refline_token] reset_fn = {"grid": GridResetFn, "random": AllTrackResetFn}[reset_token] shuffle = {"static": False, "random": True}[shuffle_token] diff --git a/f1tenth_gym/envs/reset/map_reset.py b/f1tenth_gym/envs/reset/map_reset.py new file mode 100644 index 00000000..f92ab4de --- /dev/null +++ b/f1tenth_gym/envs/reset/map_reset.py @@ -0,0 +1,85 @@ +from abc import abstractmethod + +import cv2 +import numpy as np + +from .reset_fn import ResetFn +from .utils import sample_around_pose +from ..track import Track + + +class MapResetFn(ResetFn): + @abstractmethod + def get_mask(self) -> np.ndarray: + pass + + def __init__( + self, + track: Track, + num_agents: int, + move_laterally: bool, + min_dist: float, + max_dist: float, + ): + self.track = track + self.n_agents = num_agents + self.min_dist = min_dist + self.max_dist = max_dist + self.move_laterally = move_laterally + # Mask is a 2D array of booleans of where the agents can be placed + # Should acount for max_dist from obstacles + self.mask = self.get_mask() + + + def sample(self) -> np.ndarray: + # Random ample an x-y position from the mask + valid_x, valid_y = np.where(self.mask) + idx = np.random.choice(len(valid_x)) + pose_x = valid_x[idx] * self.track.spec.resolution + self.track.spec.origin[0] + pose_y = valid_y[idx] * self.track.spec.resolution + self.track.spec.origin[1] + pose_theta = np.random.uniform(-np.pi, np.pi) + pose = np.array([pose_x, pose_y, pose_theta]) + + poses = sample_around_pose( + pose=pose, + n_agents=self.n_agents, + min_dist=self.min_dist, + max_dist=self.max_dist, + ) + return poses + +class AllMapResetFn(MapResetFn): + def __init__( + self, + track: Track, + num_agents: int, + move_laterally: bool = True, + shuffle: bool = True, + min_dist: float = 0.5, + max_dist: float = 1.0, + ): + super().__init__( + track=track, + num_agents=num_agents, + move_laterally=move_laterally, + min_dist=min_dist, + max_dist=max_dist, + ) + self.shuffle = shuffle + + def get_mask(self) -> np.ndarray: + # Create mask from occupancy grid enlarged by max_dist + dilation_size = int(self.max_dist / self.track.spec.resolution) + kernel = np.ones((dilation_size, dilation_size), np.uint8) + inverted_occ_map = (255 - self.track.occupancy_map) + dilated = cv2.dilate(inverted_occ_map, kernel, iterations=1) + dilated_inverted = (255 - dilated) + return dilated_inverted == 255 + + def sample(self) -> np.ndarray: + poses = super().sample() + + if self.shuffle: + np.random.shuffle(poses) + + return poses diff --git a/f1tenth_gym/envs/reset/utils.py b/f1tenth_gym/envs/reset/utils.py index 74846cf8..9571d195 100644 --- a/f1tenth_gym/envs/reset/utils.py +++ b/f1tenth_gym/envs/reset/utils.py @@ -83,3 +83,36 @@ def sample_around_waypoint( ) return np.array(poses) + +def sample_around_pose( + pose: np.ndarray, + n_agents: int, + min_dist: float, + max_dist: float, +) -> np.ndarray: + """ + Compute n poses around a given pose. + It iteratively samples the next agent within a distance range from the previous one. + Note: no guarantee that the agents are on the track nor that they are not colliding with the environment. + + Args: + - pose: the initial pose + - n_agents: the number of agents + - min_dist: the minimum distance between two consecutive agents + - max_dist: the maximum distance between two consecutive agents + """ + current_pose = pose + + poses = [] + for i in range(n_agents): + x, y, theta = current_pose + pose = np.array([x, y, theta]) + poses.append(pose) + # sample next pose + dist = np.random.uniform(min_dist, max_dist) + theta = np.random.uniform(-np.pi, np.pi) + x += dist * np.cos(theta) + y += dist * np.sin(theta) + current_pose = np.array([x, y, theta]) + + return np.array(poses) \ No newline at end of file diff --git a/f1tenth_gym/envs/track/track.py b/f1tenth_gym/envs/track/track.py index f95bda3d..6e338255 100644 --- a/f1tenth_gym/envs/track/track.py +++ b/f1tenth_gym/envs/track/track.py @@ -164,6 +164,72 @@ def from_track_name(track: str, track_scale: float = 1.0) -> Track: print(ex) raise FileNotFoundError(f"It could not load track {track}") from ex + @staticmethod + def from_track_path(path: pathlib.Path, track_scale: float = 1.0) -> Track: + """ + Load track from track path. + + Parameters + ---------- + path : pathlib.Path + path to the track yaml file + + Returns + ------- + Track + track object + + Raises + ------ + FileNotFoundError + if the track cannot be loaded + """ + try: + if type(path) is str: + path = pathlib.Path(path) + + track_spec = Track.load_spec( + track=path.stem, filespec=path + ) + track_spec.resolution = track_spec.resolution * track_scale + track_spec.origin = ( + track_spec.origin[0] * track_scale, + track_spec.origin[1] * track_scale, + track_spec.origin[2], + ) + + # load occupancy grid + # Image path is from path + image name from track_spec + image_path = path.parent / track_spec.image + image = Image.open(image_path).transpose(Transpose.FLIP_TOP_BOTTOM) + occupancy_map = np.array(image).astype(np.float32) + occupancy_map[occupancy_map <= 128] = 0.0 + occupancy_map[occupancy_map > 128] = 255.0 + + # if exists, load centerline + if (path / f"{path.stem}_centerline.csv").exists(): + centerline = Raceline.from_centerline_file(path / f"{path.stem}_centerline.csv") + else: + centerline = None + + # if exists, load raceline + if (path / f"{path.stem}_raceline.csv").exists(): + raceline = Raceline.from_raceline_file(path / f"{path.stem}_raceline.csv") + else: + raceline = centerline + + return Track( + spec=track_spec, + filepath=str(path.absolute()), + ext=image_path.suffix, + occupancy_map=occupancy_map, + centerline=centerline, + raceline=raceline, + ) + except Exception as ex: + print(ex) + raise FileNotFoundError(f"It could not load track {path}") from ex + @staticmethod def from_refline(x: np.ndarray, y: np.ndarray, velx: np.ndarray): """ @@ -249,7 +315,7 @@ def from_refline(x: np.ndarray, y: np.ndarray, velx: np.ndarray): centerline=refline, ) - def frenet_to_cartesian(self, s, ey, ephi): + def frenet_to_cartesian(self, s, ey, ephi, use_raceline=False): """ Convert Frenet coordinates to Cartesian coordinates. @@ -262,8 +328,9 @@ def frenet_to_cartesian(self, s, ey, ephi): y: y-coordinate psi: yaw angle """ - x, y = self.centerline.spline.calc_position(s) - psi = self.centerline.spline.calc_yaw(s) + line = self.raceline if use_raceline else self.centerline + x, y = line.spline.calc_position(s) + psi = line.spline.calc_yaw(s) # Adjust x,y by shifting along the normal vector x -= ey * np.sin(psi) @@ -274,7 +341,7 @@ def frenet_to_cartesian(self, s, ey, ephi): return x, y, psi - def cartesian_to_frenet(self, x, y, phi, s_guess=0): + def cartesian_to_frenet(self, x, y, phi, use_raceline=False, s_guess=0): """ Convert Cartesian coordinates to Frenet coordinates. @@ -287,22 +354,25 @@ def cartesian_to_frenet(self, x, y, phi, s_guess=0): ey: lateral deviation ephi: heading deviation """ - s, ey = self.centerline.spline.calc_arclength_inaccurate(x, y) - if s > self.centerline.spline.s[-1]: + line = self.raceline if use_raceline else self.centerline + s, ey = line.spline.calc_arclength_inaccurate(x, y) + if s > line.spline.s[-1]: # Wrap around - s = s - self.centerline.spline.s[-1] + s = s - line.spline.s[-1] if s < 0: # Negative s means we are behind the start point - s = s + self.centerline.spline.s[-1] + s = s + line.spline.s[-1] # Use the normal to calculate the signed lateral deviation - normal = self.centerline.spline._calc_normal(s) - x_eval, y_eval = self.centerline.spline.calc_position(s) + normal = line.spline._calc_normal(s) + x_eval, y_eval = line.spline.calc_position(s) dx = x - x_eval dy = y - y_eval distance_sign = np.sign(np.dot([dx, dy], normal)) ey = ey * distance_sign - phi = phi - self.centerline.spline.calc_yaw(s) + ephi = phi - line.spline.calc_yaw(s) + # ephi is unbouded, so we need to wrap it to [-pi, pi] + ephi = (ephi + np.pi) % (2 * np.pi) - np.pi - return s, ey, phi + return s, ey, ephi