From 4c5fdc352d35c5a2f58a57b15e546d56abbb9039 Mon Sep 17 00:00:00 2001 From: Niklas Rousset Date: Fri, 1 Nov 2024 18:53:56 +0100 Subject: [PATCH] Add doc strings to motion planning module --- .../src/local_planner/motion_planning.py | 82 ++++++++++++++----- 1 file changed, 62 insertions(+), 20 deletions(-) diff --git a/code/planning/src/local_planner/motion_planning.py b/code/planning/src/local_planner/motion_planning.py index 48abf69b..74aac44d 100755 --- a/code/planning/src/local_planner/motion_planning.py +++ b/code/planning/src/local_planner/motion_planning.py @@ -30,10 +30,8 @@ class MotionPlanning(CompatibleNode): """ - This node selects speeds according to the behavior in the Decision Tree - and the ACC. - Later this Node should compute a local Trajectory and forward - it to the Acting. + This node selects speeds based on the current behaviour and ACC to forward to the acting components. + It also handles the generation of trajectories for overtaking maneuvers. """ def __init__(self): @@ -235,12 +233,27 @@ def change_trajectory(self, distance_obj): pose_list = self.trajectory.poses # Only use fallback - self.overtake_fallback(distance_obj, pose_list) + self.generate_overtake_trajectory(distance_obj, pose_list) self.__overtake_status = 1 self.overtake_success_pub.publish(self.__overtake_status) return - def overtake_fallback(self, distance, pose_list, unstuck=False): + def generate_overtake_trajectory(self, distance, pose_list, unstuck=False): + """ + Generates a trajectory for overtaking maneuvers. + + This method creates a new trajectory for the vehicle to follow when an overtaking maneuver is required. + It adjusts the waypoints based on the current waypoint and distance, and applies an offset to the waypoints + to create a path that avoids obstacles or gets the vehicle unstuck. + + Args: + distance (int): The distance over which the overtaking maneuver should be planned. + pose_list (list): A list of PoseStamped objects representing the current planned path. + unstuck (bool, optional): A flag indicating whether the vehicle is stuck and requires a larger offset to get unstuck. Defaults to False. + + Returns: + None: The method updates the self.trajectory attribute with the new path. + """ currentwp = self.current_wp normal_x_offset = 2 unstuck_x_offset = 3 # could need adjustment with better steering @@ -306,9 +319,21 @@ def __set_trajectory(self, data: Path): """ self.trajectory = data self.loginfo("Trajectory received") + self.__corners = self.__calc_corner_points() - def __calc_corner_points(self): + def __calc_corner_points(self) -> list[list[np.ndarray]]: + """ + Calculate the corner points of the trajectory. + + This method converts the poses in the trajectory to an array of coordinates, + calculates the angles between consecutive points, and identifies the points + where there is a significant change in the angle, indicating a corner or curve. + The corner points are then grouped by proximity and returned. + + Returns: + list: A list of lists, where each sublist contains 2D points that form a corner. + """ coords = self.convert_pose_to_array(np.array(self.trajectory.poses)) x_values = np.array([point[0] for point in coords]) y_values = np.array([point[1] for point in coords]) @@ -320,13 +345,24 @@ def __calc_corner_points(self): threshold = 1 # in degree curve_change_indices = np.where(np.abs(np.diff(angles)) > threshold)[0] - sublist = self.create_sublists(curve_change_indices, proximity=5) + sublist = self.group_points_by_proximity(curve_change_indices, proximity=5) coords_of_curve = [coords[i] for i in sublist] return coords_of_curve - def create_sublists(self, points, proximity=5): + def group_points_by_proximity(self, points, proximity=5): + """ + Groups a list of points into sublists based on their proximity to each other. + + Args: + points (list): A list of points to be grouped. + proximity (int, optional): The maximum distance between points in a sublist. Defaults to 5. + + Returns: + list: A list of sublists, where each sublist contains points that are within the specified proximity of each other. + Sublists with only one point are filtered out. + """ sublists = [] current_sublist = [] @@ -395,14 +431,14 @@ def map_corner(dist): return self.__get_speed_cruise() @staticmethod - def convert_pose_to_array(poses: np.array): - """convert pose array to numpy array + def convert_pose_to_array(poses: np.ndarray) -> np.ndarray: + """Convert an array of PoseStamped objects to a numpy array of positions. Args: - poses (np.array): pose array + poses (np.ndarray): Array of PoseStamped objects. Returns: - np.array: numpy array + np.ndarray: Numpy array of shape (n, 2) containing the x and y positions. """ result_array = np.empty((len(poses), 2)) for pose in range(len(poses)): @@ -424,6 +460,11 @@ def __check_emergency(self, data: Bool): self.emergency_pub.publish(data) def update_target_speed(self, acc_speed, behavior): + """ + Updates the target velocity based on the current behavior and ACC velocity and overtake status and publishes it. + The unit of the velocity is m/s. + """ + be_speed = self.get_speed_by_behavior(behavior) if behavior == bs.parking.name or self.__overtake_status == 1: self.target_speed = be_speed @@ -439,6 +480,10 @@ def __set_acc_speed(self, data: Float32): self.__acc_speed = data.data def __set_curr_behavior(self, data: String): + """ + Sets the received current behavior of the vehicle. + If the behavior is an overtake behavior, a trajectory change is triggered. + """ self.__curr_behavior = data.data if data.data == bs.ot_enter_init.name: if np.isinf(self.__collision_point): @@ -507,7 +552,9 @@ def __get_speed_unstuck(self, behavior: str) -> float: # create overtake trajectory starting 6 meteres before # the obstacle # 6 worked well in tests, but can be adjusted - self.overtake_fallback(self.unstuck_distance, pose_list, unstuck=True) + self.generate_overtake_trajectory( + self.unstuck_distance, pose_list, unstuck=True + ) self.logfatal("Overtake Trajectory while unstuck!") self.unstuck_overtake_flag = True self.init_overtake_pos = self.current_pos[:2] @@ -628,8 +675,7 @@ def __calc_virtual_overtake(self) -> float: def run(self): """ - Control loop - :return: + Control loop that updates the target speed and publishes the target trajectory and speed over ROS topics. """ def loop(timer_event=None): @@ -649,10 +695,6 @@ def loop(timer_event=None): if __name__ == "__main__": - """ - main function starts the MotionPlanning node - :param args: - """ roscomp.init("MotionPlanning") try: node = MotionPlanning()