Skip to content

Commit

Permalink
Merge pull request #142 from f1tenth/dev-dynamics-ros
Browse files Browse the repository at this point in the history
Dev dynamics ros
  • Loading branch information
nandantumu authored Jan 22, 2025
2 parents 967e1aa + 5ba30b4 commit a481224
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 16 deletions.
4 changes: 2 additions & 2 deletions f1tenth_gym/envs/dynamic_models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .single_track import vehicle_dynamics_st, get_standardized_state_st
from .multi_body import init_mb, vehicle_dynamics_mb, get_standardized_state_mb
from .utils import pid_steer, pid_accl

from typing import Optional

class DynamicModel(Enum):
KS = 1 # Kinematic Single Track
Expand All @@ -32,7 +32,7 @@ def from_string(model: str):
else:
raise ValueError(f"Unknown model type {model}")

def get_initial_state(self, pose=None, params: dict | None = None):
def get_initial_state(self, pose=None, params: Optional[dict] = None):
# Assert that if self is MB, params is not None
if self == DynamicModel.MB and params is None:
raise ValueError("MultiBody model requires parameters to be provided.")
Expand Down
2 changes: 1 addition & 1 deletion f1tenth_gym/envs/dynamic_models/single_track.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def vehicle_dynamics_st(x: np.ndarray, u_init: np.ndarray, params: dict):
ACCL = u[1]

# switch to kinematic model for small velocities
if V < 0.1:
if V < 0.5:
# wheelbase
lwb = params["lf"] + params["lr"]
BETA_HAT = np.arctan(np.tan(DELTA) * params["lr"] / lwb)
Expand Down
24 changes: 24 additions & 0 deletions f1tenth_gym/envs/f110_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,30 @@ def fullscale_vehicle_params(cls) -> dict:
}
return params

@classmethod
def f1fifth_vehicle_params(cls) -> dict:
params = {
"mu": 1.1,
"C_Sf": 1.3507,
"C_Sr": 1.3507,
"lf": 0.2725,
"lr": 0.2585,
"h": 0.1825,
"m": 15.32,
"I": 0.64332,
"s_min": -0.4189,
"s_max": 0.4189,
"sv_min": -3.2,
"sv_max": 3.2,
"v_switch": 7.319,
"a_max": 9.51,
"v_min": -5.0,
"v_max": 20.0,
"width": 0.55,
"length": 0.8,
}
return params

