Skip to content

Commit

Permalink
Add doc strings to motion planning module
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasr22 committed Nov 1, 2024
1 parent c65875e commit 4c5fdc3
Showing 1 changed file with 62 additions and 20 deletions.
82 changes: 62 additions & 20 deletions code/planning/src/local_planner/motion_planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@

class MotionPlanning(CompatibleNode):
"""
This node selects speeds according to the behavior in the Decision Tree
and the ACC.
Later this Node should compute a local Trajectory and forward
it to the Acting.
This node selects speeds based on the current behaviour and ACC to forward to the acting components.
It also handles the generation of trajectories for overtaking maneuvers.
"""

def __init__(self):
Expand Down Expand Up @@ -235,12 +233,27 @@ def change_trajectory(self, distance_obj):
pose_list = self.trajectory.poses

# Only use fallback
self.overtake_fallback(distance_obj, pose_list)
self.generate_overtake_trajectory(distance_obj, pose_list)
self.__overtake_status = 1
self.overtake_success_pub.publish(self.__overtake_status)
return

def overtake_fallback(self, distance, pose_list, unstuck=False):
def generate_overtake_trajectory(self, distance, pose_list, unstuck=False):
"""
Generates a trajectory for overtaking maneuvers.
This method creates a new trajectory for the vehicle to follow when an overtaking maneuver is required.
It adjusts the waypoints based on the current waypoint and distance, and applies an offset to the waypoints
to create a path that avoids obstacles or gets the vehicle unstuck.
Args:
distance (int): The distance over which the overtaking maneuver should be planned.
pose_list (list): A list of PoseStamped objects representing the current planned path.
unstuck (bool, optional): A flag indicating whether the vehicle is stuck and requires a larger offset to get unstuck. Defaults to False.
Returns:
None: The method updates the self.trajectory attribute with the new path.
"""
currentwp = self.current_wp
normal_x_offset = 2
unstuck_x_offset = 3 # could need adjustment with better steering
Expand Down Expand Up @@ -306,9 +319,21 @@ def __set_trajectory(self, data: Path):
"""
self.trajectory = data
self.loginfo("Trajectory received")

self.__corners = self.__calc_corner_points()

def __calc_corner_points(self):
def __calc_corner_points(self) -> list[list[np.ndarray]]:
"""
Calculate the corner points of the trajectory.
This method converts the poses in the trajectory to an array of coordinates,
calculates the angles between consecutive points, and identifies the points
where there is a significant change in the angle, indicating a corner or curve.
The corner points are then grouped by proximity and returned.
Returns:
list: A list of lists, where each sublist contains 2D points that form a corner.
"""
coords = self.convert_pose_to_array(np.array(self.trajectory.poses))
x_values = np.array([point[0] for point in coords])
y_values = np.array([point[1] for point in coords])
Expand All @@ -320,13 +345,24 @@ def __calc_corner_points(self):
threshold = 1 # in degree
curve_change_indices = np.where(np.abs(np.diff(angles)) > threshold)[0]

sublist = self.create_sublists(curve_change_indices, proximity=5)
sublist = self.group_points_by_proximity(curve_change_indices, proximity=5)

coords_of_curve = [coords[i] for i in sublist]

return coords_of_curve

def create_sublists(self, points, proximity=5):
def group_points_by_proximity(self, points, proximity=5):
"""
Groups a list of points into sublists based on their proximity to each other.
Args:
points (list): A list of points to be grouped.
proximity (int, optional): The maximum distance between points in a sublist. Defaults to 5.
Returns:
list: A list of sublists, where each sublist contains points that are within the specified proximity of each other.
Sublists with only one point are filtered out.
"""
sublists = []
current_sublist = []

Expand Down Expand Up @@ -395,14 +431,14 @@ def map_corner(dist):
return self.__get_speed_cruise()

@staticmethod
def convert_pose_to_array(poses: np.array):
"""convert pose array to numpy array
def convert_pose_to_array(poses: np.ndarray) -> np.ndarray:
"""Convert an array of PoseStamped objects to a numpy array of positions.
Args:
poses (np.array): pose array
poses (np.ndarray): Array of PoseStamped objects.
Returns:
np.array: numpy array
np.ndarray: Numpy array of shape (n, 2) containing the x and y positions.
"""
result_array = np.empty((len(poses), 2))
for pose in range(len(poses)):
Expand All @@ -424,6 +460,11 @@ def __check_emergency(self, data: Bool):
self.emergency_pub.publish(data)

def update_target_speed(self, acc_speed, behavior):
"""
Updates the target velocity based on the current behavior and ACC velocity and overtake status and publishes it.
The unit of the velocity is m/s.
"""

be_speed = self.get_speed_by_behavior(behavior)
if behavior == bs.parking.name or self.__overtake_status == 1:
self.target_speed = be_speed
Expand All @@ -439,6 +480,10 @@ def __set_acc_speed(self, data: Float32):
self.__acc_speed = data.data

def __set_curr_behavior(self, data: String):
"""
Sets the received current behavior of the vehicle.
If the behavior is an overtake behavior, a trajectory change is triggered.
"""
self.__curr_behavior = data.data
if data.data == bs.ot_enter_init.name:
if np.isinf(self.__collision_point):
Expand Down Expand Up @@ -507,7 +552,9 @@ def __get_speed_unstuck(self, behavior: str) -> float:
# create overtake trajectory starting 6 meteres before
# the obstacle
# 6 worked well in tests, but can be adjusted
self.overtake_fallback(self.unstuck_distance, pose_list, unstuck=True)
self.generate_overtake_trajectory(
self.unstuck_distance, pose_list, unstuck=True
)
self.logfatal("Overtake Trajectory while unstuck!")
self.unstuck_overtake_flag = True
self.init_overtake_pos = self.current_pos[:2]
Expand Down Expand Up @@ -628,8 +675,7 @@ def __calc_virtual_overtake(self) -> float:

def run(self):
"""
Control loop
:return:
Control loop that updates the target speed and publishes the target trajectory and speed over ROS topics.
"""

def loop(timer_event=None):
Expand All @@ -649,10 +695,6 @@ def loop(timer_event=None):


if __name__ == "__main__":
"""
main function starts the MotionPlanning node
:param args:
"""
roscomp.init("MotionPlanning")
try:
node = MotionPlanning()
Expand Down

0 comments on commit 4c5fdc3

Please sign in to comment.