@classmethod
def f1tenth_vehicle_params(cls) -> dict:
params = {
Expand Down
3 changes: 2 additions & 1 deletion f1tenth_gym/envs/rendering/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def make_renderer(
if render_spec.render_type == "pygame":
from .rendering_pygame import PygameEnvRenderer as EnvRenderer
elif render_spec.render_type == "pyqt6":
os.environ["QT_QPA_PLATFORM"] = "offscreen"
if render_mode in ["rgb_array", "rgb_array_list"]:
os.environ["QT_QPA_PLATFORM"] = "offscreen"
from .rendering_pyqt import PyQtEnvRenderer as EnvRenderer
else:
raise ValueError(f"Unknown render type: {render_spec.render_type}")
Expand Down
7 changes: 7 additions & 0 deletions f1tenth_gym/envs/reset/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations
from .masked_reset import GridResetFn, AllTrackResetFn
from .map_reset import AllMapResetFn
from .reset_fn import ResetFn
from ..track import Track

Expand All @@ -10,6 +11,12 @@ def make_reset_fn(type: str | None, track: Track, num_agents: int, **kwargs) ->
try:
refline_token, reset_token, shuffle_token = type.split("_")

if refline_token == "map":
reset_fn = {"random": AllMapResetFn}[reset_token]
shuffle = {"static": False, "random": True}[shuffle_token]
return reset_fn(track=track, num_agents=num_agents, shuffle=shuffle, **kwargs)

# "cl" or "rl"
refline = {"cl": track.centerline, "rl": track.raceline}[refline_token]
reset_fn = {"grid": GridResetFn, "random": AllTrackResetFn}[reset_token]
shuffle = {"static": False, "random": True}[shuffle_token]
Expand Down
85 changes: 85 additions & 0 deletions f1tenth_gym/envs/reset/map_reset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from abc import abstractmethod

import cv2
import numpy as np

from .reset_fn import ResetFn
from .utils import sample_around_pose
from ..track import Track


class MapResetFn(ResetFn):
@abstractmethod
def get_mask(self) -> np.ndarray:
pass

def __init__(
self,
track: Track,
num_agents: int,
move_laterally: bool,
min_dist: float,
max_dist: float,
):
self.track = track
self.n_agents = num_agents
self.min_dist = min_dist
self.max_dist = max_dist
self.move_laterally = move_laterally
# Mask is a 2D array of booleans of where the agents can be placed
# Should acount for max_dist from obstacles
self.mask = self.get_mask()


def sample(self) -> np.ndarray:
# Random ample an x-y position from the mask
valid_x, valid_y = np.where(self.mask)
idx = np.random.choice(len(valid_x))
pose_x = valid_x[idx] * self.track.spec.resolution + self.track.spec.origin[0]
pose_y = valid_y[idx] * self.track.spec.resolution + self.track.spec.origin[1]
pose_theta = np.random.uniform(-np.pi, np.pi)
pose = np.array([pose_x, pose_y, pose_theta])

poses = sample_around_pose(
pose=pose,
n_agents=self.n_agents,
min_dist=self.min_dist,
max_dist=self.max_dist,
)
return poses

class AllMapResetFn(MapResetFn):
def __init__(
self,
track: Track,
num_agents: int,
move_laterally: bool = True,
shuffle: bool = True,
min_dist: float = 0.5,
max_dist: float = 1.0,
):
super().__init__(
track=track,
num_agents=num_agents,
move_laterally=move_laterally,
min_dist=min_dist,
max_dist=max_dist,
)
self.shuffle = shuffle

def get_mask(self) -> np.ndarray:
# Create mask from occupancy grid enlarged by max_dist
dilation_size = int(self.max_dist / self.track.spec.resolution)
kernel = np.ones((dilation_size, dilation_size), np.uint8)
inverted_occ_map = (255 - self.track.occupancy_map)
dilated = cv2.dilate(inverted_occ_map, kernel, iterations=1)
dilated_inverted = (255 - dilated)
return dilated_inverted == 255

def sample(self) -> np.ndarray:
poses = super().sample()

if self.shuffle:
np.random.shuffle(poses)

return poses
33 changes: 33 additions & 0 deletions f1tenth_gym/envs/reset/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,36 @@ def sample_around_waypoint(
)

return np.array(poses)

def sample_around_pose(
pose: np.ndarray,
n_agents: int,
min_dist: float,
max_dist: float,
) -> np.ndarray:
"""
Compute n poses around a given pose.
It iteratively samples the next agent within a distance range from the previous one.
Note: no guarantee that the agents are on the track nor that they are not colliding with the environment.
Args:
- pose: the initial pose
- n_agents: the number of agents
- min_dist: the minimum distance between two consecutive agents
- max_dist: the maximum distance between two consecutive agents
"""
current_pose = pose

poses = []
for i in range(n_agents):
x, y, theta = current_pose
pose = np.array([x, y, theta])
poses.append(pose)
# sample next pose
dist = np.random.uniform(min_dist, max_dist)
theta = np.random.uniform(-np.pi, np.pi)
x += dist * np.cos(theta)
y += dist * np.sin(theta)
current_pose = np.array([x, y, theta])

return np.array(poses)
94 changes: 82 additions & 12 deletions f1tenth_gym/envs/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,72 @@ def from_track_name(track: str, track_scale: float = 1.0) -> Track:
print(ex)
raise FileNotFoundError(f"It could not load track {track}") from ex

@staticmethod
def from_track_path(path: pathlib.Path, track_scale: float = 1.0) -> Track:
"""
Load track from track path.
Parameters
----------
path : pathlib.Path
path to the track yaml file
Returns
-------
Track
track object
Raises
------
FileNotFoundError
if the track cannot be loaded
"""
try:
if type(path) is str:
path = pathlib.Path(path)

track_spec = Track.load_spec(
track=path.stem, filespec=path
)
track_spec.resolution = track_spec.resolution * track_scale
track_spec.origin = (
track_spec.origin[0] * track_scale,
track_spec.origin[1] * track_scale,
track_spec.origin[2],
)

# load occupancy grid
# Image path is from path + image name from track_spec
image_path = path.parent / track_spec.image
image = Image.open(image_path).transpose(Transpose.FLIP_TOP_BOTTOM)
occupancy_map = np.array(image).astype(np.float32)
occupancy_map[occupancy_map <= 128] = 0.0
occupancy_map[occupancy_map > 128] = 255.0

# if exists, load centerline
if (path / f"{path.stem}_centerline.csv").exists():
centerline = Raceline.from_centerline_file(path / f"{path.stem}_centerline.csv")
else:
centerline = None

# if exists, load raceline
if (path / f"{path.stem}_raceline.csv").exists():
raceline = Raceline.from_raceline_file(path / f"{path.stem}_raceline.csv")
else:
raceline = centerline

return Track(
spec=track_spec,
filepath=str(path.absolute()),
ext=image_path.suffix,
occupancy_map=occupancy_map,
centerline=centerline,
raceline=raceline,
)
except Exception as ex:
print(ex)
raise FileNotFoundError(f"It could not load track {path}") from ex

@staticmethod
def from_refline(x: np.ndarray, y: np.ndarray, velx: np.ndarray):
"""
Expand Down Expand Up @@ -249,7 +315,7 @@ def from_refline(x: np.ndarray, y: np.ndarray, velx: np.ndarray):
centerline=refline,
)

def frenet_to_cartesian(self, s, ey, ephi):
def frenet_to_cartesian(self, s, ey, ephi, use_raceline=False):
"""
Convert Frenet coordinates to Cartesian coordinates.
Expand All @@ -262,8 +328,9 @@ def frenet_to_cartesian(self, s, ey, ephi):
y: y-coordinate
psi: yaw angle
"""
x, y = self.centerline.spline.calc_position(s)
psi = self.centerline.spline.calc_yaw(s)
line = self.raceline if use_raceline else self.centerline
x, y = line.spline.calc_position(s)
psi = line.spline.calc_yaw(s)

# Adjust x,y by shifting along the normal vector
x -= ey * np.sin(psi)
Expand All @@ -274,7 +341,7 @@ def frenet_to_cartesian(self, s, ey, ephi):

return x, y, psi

def cartesian_to_frenet(self, x, y, phi, s_guess=0):
def cartesian_to_frenet(self, x, y, phi, use_raceline=False, s_guess=0):
"""
Convert Cartesian coordinates to Frenet coordinates.
Expand All @@ -287,22 +354,25 @@ def cartesian_to_frenet(self, x, y, phi, s_guess=0):
ey: lateral deviation
ephi: heading deviation
"""
s, ey = self.centerline.spline.calc_arclength_inaccurate(x, y)
if s > self.centerline.spline.s[-1]:
line = self.raceline if use_raceline else self.centerline
s, ey = line.spline.calc_arclength_inaccurate(x, y)
if s > line.spline.s[-1]:
# Wrap around
s = s - self.centerline.spline.s[-1]
s = s - line.spline.s[-1]
if s < 0:
# Negative s means we are behind the start point
s = s + self.centerline.spline.s[-1]
s = s + line.spline.s[-1]

# Use the normal to calculate the signed lateral deviation
normal = self.centerline.spline._calc_normal(s)
x_eval, y_eval = self.centerline.spline.calc_position(s)
normal = line.spline._calc_normal(s)
x_eval, y_eval = line.spline.calc_position(s)
dx = x - x_eval
dy = y - y_eval
distance_sign = np.sign(np.dot([dx, dy], normal))
ey = ey * distance_sign

phi = phi - self.centerline.spline.calc_yaw(s)
ephi = phi - line.spline.calc_yaw(s)
# ephi is unbouded, so we need to wrap it to [-pi, pi]
ephi = (ephi + np.pi) % (2 * np.pi) - np.pi

return s, ey, phi
return s, ey, ephi

0 comments on commit a481224

Please sign in to comment.